Module Gnumed.pycommon.gmConnectionPool

GNUmed connection pooler.

Currently, only readonly connections are pooled.

This pool is (supposedly) thread safe.

Functions

def exception_is_connection_loss(exc: Exception) ‑> bool
Expand source code
def exception_is_connection_loss(exc: Exception) -> bool:
        """Checks whether exception represents connection loss."""
        if not isinstance(exc, psycopg2.Error):
                # not a PG/psycopg2 exception
                return False

        try:
                if isinstance(exc, psycopg2.errors.AdminShutdown):
                        _log.debug('indicates connection loss due to admin shutdown')
                        return True

        except AttributeError:  # psycopg2 2.7/2.8 transition (no AdminShutdown exception)
                pass
        try:
                msg = '%s' % exc.args[0]
        except (AttributeError, IndexError, TypeError):
                _log.debug('cannot extract message from exception')
                return False

        _log.debug('interpreting: %s', msg)
        for snippet in _connection_loss_markers:
                if snippet in msg:
                        _log.debug('indicates connection loss')
                        return True

        is_conn_loss = (
                # OperationalError
                ('erver' in msg)
                        and
                (
                        ('terminat' in msg)
                                or
                        ('abnorm' in msg)
                                or
                        ('end' in msg)
                                or
                        ('no route' in msg)
                )
        ) or (
                # InterfaceError
                ('onnect' in msg)
                        and
                (
                        ('close' in msg)
                                or
                        ('end' in msg)
                )
        )
        if is_conn_loss:
                _log.debug('indicates connection loss')
        return is_conn_loss

Checks whether exception represents connection loss.

def log_conn_state(conn: psycopg2.extras.DictConnection) ‑> None
Expand source code
def log_conn_state(conn:psycopg2.extras.DictConnection) -> None:
        """Log details about a DB-API connection."""
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '%s (tx aborted or unknown, cannot retrieve)' % conn.isolation_level
        else:
                isolation_level = '%s (%s)' % (conn.isolation_level, _map_psyco_iso_level2str[conn.isolation_level])
        conn_status = '%s (%s)' % (conn.status, _map_psyco_conn_status2str[conn.status])
        if conn.closed != 0:
                conn_status = 'undefined (%s)' % conn_status
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = str(conn.get_backend_pid())
        try:
                conn_deferrable = str(conn.deferrable)
        except AttributeError:
                conn_deferrable = '<unavailable>'
        d = {
                'identity': id(conn),
                'backend PID': backend_pid,
                'protocol version': conn.protocol_version,
                'encoding': conn.encoding,
                'closed': conn.closed,
                'readonly': conn.readonly,
                'autocommit': conn.autocommit,
                'isolation level (psyco)': isolation_level,
                'async': conn.async_,
                'deferrable': conn_deferrable,
                'transaction status': '%s (%s)' % (tx_status, _map_psyco_tx_status2str[tx_status]),
                'connection status': conn_status,
                'executing async op': conn.isexecuting(),
                'type': type(conn)
        }
        _log.debug(conn)
        for key in d:
                _log.debug('%s: %s', key, d[key])

Log details about a DB-API connection.

def log_cursor_state(cursor) ‑> None
Expand source code
def log_cursor_state(cursor) -> None:
        """Log details about a DB-API cursor."""
        if cursor is None:
                _log.debug('cursor: None')
                return

        conn = cursor.connection
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '<tx aborted or unknown, cannot retrieve>'
        else:
                isolation_level = conn.isolation_level
        try:
                conn_deferrable = conn.deferrable
        except AttributeError:
                conn_deferrable = '<unavailable>'
        if cursor.query is None:
                query = '<no query>'
        else:
                query = cursor.query.decode(errors = 'replace')
        if conn.closed != 0:
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = conn.get_backend_pid()
        txt = """Cursor
 identity: %s; name: %s
 closed: %s; scrollable: %s; with hold: %s; arraysize: %s; itersize: %s;
 last rowcount: %s; rownumber: %s; lastrowid (OID): %s;
 last description: %s
 statusmessage: %s
Connection
 identity: %s; backend pid: %s; protocol version: %s;
 closed: %s; autocommit: %s; isolation level: %s; encoding: %s; async: %s; deferrable: %s; readonly: %s;
 TX status: %s; CX status: %s; executing async op: %s;
Query
 %s""" % (
                # cursor level:
                id(cursor),
                cursor.name,
                cursor.closed,
                cursor.scrollable,
                cursor.withhold,
                cursor.arraysize,
                cursor.itersize,
                cursor.rowcount,
                cursor.rownumber,
                cursor.lastrowid,
                cursor.description,
                cursor.statusmessage,
                # connection level:
                id(conn),
                backend_pid,
                conn.protocol_version,
                conn.closed,
                conn.autocommit,
                isolation_level,
                conn.encoding,
                conn.async_,
                conn_deferrable,
                conn.readonly,
                _map_psyco_tx_status2str[tx_status],
                _map_psyco_conn_status2str[conn.status],
                conn.isexecuting(),
                # query level:
                query
        )
        gmLog2.log_multiline(logging.DEBUG, message = 'Link state:', line_prefix = '', text = txt)

