Module Gnumed.pycommon.gmPsql

Expand source code
# A Python class to replace the PSQL command-line interpreter
# NOTE: this is not a full replacement for the interpeter, merely
# enough functionality to run gnumed installation scripts
#
# Copyright (C) 2003, 2004 - 2010 GNUmed developers
# Licence: GPL v2 or later
#===================================================================
__author__ = "Ian Haywood"
__license__ = "GPL v2 or later (details at https://www.gnu.org)"

# stdlib
import sys
import os
import re
import logging


_log = logging.getLogger('gm.bootstrapper')

unformattable_error_id = 12345

#===================================================================
class Psql:

        def __init__ (self, conn):
                """
                db : the interpreter to connect to, must be a DBAPI compliant interface
                """
                self.conn = conn
                self.vars = {'ON_ERROR_STOP': None}

        #---------------------------------------------------------------
        def match(self, pattern):
                match = re.match(pattern, self.line)
                if match is None:
                        return 0

                self.groups = match.groups()
                return 1

        #---------------------------------------------------------------
        def fmt_msg(self, aMsg):
                try:
                        tmp = "%s:%d: %s" % (self.filename, self.lineno-1, aMsg)
                        tmp = tmp.replace('\r', '')
                        tmp = tmp.replace('\n', '')
                except UnicodeDecodeError:
                        global unformattable_error_id
                        tmp = "%s:%d: <cannot str(msg), printing on console with ID [#%d]>" % (self.filename, self.lineno-1, unformattable_error_id)
                        try:
                                print('ERROR: GNUmed bootstrap #%d:' % unformattable_error_id)
                                print(aMsg)
                        except Exception: pass
                        unformattable_error_id += 1
                return tmp

        #---------------------------------------------------------------
        def run (self, filename):
                """
                filename: a file, containing semicolon-separated SQL commands
                """
                _log.debug('processing [%s]', filename)
                curs = self.conn.cursor()
                curs.execute('show session authorization')
                start_auth = curs.fetchall()[0][0]
                curs.close()
                _log.debug('session auth: %s', start_auth)

                if os.access (filename, os.R_OK):
                        sql_file = open(filename, mode = 'rt', encoding = 'utf-8-sig')
                else:
                        _log.error("cannot open file [%s]", filename)
                        return 1

                self.lineno = 0
                self.filename = filename
                in_string = False
                bracketlevel = 0
                curr_cmd = ''
                curs = self.conn.cursor()

                for self.line in sql_file:
                        self.lineno += 1
                        if len(self.line.strip()) == 0:
                                continue

                        # \set
                        if self.match(r"^\\set (\S+) (\S+)"):
                                _log.debug('"\set" found: %s', self.groups)
                                self.vars[self.groups[0]] = self.groups[1]
                                if self.groups[0] == 'ON_ERROR_STOP':
                                        # adjusting from string to int so that "1" -> 1 -> True
                                        self.vars['ON_ERROR_STOP'] = int(self.vars['ON_ERROR_STOP'])
                                continue

                        # \unset
                        if self.match (r"^\\unset (\S+)"):
                                self.vars[self.groups[0]] = None
                                continue

                        # other '\' commands
                        if self.match (r"^\\(.*)") and not in_string:
                                # most other \ commands are for controlling output formats, don't make
                                # much sense in an installation script, so we gently ignore them
                                _log.warning(self.fmt_msg("psql command \"\\%s\" being ignored " % self.groups[0]))
                                continue

                        # non-'\' commands
                        this_char = self.line[0]
                        # loop over characters in line
                        for next_char in self.line[1:] + ' ':

                                # start/end of string detected
                                if this_char == "'":
                                        in_string = not in_string

                                # detect "--"-style comments
                                if this_char == '-' and next_char == '-' and not in_string:
                                        break

                                # detect bracketing
                                if this_char == '(' and not in_string:
                                        bracketlevel += 1
                                if this_char == ')' and not in_string:
                                        bracketlevel -= 1

                                # have we:
                                # - found end of command ?
                                # - are not inside a string ?
                                # - are not inside bracket pair ?
                                if not ((in_string is False) and (bracketlevel == 0) and (this_char == ';')):
                                        curr_cmd += this_char
                                else:
                                        if curr_cmd.strip() != '':
                                                try:
                                                        curs.execute(curr_cmd)
                                                        try:
                                                                data = curs.fetchall()
                                                                _log.debug('cursor data: %s', data)
                                                        except Exception:       # actually: psycopg2.ProgrammingError but no handle
                                                                pass
                                                except Exception as error:
                                                        _log.exception(curr_cmd)
                                                        if re.match(r"^NOTICE:.*", str(error)):
                                                                _log.warning(self.fmt_msg(error))
                                                        else:
                                                                _log.error(self.fmt_msg(error))
                                                                if hasattr(error, 'diag'):
                                                                        for prop in dir(error.diag):                    # pylint: disable=no-member
                                                                                if prop.startswith('__'):
                                                                                        continue
                                                                                val = getattr(error.diag, prop)         # pylint: disable=no-member
                                                                                if val is None:
                                                                                        continue
                                                                                _log.error('PG diags %s: %s', prop, val)
                                                                if self.vars['ON_ERROR_STOP']:
                                                                        self.conn.commit()
                                                                        curs.close()
                                                                        return 1

                                        self.conn.commit()
                                        curs.close()
                                        curs = self.conn.cursor()
                                        curr_cmd = ''

                                this_char = next_char
                        # end of loop over chars

                # end of loop over lines
                self.conn.commit()
                curs.execute('show session authorization')
                end_auth = curs.fetchall()[0][0]
                curs.close()
                _log.debug('session auth after sql file processing: %s', end_auth)
                if start_auth != end_auth:
                        _log.error('session auth changed before/after processing sql file')

                return 0