Log details about a DB-API cursor.

def log_pg_exception_details(exc: Exception) ‑> bool
Expand source code
def log_pg_exception_details(exc: Exception) -> bool:
        """Logs details from a database exception."""
        if not isinstance(exc, psycopg2.Error):
                return False

        _log.error(type(exc))
        try:
                for arg in exc.args:
                        _log.debug('exc.arg: %s', arg)
        except AttributeError:
                _log.debug('exception has no <.args>')
        _log.debug('pgerror: [%s]', exc.pgerror)
        if exc.pgcode is None:
                _log.debug('pgcode : %s', exc.pgcode)
        else:
                _log.debug('pgcode : %s (%s)', exc.pgcode, PG_error_codes.lookup(exc.pgcode))
        log_cursor_state(exc.cursor)
        try:
                diags = exc.diag
        except AttributeError:
                _log.debug('<.diag> not available')
                diags = None
        if diags is None:
                return True

        for attr in dir(diags):
                if attr.startswith('__'):
                        continue
                val = getattr(diags, attr)
                if val is None:
                        continue
                _log.debug('%s: %s', attr, val)
        return True

Logs details from a database exception.

def log_pg_settings(curs) ‑> bool
Expand source code
def log_pg_settings(curs) -> bool:
        """Log PostgreSQL server settings."""
        # config settings
        try:
                curs.execute('SELECT * FROM pg_settings')
        except psycopg2.Error:
                _log.exception('cannot retrieve PG settings ("SELECT ... FROM pg_settings" failed)')
                return False

        settings = curs.fetchall()
        if settings:
                for setting in settings:
                        if setting['unit'] is None:
                                unit = ''
                        else:
                                unit = ' %s' % setting['unit']
                        if setting['sourcefile'] is None:
                                sfile = ''
                        else:
                                sfile = '// %s @ %s' % (setting['sourcefile'], setting['sourceline'])
                        pending_restart = u''
                        try:
                                if setting['pending_restart']:
                                        pending_restart = u'// needs restart'
                        except KeyError:
                                pass    # 'pending_restart' does not exist in PG 9.4 yet
                        _log.debug('%s: %s%s (set from: [%s] // session RESET will set to: [%s]%s%s)',
                                setting['name'],
                                setting['setting'],
                                unit,
                                setting['source'],
                                setting['reset_val'],
                                pending_restart,
                                sfile
                        )
        # extensions
        try:
                curs.execute('select pg_available_extensions()')
        except Exception:
                _log.exception('cannot log available PG extensions')
                return False

        extensions = curs.fetchall()
        if extensions:
                for ext in extensions:
                        _log.debug('PG extension: %s', ext['pg_available_extensions'])
        else:
                _log.error('no PG extensions available')
        # log pg_config -- can only be read by superusers :-/
        # database collation
        try:
                curs.execute('SELECT *, pg_database_collation_actual_version(oid), pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()')
        except psycopg2.Error:
                _log.exception('cannot log actual collation version (probably PG < 15)')
                curs.execute('SELECT * FROM pg_database WHERE datname = current_database()')
        config = curs.fetchall()
        gmLog2.log_multiline(10, message = 'PG database config', text = gmTools.format_dict_like(dict(config[0]), tabular = True))
        return True

Log PostgreSQL server settings.

def log_role_permissions(curs, role: str = None)
Expand source code
def log_role_permissions(curs, role:str=None):
        """Log permissions for role."""

        if role:
                SQL = SQL__get_permissions_for_role_name % {'role_name': role}
                msg = 'permissions for role [%s]:' % role
        else:
                SQL = SQL__get_permissions_for_current_role
                msg = 'permissions for role [current_user]:'
        try:
                curs.execute(SQL)
        except psycopg2.Error:
                _log.exception('cannot retrieve permissions')
                return

        perms = curs.fetchall()
        if not perms:
                _log.debug('no permissions')
                return

        gmLog2.log_multiline (
                message = msg,
                line_prefix = ' ',
                text = [ '%(privilege_type)10s  ON  %(name_1)s.%(name_2)s.%(name_3)s (%(object_type)s)' % p for p in perms ]
        )