#===================================================================
# testing code
if __name__ == '__main__':

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        #conn = PgSQL.connect(user='gm-dbo', database = 'gnumed')
        #psql = Psql(conn)
        #psql.run(sys.argv[1])
        #conn.close()

Classes

class Psql (conn)

db : the interpreter to connect to, must be a DBAPI compliant interface

Expand source code
class Psql:

        def __init__ (self, conn):
                """
                db : the interpreter to connect to, must be a DBAPI compliant interface
                """
                self.conn = conn
                self.vars = {'ON_ERROR_STOP': None}

        #---------------------------------------------------------------
        def match(self, pattern):
                match = re.match(pattern, self.line)
                if match is None:
                        return 0

                self.groups = match.groups()
                return 1

        #---------------------------------------------------------------
        def fmt_msg(self, aMsg):
                try:
                        tmp = "%s:%d: %s" % (self.filename, self.lineno-1, aMsg)
                        tmp = tmp.replace('\r', '')
                        tmp = tmp.replace('\n', '')
                except UnicodeDecodeError:
                        global unformattable_error_id
                        tmp = "%s:%d: <cannot str(msg), printing on console with ID [#%d]>" % (self.filename, self.lineno-1, unformattable_error_id)
                        try:
                                print('ERROR: GNUmed bootstrap #%d:' % unformattable_error_id)
                                print(aMsg)
                        except Exception: pass
                        unformattable_error_id += 1
                return tmp

        #---------------------------------------------------------------
        def run (self, filename):
                """
                filename: a file, containing semicolon-separated SQL commands
                """
                _log.debug('processing [%s]', filename)
                curs = self.conn.cursor()
                curs.execute('show session authorization')
                start_auth = curs.fetchall()[0][0]
                curs.close()
                _log.debug('session auth: %s', start_auth)

                if os.access (filename, os.R_OK):
                        sql_file = open(filename, mode = 'rt', encoding = 'utf-8-sig')
                else:
                        _log.error("cannot open file [%s]", filename)
                        return 1

                self.lineno = 0
                self.filename = filename
                in_string = False
                bracketlevel = 0
                curr_cmd = ''
                curs = self.conn.cursor()

                for self.line in sql_file:
                        self.lineno += 1
                        if len(self.line.strip()) == 0:
                                continue

                        # \set
                        if self.match(r"^\\set (\S+) (\S+)"):
                                _log.debug('"\set" found: %s', self.groups)
                                self.vars[self.groups[0]] = self.groups[1]
                                if self.groups[0] == 'ON_ERROR_STOP':
                                        # adjusting from string to int so that "1" -> 1 -> True
                                        self.vars['ON_ERROR_STOP'] = int(self.vars['ON_ERROR_STOP'])
                                continue

                        # \unset
                        if self.match (r"^\\unset (\S+)"):
                                self.vars[self.groups[0]] = None
                                continue

                        # other '\' commands
                        if self.match (r"^\\(.*)") and not in_string:
                                # most other \ commands are for controlling output formats, don't make
                                # much sense in an installation script, so we gently ignore them
                                _log.warning(self.fmt_msg("psql command \"\\%s\" being ignored " % self.groups[0]))
                                continue

                        # non-'\' commands
                        this_char = self.line[0]
                        # loop over characters in line
                        for next_char in self.line[1:] + ' ':

                                # start/end of string detected
                                if this_char == "'":
                                        in_string = not in_string

                                # detect "--"-style comments
                                if this_char == '-' and next_char == '-' and not in_string:
                                        break

                                # detect bracketing
                                if this_char == '(' and not in_string:
                                        bracketlevel += 1
                                if this_char == ')' and not in_string:
                                        bracketlevel -= 1

                                # have we:
                                # - found end of command ?
                                # - are not inside a string ?
                                # - are not inside bracket pair ?
                                if not ((in_string is False) and (bracketlevel == 0) and (this_char == ';')):
                                        curr_cmd += this_char
                                else:
                                        if curr_cmd.strip() != '':
                                                try:
                                                        curs.execute(curr_cmd)
                                                        try:
                                                                data = curs.fetchall()
                                                                _log.debug('cursor data: %s', data)
                                                        except Exception:       # actually: psycopg2.ProgrammingError but no handle
                                                                pass
                                                except Exception as error:
                                                        _log.exception(curr_cmd)
                                                        if re.match(r"^NOTICE:.*", str(error)):
                                                                _log.warning(self.fmt_msg(error))
                                                        else:
                                                                _log.error(self.fmt_msg(error))
                                                                if hasattr(error, 'diag'):
                                                                        for prop in dir(error.diag):                    # pylint: disable=no-member
                                                                                if prop.startswith('__'):
                                                                                        continue
                                                                                val = getattr(error.diag, prop)         # pylint: disable=no-member
                                                                                if val is None:
                                                                                        continue
                                                                                _log.error('PG diags %s: %s', prop, val)
                                                                if self.vars['ON_ERROR_STOP']:
                                                                        self.conn.commit()
                                                                        curs.close()
                                                                        return 1

                                        self.conn.commit()
                                        curs.close()
                                        curs = self.conn.cursor()
                                        curr_cmd = ''

                                this_char = next_char
                        # end of loop over chars

                # end of loop over lines
                self.conn.commit()
                curs.execute('show session authorization')
                end_auth = curs.fetchall()[0][0]
                curs.close()
                _log.debug('session auth after sql file processing: %s', end_auth)
                if start_auth != end_auth:
                        _log.error('session auth changed before/after processing sql file')

                return 0