Log permissions for role.

Classes

class cAdapterPyDateTime (dt)
Expand source code
class cAdapterPyDateTime(object):

        def __init__(self, dt):
                if dt.tzinfo is None:
                        raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % _timestamp_template % dt.isoformat())
                self.__dt = dt

        def getquoted(self):
                return _timestamp_template % self.__dt.isoformat()

Methods

def getquoted(self)
Expand source code
def getquoted(self):
        return _timestamp_template % self.__dt.isoformat()
class cAuthenticationError (creds=None, prev_val=None)
Expand source code
class cAuthenticationError(psycopg2.OperationalError):

        def __init__(self, creds=None, prev_val=None):
                self.creds = creds
                self.prev_val = prev_val

        def __str__(self):
                return 'PostgreSQL: %sDSN: %s' % (self.prev_val, self.creds)

Error related to database operation (disconnect, memory allocation etc).

Ancestors

  • psycopg2.OperationalError
  • psycopg2.DatabaseError
  • psycopg2.Error
  • builtins.Exception
  • builtins.BaseException
class cPGCredentials
Expand source code
class cPGCredentials:
        """Holds PostgreSQL credentials"""

        def __init__(self) -> None:
                self.__host:str = None                  # None: left out -> defaults to $PGHOST or implicit <localhost>
                self.__port:int = None                  # None: left out -> defaults to $PGPORT or libpq compiled-in default (typically 5432)
                self.__database:str = None              # must be set before connecting
                self.__user:str = None                  # must be set before connecting
                self.__password:str = None              # None: left out
                                                                                # -> try password-less connect (TRUST/IDENT/PEER)
                                                                                # -> try connect with password from <passfile> parameter or $PGPASSFILE or ~/.pgpass

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def __format_credentials(self) -> str:
                """Database credentials formatted as string."""
                cred_parts = [
                        'dbname=%s' % self.__database,
                        'host=%s' % self.__host,
                        'port=%s' % self.__port,
                        'user=%s' % self.__user
                ]
                return ' '.join(cred_parts)

        formatted_credentials = property(__format_credentials)

        #--------------------------------------------------
        def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
                """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
                assert (self.__database is not None), 'self.__database must be defined'
                assert (self.__user is not None), 'self.__user must be defined'

                kwargs = {
                        'dbname': self.__database,
                        'user': self.__user,
                        'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                        'fallback_application_name': 'GNUmed',
                        'sslmode': 'prefer',
                        # try to enforce a useful encoding early on so that we
                        # have a good chance of decoding authentication errors
                        # containing foreign language characters
                        'client_encoding': 'UTF8'
                }
                if self.__host is not None:
                        kwargs['host'] = self.__host
                if self.__port is not None:
                        kwargs['port'] = self.__port
                if self.__password is not None:
                        kwargs['password'] = self.__password
                return kwargs

        credentials_kwargs = property(generate_credentials_kwargs)

        #--------------------------------------------------
        def _get_database(self) -> str:
                return self.__database

        def _set_database(self, database:str=None):
                assert database, '<database> must not be None'
                assert database.strip(), '<database> must not be empty'
                assert ('salaam.homeunix' not in database), 'The public database is not hosted by <salaam.homeunix.com> anymore.\n\nPlease point your configuration files to <publicdb.gnumed.de>.'
                self.__database = database.strip()
                _log.info('[%s]', self.__database)

        database = property(_get_database, _set_database)

        #--------------------------------------------------
        def _get_host(self) -> str:
                return self.__host

        def _set_host(self, host:str=None):
                if host is not None:
                        host = host.strip()
                        if host == '':
                                host = None
                self.__host = host
                _log.info('[%s]', self.__host)

        host = property(_get_host, _set_host)

        #--------------------------------------------------
        def _get_port(self) -> int:
                return self.__port

        def _set_port(self, port=None):
                _log.info('[%s]', port)
                if port is None:
                        self.__port = None
                        return

                self.__port = int(port)

        port = property(_get_port, _set_port)

        #--------------------------------------------------
        def _get_user(self) -> str:
                return self.__user

        def _set_user(self, user:str=None):
                assert (user is not None), '<user> must not be None'
                assert (user.strip() != ''), '<user> must not be empty'
                self.__user = user.strip()
                _log.info('[%s]', self.__user)

        user = property(_get_user, _set_user)

        #--------------------------------------------------
        def _get_password(self) -> str:
                return self.__password

        def _set_password(self, password:str=None):
                if password is not None:
                        gmLog2.add_word2hide(password)
                self.__password = password
                _log.info('password was set')

        password = property(_get_password, _set_password)

Holds PostgreSQL credentials

Instance variables

prop credentials_kwargs : dict
Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
        """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
        assert (self.__database is not None), 'self.__database must be defined'
        assert (self.__user is not None), 'self.__user must be defined'

        kwargs = {
                'dbname': self.__database,
                'user': self.__user,
                'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                'fallback_application_name': 'GNUmed',
                'sslmode': 'prefer',
                # try to enforce a useful encoding early on so that we
                # have a good chance of decoding authentication errors
                # containing foreign language characters
                'client_encoding': 'UTF8'
        }
        if self.__host is not None:
                kwargs['host'] = self.__host
        if self.__port is not None:
                kwargs['port'] = self.__port
        if self.__password is not None:
                kwargs['password'] = self.__password
        return kwargs

Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.

prop database : str
Expand source code
def _get_database(self) -> str:
        return self.__database
prop formatted_credentials : str
Expand source code
def __format_credentials(self) -> str:
        """Database credentials formatted as string."""
        cred_parts = [
                'dbname=%s' % self.__database,
                'host=%s' % self.__host,
                'port=%s' % self.__port,
                'user=%s' % self.__user
        ]
        return ' '.join(cred_parts)

Database credentials formatted as string.

prop host : str
Expand source code
def _get_host(self) -> str:
        return self.__host
prop password : str
Expand source code
def _get_password(self) -> str:
        return self.__password
prop port : int
Expand source code
def _get_port(self) -> int:
        return self.__port
prop user : str
Expand source code
def _get_user(self) -> str:
        return self.__user

Methods

def generate_credentials_kwargs(self, connection_name: str = None) ‑> dict
Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
        """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
        assert (self.__database is not None), 'self.__database must be defined'
        assert (self.__user is not None), 'self.__user must be defined'

        kwargs = {
                'dbname': self.__database,
                'user': self.__user,
                'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                'fallback_application_name': 'GNUmed',
                'sslmode': 'prefer',
                # try to enforce a useful encoding early on so that we
                # have a good chance of decoding authentication errors
                # containing foreign language characters
                'client_encoding': 'UTF8'
        }
        if self.__host is not None:
                kwargs['host'] = self.__host
        if self.__port is not None:
                kwargs['port'] = self.__port
        if self.__password is not None:
                kwargs['password'] = self.__password
        return kwargs

Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.

class gmConnectionPool
Expand source code
class gmConnectionPool(gmBorg.cBorg):
        """The Singleton connection pool class.

        Any normal connection from GNUmed to PostgreSQL should go
        through this pool. It needs credentials to be provided
        via .credentials = <cPGCredentials>.
        """
        def __init__(self) -> None:
                try:
                        self.__initialized
                        return

                except AttributeError:
                        self.__initialized:bool = True

                _log.info('[%s]: first instantiation', self.__class__.__name__)
                self.__ro_conn_pool:dict[str, psycopg2.extras.DictConnection] = {}      # keyed by "credentials::thread ID"
                self.__SQL_set_client_timezone:str = None
                self.__client_timezone = None
                self.__creds:cPGCredentials = None
                self.__log_auth_environment()

        #--------------------------------------------------
        # connection API
        #--------------------------------------------------
        def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Provide a database connection.

                Readonly connections can be pooled. If there is no
                suitable connection in the pool a new one will be
                created and stored. The pool is per-thread and
                per-credentials.

                Args:
                        readonly: make connection read only
                        verbose: make connection log more things
                        pooled: return a pooled connection, if possible
                        connection_name: a human readable name for the connection, avoid spaces
                        autocommit: whether to autocommit
                        credentials: use for getting a connection with other credentials different from what the pool was set to before

                Returns:
                        a working connection to a PostgreSQL database
                """