Methods

def fmt_msg(self, aMsg)
Expand source code
def fmt_msg(self, aMsg):
        try:
                tmp = "%s:%d: %s" % (self.filename, self.lineno-1, aMsg)
                tmp = tmp.replace('\r', '')
                tmp = tmp.replace('\n', '')
        except UnicodeDecodeError:
                global unformattable_error_id
                tmp = "%s:%d: <cannot str(msg), printing on console with ID [#%d]>" % (self.filename, self.lineno-1, unformattable_error_id)
                try:
                        print('ERROR: GNUmed bootstrap #%d:' % unformattable_error_id)
                        print(aMsg)
                except Exception: pass
                unformattable_error_id += 1
        return tmp
def match(self, pattern)
Expand source code
def match(self, pattern):
        match = re.match(pattern, self.line)
        if match is None:
                return 0

        self.groups = match.groups()
        return 1
def run(self, filename)

filename: a file, containing semicolon-separated SQL commands

Expand source code
def run (self, filename):
        """
        filename: a file, containing semicolon-separated SQL commands
        """
        _log.debug('processing [%s]', filename)
        curs = self.conn.cursor()
        curs.execute('show session authorization')
        start_auth = curs.fetchall()[0][0]
        curs.close()
        _log.debug('session auth: %s', start_auth)

        if os.access (filename, os.R_OK):
                sql_file = open(filename, mode = 'rt', encoding = 'utf-8-sig')
        else:
                _log.error("cannot open file [%s]", filename)
                return 1

        self.lineno = 0
        self.filename = filename
        in_string = False
        bracketlevel = 0
        curr_cmd = ''
        curs = self.conn.cursor()

        for self.line in sql_file:
                self.lineno += 1
                if len(self.line.strip()) == 0:
                        continue

                # \set
                if self.match(r"^\\set (\S+) (\S+)"):
                        _log.debug('"\set" found: %s', self.groups)
                        self.vars[self.groups[0]] = self.groups[1]
                        if self.groups[0] == 'ON_ERROR_STOP':
                                # adjusting from string to int so that "1" -> 1 -> True
                                self.vars['ON_ERROR_STOP'] = int(self.vars['ON_ERROR_STOP'])
                        continue

                # \unset
                if self.match (r"^\\unset (\S+)"):
                        self.vars[self.groups[0]] = None
                        continue

                # other '\' commands
                if self.match (r"^\\(.*)") and not in_string:
                        # most other \ commands are for controlling output formats, don't make
                        # much sense in an installation script, so we gently ignore them
                        _log.warning(self.fmt_msg("psql command \"\\%s\" being ignored " % self.groups[0]))
                        continue

                # non-'\' commands
                this_char = self.line[0]
                # loop over characters in line
                for next_char in self.line[1:] + ' ':

                        # start/end of string detected
                        if this_char == "'":
                                in_string = not in_string

                        # detect "--"-style comments
                        if this_char == '-' and next_char == '-' and not in_string:
                                break

                        # detect bracketing
                        if this_char == '(' and not in_string:
                                bracketlevel += 1
                        if this_char == ')' and not in_string:
                                bracketlevel -= 1

                        # have we:
                        # - found end of command ?
                        # - are not inside a string ?
                        # - are not inside bracket pair ?
                        if not ((in_string is False) and (bracketlevel == 0) and (this_char == ';')):
                                curr_cmd += this_char
                        else:
                                if curr_cmd.strip() != '':
                                        try:
                                                curs.execute(curr_cmd)
                                                try:
                                                        data = curs.fetchall()
                                                        _log.debug('cursor data: %s', data)
                                                except Exception:       # actually: psycopg2.ProgrammingError but no handle
                                                        pass
                                        except Exception as error:
                                                _log.exception(curr_cmd)
                                                if re.match(r"^NOTICE:.*", str(error)):
                                                        _log.warning(self.fmt_msg(error))
                                                else:
                                                        _log.error(self.fmt_msg(error))
                                                        if hasattr(error, 'diag'):
                                                                for prop in dir(error.diag):                    # pylint: disable=no-member
                                                                        if prop.startswith('__'):
                                                                                continue
                                                                        val = getattr(error.diag, prop)         # pylint: disable=no-member
                                                                        if val is None:
                                                                                continue
                                                                        _log.error('PG diags %s: %s', prop, val)
                                                        if self.vars['ON_ERROR_STOP']:
                                                                self.conn.commit()
                                                                curs.close()
                                                                return 1

                                self.conn.commit()
                                curs.close()
                                curs = self.conn.cursor()
                                curr_cmd = ''

                        this_char = next_char
                # end of loop over chars

        # end of loop over lines
        self.conn.commit()
        curs.execute('show session authorization')
        end_auth = curs.fetchall()[0][0]
        curs.close()
        _log.debug('session auth after sql file processing: %s', end_auth)
        if start_auth != end_auth:
                _log.error('session auth changed before/after processing sql file')

        return 0