#               if _DISABLE_CONNECTION_POOL:
#                       pooled = False

                if credentials is not None:
                        pooled = False
                conn = None
                if readonly and pooled:
                        try:
                                conn = self.__ro_conn_pool[self.pool_key]
                        except KeyError:
                                _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key)
                        if conn is not None:
                                #if verbose:
                                #       _log.debug('using pooled conn [%s]', self.pool_key)
                                return conn

                if conn is None:
                        conn = self.get_raw_connection (
                                verbose = verbose,
                                readonly = readonly,
                                connection_name = connection_name,
                                autocommit = autocommit,
                                credentials = credentials
                        )
                if readonly and pooled:
                        # monkey patch close() for pooled RO connections
                        conn.original_close = conn.close                                                                # type: ignore [attr-defined]
                        conn.close = _raise_exception_on_pooled_ro_conn_close                   # type: ignore [assignment]
                # set connection properties
                # - client encoding
                encoding = 'UTF8'
                _log.debug('setting client (wire) encoding to %s', encoding)
                conn.set_client_encoding(encoding)
                # - transaction isolation level
                if not readonly:
                        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
                # - client time zone
                _log.debug('client timezone [%s]', self.__client_timezone)
                curs = conn.cursor()
                curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone})
                curs.close()
                conn.commit()
                if readonly and pooled:
                        _log.debug('putting RO conn with key [%s] into pool', self.pool_key)
                        self.__ro_conn_pool[self.pool_key] = conn
                if verbose:
                        log_conn_state(conn)
                return conn

        #--------------------------------------------------
        def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Get a raw, unadorned connection.

                This will not set any parameters such as encoding,
                timezone, or datestyle, hence it can be used for
                "service" connections for verifying encodings etc
                """
#               # FIXME: support verbose
                if credentials is None:
                        creds2use = self.__creds
                else:
                        creds2use = credentials
                creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name)
                try:
                        # DictConnection now _is_ a real dictionary
                        conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs)
                except psycopg2.OperationalError as e:
                        _log.error('failed to establish connection [%s]', creds2use.formatted_credentials)
                        t, v, tb = sys.exc_info()
                        try:
                                msg = e.args[0]
                        except (AttributeError, IndexError, TypeError):
                                raise

                        if not self.__is_auth_fail_msg(msg):
                                raise

                        raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb)

                _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid())
                # safe-guard
                conn._original_rollback = conn.rollback
                conn.rollback = types.MethodType(_safe_transaction_rollback, conn)

                # - inspect server
                self.__log_on_first_contact(conn)
                # - verify PG understands client time zone
                self.__detect_client_timezone(conn)
                # - set access mode
                if readonly:
                        _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>')
                        autocommit = True
                else:
                        _log.debug('autocommit is desired to be: %s', autocommit)
                conn.commit()
                conn.autocommit = autocommit
                conn.readonly = readonly
                # - assume verbose=True to mean we want debugging in the database, too
                if verbose or _VERBOSE_PG_LOG:
                        _log.debug('enabling <plpgsql.extra_warnings/_errors>')
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_warnings TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_warnings>')
                        finally:
                                curs.close()
                                conn.commit()
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_errors TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_errors>')
                        finally:
                                curs.close()
                                conn.commit()
                        _log.debug('enabling auto_explain')
                        curs = conn.cursor()
                        try:
                                curs.execute("SELECT gm.load_auto_explain(3000)")
                        except Exception:
                                _log.exception('cannot enable auto_explain')
                        finally:
                                curs.close()
                                conn.commit()
                return conn

        #--------------------------------------------------
        def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection:
                """Return a connection for the database owner.

                Will not touch the pool.
                """
                dbo_creds = cPGCredentials()
                dbo_creds.user = dbo_account
                dbo_creds.password = dbo_password
                dbo_creds.database = self.__creds.database
                dbo_creds.host = self.__creds.host
                dbo_creds.port = self.__creds.port
                return self.get_connection (
                        pooled = False,
                        readonly = readonly,
                        verbose = verbose,
                        connection_name = connection_name,
                        autocommit = autocommit,
                        credentials = dbo_creds
                )

        #--------------------------------------------------
        def discard_pooled_connection_of_thread(self):
                """Discard from pool the connection of the current thread."""
                try:
                        conn = self.__ro_conn_pool[self.pool_key]
                except KeyError:
                        _log.debug('no connection pooled for thread [%s]', self.pool_key)
                        return

                del self.__ro_conn_pool[self.pool_key]
                if conn.closed:
                        return

                conn.close = conn.original_close
                conn.close()

        #--------------------------------------------------
        def shutdown(self):
                """Close and discard all pooled connections."""
                for conn_key in self.__ro_conn_pool:
                        conn = self.__ro_conn_pool[conn_key]
                        if conn.closed:
                                continue
                        _log.debug('closing open database connection, pool key: %s', conn_key)
                        log_conn_state(conn)
                        conn.close = conn.original_close
                        conn.close()
                del self.__ro_conn_pool

        #--------------------------------------------------
        # utility functions
        #--------------------------------------------------
        def __log_on_first_contact(self, conn:psycopg2.extras.DictConnection):
                global postgresql_version
                if postgresql_version is not None:
                        return

                _log.debug('_\\\\// heed Prime Directive _\\\\//')
                # FIXME: verify PG version
                curs = conn.cursor()
                curs.execute ("""
                        SELECT
                                substring(setting, E'^\\\\d{1,2}\\\\.\\\\d{1,2}')::numeric AS version
                        FROM
                                pg_settings
                        WHERE
                                name = 'server_version'"""
                )
                postgresql_version = curs.fetchone()['version']
                _log.info('PostgreSQL version (numeric): %s' % postgresql_version)
                try:
                        curs.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
                        _log.info('database size: %s', curs.fetchone()[0])
                except Exception:
                        _log.exception('cannot get database size')
                finally:
                        curs.close()
                        conn.commit()
                curs = conn.cursor()
                log_pg_settings(curs = curs)
                log_role_permissions(curs)
                curs.close()
                conn.commit()
                _log.debug('done')

        #--------------------------------------------------
        def __log_auth_environment(self):
                pgpass_file = os.path.expanduser(os.path.join('~', '.pgpass'))
                if os.path.exists(pgpass_file):
                        _log.debug('standard .pgpass (%s) exists', pgpass_file)
                else:
                        _log.debug('standard .pgpass (%s) not found', pgpass_file)
                pgpass_var = os.getenv('PGPASSFILE')
                if pgpass_var is None:
                        _log.debug('$PGPASSFILE not set')
                else:
                        if os.path.exists(pgpass_var):
                                _log.debug('$PGPASSFILE=%s -> file exists', pgpass_var)
                        else:
                                _log.debug('$PGPASSFILE=%s -> file not found')

        #--------------------------------------------------
        def __detect_client_timezone(self, conn:psycopg2.extras.DictConnection):
                """This is run on the very first connection."""

                if self.__client_timezone is not None:
                        return

                _log.debug('trying to detect timezone from system')
                # we need gmDateTime to be initialized
                if gmDateTime.current_local_iso_numeric_timezone_string is None:
                        gmDateTime.init()
                tz_candidates = [gmDateTime.current_local_timezone_name]
                try:
                        tz_candidates.append(os.environ['TZ'])
                except KeyError:
                        pass
                expanded_tzs = []
                for tz in tz_candidates:
                        expanded = self.__expand_timezone(conn, timezone = tz)
                        if expanded != tz:
                                expanded_tzs.append(expanded)
                tz_candidates.extend(expanded_tzs)
                _log.debug('candidates: %s', tz_candidates)
                # find best among candidates
                found = False
                for tz in tz_candidates:
                        if self.__validate_timezone(conn = conn, timezone = tz):
                                self.__client_timezone = tz
                                self.__SQL_set_client_timezone = 'SET timezone TO %(tz)s'
                                found = True
                                break
                if not found:
                        self.__client_timezone = gmDateTime.current_local_iso_numeric_timezone_string
                        self.__SQL_set_client_timezone = 'set time zone interval %(tz)s hour to minute'
                _log.info('client system timezone detected as equivalent to [%s]', self.__client_timezone)
                # FIXME: check whether server.timezone is the same
                # FIXME: value as what we eventually detect

        #--------------------------------------------------
        def __expand_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str):
                """Some timezone defs are abbreviations so try to expand
                them because "set time zone" doesn't take abbreviations"""

                cmd = _SQL_expand_tz_name
                args = {'tz': timezone}
                conn.commit()
                curs = conn.cursor()
                result = timezone
                try:
                        curs.execute(cmd, args)
                        rows = curs.fetchall()
                except Exception:
                        _log.exception('cannot expand timezone abbreviation [%s]', timezone)
                finally:
                        curs.close()
                        conn.rollback()
                if rows:
                        result = rows[0]['name']
                        _log.debug('[%s] maps to [%s]', timezone, result)
                return result

        #---------------------------------------------------
        def __validate_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str) -> bool:
                _log.debug('validating timezone [%s]', timezone)
                cmd = 'SET timezone TO %(tz)s'
                args = {'tz': timezone}
                curs = conn.cursor()
                try:
                        curs.execute(cmd, args)
                except psycopg2.DataError:
                        _log.warning('timezone [%s] is not settable', timezone)
                        return False

                except Exception:
                        _log.exception('failed to set timezone to [%s]', timezone)
                        return False

                finally:
                        conn.rollback()
                _log.info('time zone [%s] is settable', timezone)
                # can we actually use it, though ?
                SQL = "SELECT '1931-03-26 11:11:11+0'::timestamp with time zone"
                try:
                        curs.execute(SQL)
                        curs.fetchone()
                except Exception:
                        _log.exception('error using timezone [%s]', timezone)
                        return False

                finally:
                        curs.close()
                        conn.rollback()
                _log.info('timezone [%s] is usable', timezone)
                return True

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def _get_credentials(self) -> cPGCredentials:
                return self.__creds

        def _set_credentials(self, creds:cPGCredentials=None):
                if self.__creds is None:
                        self.__creds = creds
                        return

                _log.debug('invalidating pooled connections on credentials change')
                pool_key_start_from_curr_creds = self.__creds.formatted_credentials + '::thread='
                for pool_key in self.__ro_conn_pool:
                        if not pool_key.startswith(pool_key_start_from_curr_creds):
                                continue
                        conn = self.__ro_conn_pool[pool_key]
                        del self.__ro_conn_pool[pool_key]
                        if conn.closed:
                                del conn
                                continue
                        _log.debug('closing open database connection, pool key: %s', pool_key)
                        log_conn_state(conn)
                        conn.original_close()                                                                   # type: ignore [attr-defined]
                        del conn
                self.__creds = creds

        credentials = property(_get_credentials, _set_credentials)

        #--------------------------------------------------
        def _get_pool_key(self) -> str:
                return '%s::thread=%s' % (
                        self.__creds.formatted_credentials,
                        threading.current_thread().ident
                )

        pool_key = property(_get_pool_key)

        #--------------------------------------------------
        def __is_auth_fail_msg(self, msg:str) -> bool:
                if 'fe_sendauth' in msg:
                        return True

                if regex.search(r'user ".*" does not exist', msg) is not None:
                        return True

                if 'uthenti' in msg:
                        return True

                if ((
                                (regex.search(r'user ".*"', msg) is not None)
                                        or
                                (regex.search(r'(R|r)ol{1,2}e', msg) is not None)
                        )
                        and ('exist' in msg)
                        and (regex.search(r'n(o|ich)t', msg) is not None)
                ):
                        return True

                # to the best of our knowledge
                return False

The Singleton connection pool class.

Any normal connection from GNUmed to PostgreSQL should go through this pool. It needs credentials to be provided via .credentials = .

Ancestors

Instance variables

prop credentialscPGCredentials
Expand source code
def _get_credentials(self) -> cPGCredentials:
        return self.__creds
prop pool_key : str
Expand source code
def _get_pool_key(self) -> str:
        return '%s::thread=%s' % (
                self.__creds.formatted_credentials,
                threading.current_thread().ident
        )

Methods

def discard_pooled_connection_of_thread(self)
Expand source code
def discard_pooled_connection_of_thread(self):
        """Discard from pool the connection of the current thread."""
        try:
                conn = self.__ro_conn_pool[self.pool_key]
        except KeyError:
                _log.debug('no connection pooled for thread [%s]', self.pool_key)
                return

        del self.__ro_conn_pool[self.pool_key]
        if conn.closed:
                return

        conn.close = conn.original_close
        conn.close()

Discard from pool the connection of the current thread.

def get_connection(self,
readonly: bool = True,
verbose: bool = False,
pooled: bool = True,
connection_name: str = None,
autocommit: bool = False,
credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection
Expand source code
        def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Provide a database connection.

                Readonly connections can be pooled. If there is no
                suitable connection in the pool a new one will be
                created and stored. The pool is per-thread and
                per-credentials.

                Args:
                        readonly: make connection read only
                        verbose: make connection log more things
                        pooled: return a pooled connection, if possible
                        connection_name: a human readable name for the connection, avoid spaces
                        autocommit: whether to autocommit
                        credentials: use for getting a connection with other credentials different from what the pool was set to before

                Returns:
                        a working connection to a PostgreSQL database
                """
#               if _DISABLE_CONNECTION_POOL:
#                       pooled = False

                if credentials is not None:
                        pooled = False
                conn = None
                if readonly and pooled:
                        try:
                                conn = self.__ro_conn_pool[self.pool_key]
                        except KeyError:
                                _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key)
                        if conn is not None:
                                #if verbose:
                                #       _log.debug('using pooled conn [%s]', self.pool_key)
                                return conn

                if conn is None:
                        conn = self.get_raw_connection (
                                verbose = verbose,
                                readonly = readonly,
                                connection_name = connection_name,
                                autocommit = autocommit,
                                credentials = credentials
                        )
                if readonly and pooled:
                        # monkey patch close() for pooled RO connections
                        conn.original_close = conn.close                                                                # type: ignore [attr-defined]
                        conn.close = _raise_exception_on_pooled_ro_conn_close                   # type: ignore [assignment]
                # set connection properties
                # - client encoding
                encoding = 'UTF8'
                _log.debug('setting client (wire) encoding to %s', encoding)
                conn.set_client_encoding(encoding)
                # - transaction isolation level
                if not readonly:
                        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
                # - client time zone
                _log.debug('client timezone [%s]', self.__client_timezone)
                curs = conn.cursor()
                curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone})
                curs.close()
                conn.commit()
                if readonly and pooled:
                        _log.debug('putting RO conn with key [%s] into pool', self.pool_key)
                        self.__ro_conn_pool[self.pool_key] = conn
                if verbose:
                        log_conn_state(conn)
                return conn

Provide a database connection.

Readonly connections can be pooled. If there is no suitable connection in the pool a new one will be created and stored. The pool is per-thread and per-credentials.

Args

readonly
make connection read only
verbose
make connection log more things
pooled
return a pooled connection, if possible
connection_name
a human readable name for the connection, avoid spaces
autocommit
whether to autocommit
credentials
use for getting a connection with other credentials different from what the pool was set to before

Returns

a working connection to a PostgreSQL database

def get_dbowner_connection(self,
readonly: bool = True,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False,
dbo_password: str = None,
dbo_account: str = 'gm-dbo') ‑> psycopg2.extras.DictConnection
Expand source code
def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection:
        """Return a connection for the database owner.

        Will not touch the pool.
        """
        dbo_creds = cPGCredentials()
        dbo_creds.user = dbo_account
        dbo_creds.password = dbo_password
        dbo_creds.database = self.__creds.database
        dbo_creds.host = self.__creds.host
        dbo_creds.port = self.__creds.port
        return self.get_connection (
                pooled = False,
                readonly = readonly,
                verbose = verbose,
                connection_name = connection_name,
                autocommit = autocommit,
                credentials = dbo_creds
        )

Return a connection for the database owner.

Will not touch the pool.

def get_raw_connection(self,
verbose: bool = False,
readonly: bool = True,
connection_name: str = None,
autocommit: bool = False,
credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection
Expand source code
        def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Get a raw, unadorned connection.

                This will not set any parameters such as encoding,
                timezone, or datestyle, hence it can be used for
                "service" connections for verifying encodings etc
                """
#               # FIXME: support verbose
                if credentials is None:
                        creds2use = self.__creds
                else:
                        creds2use = credentials
                creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name)
                try:
                        # DictConnection now _is_ a real dictionary
                        conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs)
                except psycopg2.OperationalError as e:
                        _log.error('failed to establish connection [%s]', creds2use.formatted_credentials)
                        t, v, tb = sys.exc_info()
                        try:
                                msg = e.args[0]
                        except (AttributeError, IndexError, TypeError):
                                raise

                        if not self.__is_auth_fail_msg(msg):
                                raise

                        raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb)

                _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid())
                # safe-guard
                conn._original_rollback = conn.rollback
                conn.rollback = types.MethodType(_safe_transaction_rollback, conn)

                # - inspect server
                self.__log_on_first_contact(conn)
                # - verify PG understands client time zone
                self.__detect_client_timezone(conn)
                # - set access mode
                if readonly:
                        _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>')
                        autocommit = True
                else:
                        _log.debug('autocommit is desired to be: %s', autocommit)
                conn.commit()
                conn.autocommit = autocommit
                conn.readonly = readonly
                # - assume verbose=True to mean we want debugging in the database, too
                if verbose or _VERBOSE_PG_LOG:
                        _log.debug('enabling <plpgsql.extra_warnings/_errors>')
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_warnings TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_warnings>')
                        finally:
                                curs.close()
                                conn.commit()
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_errors TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_errors>')
                        finally:
                                curs.close()
                                conn.commit()
                        _log.debug('enabling auto_explain')
                        curs = conn.cursor()
                        try:
                                curs.execute("SELECT gm.load_auto_explain(3000)")
                        except Exception:
                                _log.exception('cannot enable auto_explain')
                        finally:
                                curs.close()
                                conn.commit()
                return conn

Get a raw, unadorned connection.

This will not set any parameters such as encoding, timezone, or datestyle, hence it can be used for "service" connections for verifying encodings etc

def get_ro_conn(self,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection
Expand source code
def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
        return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def get_rw_conn(self,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection
Expand source code
def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
        return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def shutdown(self)
Expand source code
def shutdown(self):
        """Close and discard all pooled connections."""
        for conn_key in self.__ro_conn_pool:
                conn = self.__ro_conn_pool[conn_key]
                if conn.closed:
                        continue
                _log.debug('closing open database connection, pool key: %s', conn_key)
                log_conn_state(conn)
                conn.close = conn.original_close
                conn.close()
        del self.__ro_conn_pool

Close and discard all pooled connections.