Module Gnumed.pycommon.gmConnectionPool
GNUmed connection pooler.
Currently, only readonly connections are pooled.
This pool is (supposedly) thread safe.
Functions
def exception_is_connection_loss(exc: Exception) ‑> bool
-
Expand source code
def exception_is_connection_loss(exc: Exception) -> bool: """Checks whether exception represents connection loss.""" if not isinstance(exc, psycopg2.Error): # not a PG/psycopg2 exception return False try: if isinstance(exc, psycopg2.errors.AdminShutdown): _log.debug('indicates connection loss due to admin shutdown') return True except AttributeError: # psycopg2 2.7/2.8 transition (no AdminShutdown exception) pass try: msg = '%s' % exc.args[0] except (AttributeError, IndexError, TypeError): _log.debug('cannot extract message from exception') return False _log.debug('interpreting: %s', msg) for snippet in _connection_loss_markers: if snippet in msg: _log.debug('indicates connection loss') return True is_conn_loss = ( # OperationalError ('erver' in msg) and ( ('terminat' in msg) or ('abnorm' in msg) or ('end' in msg) or ('no route' in msg) ) ) or ( # InterfaceError ('onnect' in msg) and ( ('close' in msg) or ('end' in msg) ) ) if is_conn_loss: _log.debug('indicates connection loss') return is_conn_loss
Checks whether exception represents connection loss.
def log_conn_state(conn: psycopg2.extras.DictConnection) ‑> None
-
Expand source code
def log_conn_state(conn:psycopg2.extras.DictConnection) -> None: """Log details about a DB-API connection.""" tx_status = conn.get_transaction_status() if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]: isolation_level = '%s (tx aborted or unknown, cannot retrieve)' % conn.isolation_level else: isolation_level = '%s (%s)' % (conn.isolation_level, _map_psyco_iso_level2str[conn.isolation_level]) conn_status = '%s (%s)' % (conn.status, _map_psyco_conn_status2str[conn.status]) if conn.closed != 0: conn_status = 'undefined (%s)' % conn_status backend_pid = '<conn closed, cannot retrieve>' else: backend_pid = str(conn.get_backend_pid()) try: conn_deferrable = str(conn.deferrable) except AttributeError: conn_deferrable = '<unavailable>' d = { 'identity': id(conn), 'backend PID': backend_pid, 'protocol version': conn.protocol_version, 'encoding': conn.encoding, 'closed': conn.closed, 'readonly': conn.readonly, 'autocommit': conn.autocommit, 'isolation level (psyco)': isolation_level, 'async': conn.async_, 'deferrable': conn_deferrable, 'transaction status': '%s (%s)' % (tx_status, _map_psyco_tx_status2str[tx_status]), 'connection status': conn_status, 'executing async op': conn.isexecuting(), 'type': type(conn) } _log.debug(conn) for key in d: _log.debug('%s: %s', key, d[key])
Log details about a DB-API connection.
def log_cursor_state(cursor) ‑> None
-
Expand source code
def log_cursor_state(cursor) -> None: """Log details about a DB-API cursor.""" if cursor is None: _log.debug('cursor: None') return conn = cursor.connection tx_status = conn.get_transaction_status() if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]: isolation_level = '<tx aborted or unknown, cannot retrieve>' else: isolation_level = conn.isolation_level try: conn_deferrable = conn.deferrable except AttributeError: conn_deferrable = '<unavailable>' if cursor.query is None: query = '<no query>' else: query = cursor.query.decode(errors = 'replace') if conn.closed != 0: backend_pid = '<conn closed, cannot retrieve>' else: backend_pid = conn.get_backend_pid() txt = """Cursor identity: %s; name: %s closed: %s; scrollable: %s; with hold: %s; arraysize: %s; itersize: %s; last rowcount: %s; rownumber: %s; lastrowid (OID): %s; last description: %s statusmessage: %s Connection identity: %s; backend pid: %s; protocol version: %s; closed: %s; autocommit: %s; isolation level: %s; encoding: %s; async: %s; deferrable: %s; readonly: %s; TX status: %s; CX status: %s; executing async op: %s; Query %s""" % ( # cursor level: id(cursor), cursor.name, cursor.closed, cursor.scrollable, cursor.withhold, cursor.arraysize, cursor.itersize, cursor.rowcount, cursor.rownumber, cursor.lastrowid, cursor.description, cursor.statusmessage, # connection level: id(conn), backend_pid, conn.protocol_version, conn.closed, conn.autocommit, isolation_level, conn.encoding, conn.async_, conn_deferrable, conn.readonly, _map_psyco_tx_status2str[tx_status], _map_psyco_conn_status2str[conn.status], conn.isexecuting(), # query level: query ) gmLog2.log_multiline(logging.DEBUG, message = 'Link state:', line_prefix = '', text = txt)
Log details about a DB-API cursor.
def log_pg_exception_details(exc: Exception) ‑> bool
-
Expand source code
def log_pg_exception_details(exc: Exception) -> bool: """Logs details from a database exception.""" if not isinstance(exc, psycopg2.Error): return False _log.error(type(exc)) try: for arg in exc.args: _log.debug('exc.arg: %s', arg) except AttributeError: _log.debug('exception has no <.args>') _log.debug('pgerror: [%s]', exc.pgerror) if exc.pgcode is None: _log.debug('pgcode : %s', exc.pgcode) else: _log.debug('pgcode : %s (%s)', exc.pgcode, PG_error_codes.lookup(exc.pgcode)) log_cursor_state(exc.cursor) try: diags = exc.diag except AttributeError: _log.debug('<.diag> not available') diags = None if diags is None: return True for attr in dir(diags): if attr.startswith('__'): continue val = getattr(diags, attr) if val is None: continue _log.debug('%s: %s', attr, val) return True
Logs details from a database exception.
def log_pg_settings(curs) ‑> bool
-
Expand source code
def log_pg_settings(curs) -> bool: """Log PostgreSQL server settings.""" # config settings try: curs.execute('SELECT * FROM pg_settings') except psycopg2.Error: _log.exception('cannot retrieve PG settings ("SELECT ... FROM pg_settings" failed)') return False settings = curs.fetchall() if settings: for setting in settings: if setting['unit'] is None: unit = '' else: unit = ' %s' % setting['unit'] if setting['sourcefile'] is None: sfile = '' else: sfile = '// %s @ %s' % (setting['sourcefile'], setting['sourceline']) pending_restart = u'' try: if setting['pending_restart']: pending_restart = u'// needs restart' except KeyError: pass # 'pending_restart' does not exist in PG 9.4 yet _log.debug('%s: %s%s (set from: [%s] // session RESET will set to: [%s]%s%s)', setting['name'], setting['setting'], unit, setting['source'], setting['reset_val'], pending_restart, sfile ) # extensions try: curs.execute('select pg_available_extensions()') except Exception: _log.exception('cannot log available PG extensions') return False extensions = curs.fetchall() if extensions: for ext in extensions: _log.debug('PG extension: %s', ext['pg_available_extensions']) else: _log.error('no PG extensions available') # log pg_config -- can only be read by superusers :-/ # database collation try: curs.execute('SELECT *, pg_database_collation_actual_version(oid), pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()') except psycopg2.Error: _log.exception('cannot log actual collation version (probably PG < 15)') curs.execute('SELECT * FROM pg_database WHERE datname = current_database()') config = curs.fetchall() gmLog2.log_multiline(10, message = 'PG database config', text = gmTools.format_dict_like(dict(config[0]), tabular = True)) return True
Log PostgreSQL server settings.
def log_role_permissions(curs, role: str = None)
-
Expand source code
def log_role_permissions(curs, role:str=None): """Log permissions for role.""" if role: SQL = SQL__get_permissions_for_role_name % {'role_name': role} msg = 'permissions for role [%s]:' % role else: SQL = SQL__get_permissions_for_current_role msg = 'permissions for role [current_user]:' try: curs.execute(SQL) except psycopg2.Error: _log.exception('cannot retrieve permissions') return perms = curs.fetchall() if not perms: _log.debug('no permissions') return gmLog2.log_multiline ( message = msg, line_prefix = ' ', text = [ '%(privilege_type)10s ON %(name_1)s.%(name_2)s.%(name_3)s (%(object_type)s)' % p for p in perms ] )
Log permissions for role.
Classes
class cAdapterPyDateTime (dt)
-
Expand source code
class cAdapterPyDateTime(object): def __init__(self, dt): if dt.tzinfo is None: raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % _timestamp_template % dt.isoformat()) self.__dt = dt def getquoted(self): return _timestamp_template % self.__dt.isoformat()
Methods
def getquoted(self)
-
Expand source code
def getquoted(self): return _timestamp_template % self.__dt.isoformat()
class cAuthenticationError (creds=None, prev_val=None)
-
Expand source code
class cAuthenticationError(psycopg2.OperationalError): def __init__(self, creds=None, prev_val=None): self.creds = creds self.prev_val = prev_val def __str__(self): return 'PostgreSQL: %sDSN: %s' % (self.prev_val, self.creds)
Error related to database operation (disconnect, memory allocation etc).
Ancestors
- psycopg2.OperationalError
- psycopg2.DatabaseError
- psycopg2.Error
- builtins.Exception
- builtins.BaseException
class cPGCredentials
-
Expand source code
class cPGCredentials: """Holds PostgreSQL credentials""" def __init__(self) -> None: self.__host:str = None # None: left out -> defaults to $PGHOST or implicit <localhost> self.__port:int = None # None: left out -> defaults to $PGPORT or libpq compiled-in default (typically 5432) self.__database:str = None # must be set before connecting self.__user:str = None # must be set before connecting self.__password:str = None # None: left out # -> try password-less connect (TRUST/IDENT/PEER) # -> try connect with password from <passfile> parameter or $PGPASSFILE or ~/.pgpass #-------------------------------------------------- # properties #-------------------------------------------------- def __format_credentials(self) -> str: """Database credentials formatted as string.""" cred_parts = [ 'dbname=%s' % self.__database, 'host=%s' % self.__host, 'port=%s' % self.__port, 'user=%s' % self.__user ] return ' '.join(cred_parts) formatted_credentials = property(__format_credentials) #-------------------------------------------------- def generate_credentials_kwargs(self, connection_name:str=None) -> dict: """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.""" assert (self.__database is not None), 'self.__database must be defined' assert (self.__user is not None), 'self.__user must be defined' kwargs = { 'dbname': self.__database, 'user': self.__user, 'application_name': gmTools.coalesce(connection_name, 'GNUmed'), 'fallback_application_name': 'GNUmed', 'sslmode': 'prefer', # try to enforce a useful encoding early on so that we # have a good chance of decoding authentication errors # containing foreign language characters 'client_encoding': 'UTF8' } if self.__host is not None: kwargs['host'] = self.__host if self.__port is not None: kwargs['port'] = self.__port if self.__password is not None: kwargs['password'] = self.__password return kwargs credentials_kwargs = property(generate_credentials_kwargs) #-------------------------------------------------- def _get_database(self) -> str: return self.__database def _set_database(self, database:str=None): assert database, '<database> must not be None' assert database.strip(), '<database> must not be empty' assert ('salaam.homeunix' not in database), 'The public database is not hosted by <salaam.homeunix.com> anymore.\n\nPlease point your configuration files to <publicdb.gnumed.de>.' self.__database = database.strip() _log.info('[%s]', self.__database) database = property(_get_database, _set_database) #-------------------------------------------------- def _get_host(self) -> str: return self.__host def _set_host(self, host:str=None): if host is not None: host = host.strip() if host == '': host = None self.__host = host _log.info('[%s]', self.__host) host = property(_get_host, _set_host) #-------------------------------------------------- def _get_port(self) -> int: return self.__port def _set_port(self, port=None): _log.info('[%s]', port) if port is None: self.__port = None return self.__port = int(port) port = property(_get_port, _set_port) #-------------------------------------------------- def _get_user(self) -> str: return self.__user def _set_user(self, user:str=None): assert (user is not None), '<user> must not be None' assert (user.strip() != ''), '<user> must not be empty' self.__user = user.strip() _log.info('[%s]', self.__user) user = property(_get_user, _set_user) #-------------------------------------------------- def _get_password(self) -> str: return self.__password def _set_password(self, password:str=None): if password is not None: gmLog2.add_word2hide(password) self.__password = password _log.info('password was set') password = property(_get_password, _set_password)
Holds PostgreSQL credentials
Instance variables
prop credentials_kwargs : dict
-
Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict: """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.""" assert (self.__database is not None), 'self.__database must be defined' assert (self.__user is not None), 'self.__user must be defined' kwargs = { 'dbname': self.__database, 'user': self.__user, 'application_name': gmTools.coalesce(connection_name, 'GNUmed'), 'fallback_application_name': 'GNUmed', 'sslmode': 'prefer', # try to enforce a useful encoding early on so that we # have a good chance of decoding authentication errors # containing foreign language characters 'client_encoding': 'UTF8' } if self.__host is not None: kwargs['host'] = self.__host if self.__port is not None: kwargs['port'] = self.__port if self.__password is not None: kwargs['password'] = self.__password return kwargs
Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.
prop database : str
-
Expand source code
def _get_database(self) -> str: return self.__database
prop formatted_credentials : str
-
Expand source code
def __format_credentials(self) -> str: """Database credentials formatted as string.""" cred_parts = [ 'dbname=%s' % self.__database, 'host=%s' % self.__host, 'port=%s' % self.__port, 'user=%s' % self.__user ] return ' '.join(cred_parts)
Database credentials formatted as string.
prop host : str
-
Expand source code
def _get_host(self) -> str: return self.__host
prop password : str
-
Expand source code
def _get_password(self) -> str: return self.__password
prop port : int
-
Expand source code
def _get_port(self) -> int: return self.__port
prop user : str
-
Expand source code
def _get_user(self) -> str: return self.__user
Methods
def generate_credentials_kwargs(self, connection_name: str = None) ‑> dict
-
Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict: """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.""" assert (self.__database is not None), 'self.__database must be defined' assert (self.__user is not None), 'self.__user must be defined' kwargs = { 'dbname': self.__database, 'user': self.__user, 'application_name': gmTools.coalesce(connection_name, 'GNUmed'), 'fallback_application_name': 'GNUmed', 'sslmode': 'prefer', # try to enforce a useful encoding early on so that we # have a good chance of decoding authentication errors # containing foreign language characters 'client_encoding': 'UTF8' } if self.__host is not None: kwargs['host'] = self.__host if self.__port is not None: kwargs['port'] = self.__port if self.__password is not None: kwargs['password'] = self.__password return kwargs
Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.
class gmConnectionPool
-
Expand source code
class gmConnectionPool(gmBorg.cBorg): """The Singleton connection pool class. Any normal connection from GNUmed to PostgreSQL should go through this pool. It needs credentials to be provided via .credentials = <cPGCredentials>. """ def __init__(self) -> None: try: self.__initialized return except AttributeError: self.__initialized:bool = True _log.info('[%s]: first instantiation', self.__class__.__name__) self.__ro_conn_pool:dict[str, psycopg2.extras.DictConnection] = {} # keyed by "credentials::thread ID" self.__SQL_set_client_timezone:str = None self.__client_timezone = None self.__creds:cPGCredentials = None self.__log_auth_environment() #-------------------------------------------------- # connection API #-------------------------------------------------- def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection: """Provide a database connection. Readonly connections can be pooled. If there is no suitable connection in the pool a new one will be created and stored. The pool is per-thread and per-credentials. Args: readonly: make connection read only verbose: make connection log more things pooled: return a pooled connection, if possible connection_name: a human readable name for the connection, avoid spaces autocommit: whether to autocommit credentials: use for getting a connection with other credentials different from what the pool was set to before Returns: a working connection to a PostgreSQL database """ # if _DISABLE_CONNECTION_POOL: # pooled = False if credentials is not None: pooled = False conn = None if readonly and pooled: try: conn = self.__ro_conn_pool[self.pool_key] except KeyError: _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key) if conn is not None: #if verbose: # _log.debug('using pooled conn [%s]', self.pool_key) return conn if conn is None: conn = self.get_raw_connection ( verbose = verbose, readonly = readonly, connection_name = connection_name, autocommit = autocommit, credentials = credentials ) if readonly and pooled: # monkey patch close() for pooled RO connections conn.original_close = conn.close # type: ignore [attr-defined] conn.close = _raise_exception_on_pooled_ro_conn_close # type: ignore [assignment] # set connection properties # - client encoding encoding = 'UTF8' _log.debug('setting client (wire) encoding to %s', encoding) conn.set_client_encoding(encoding) # - transaction isolation level if not readonly: conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) # - client time zone _log.debug('client timezone [%s]', self.__client_timezone) curs = conn.cursor() curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone}) curs.close() conn.commit() if readonly and pooled: _log.debug('putting RO conn with key [%s] into pool', self.pool_key) self.__ro_conn_pool[self.pool_key] = conn if verbose: log_conn_state(conn) return conn #-------------------------------------------------- def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection: return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit) #-------------------------------------------------- def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection: return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit) #-------------------------------------------------- def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection: """Get a raw, unadorned connection. This will not set any parameters such as encoding, timezone, or datestyle, hence it can be used for "service" connections for verifying encodings etc """ # # FIXME: support verbose if credentials is None: creds2use = self.__creds else: creds2use = credentials creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name) try: # DictConnection now _is_ a real dictionary conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs) except psycopg2.OperationalError as e: _log.error('failed to establish connection [%s]', creds2use.formatted_credentials) t, v, tb = sys.exc_info() try: msg = e.args[0] except (AttributeError, IndexError, TypeError): raise if not self.__is_auth_fail_msg(msg): raise raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb) _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid()) # safe-guard conn._original_rollback = conn.rollback conn.rollback = types.MethodType(_safe_transaction_rollback, conn) # - inspect server self.__log_on_first_contact(conn) # - verify PG understands client time zone self.__detect_client_timezone(conn) # - set access mode if readonly: _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>') autocommit = True else: _log.debug('autocommit is desired to be: %s', autocommit) conn.commit() conn.autocommit = autocommit conn.readonly = readonly # - assume verbose=True to mean we want debugging in the database, too if verbose or _VERBOSE_PG_LOG: _log.debug('enabling <plpgsql.extra_warnings/_errors>') curs = conn.cursor() try: curs.execute("SET plpgsql.extra_warnings TO 'all'") except Exception: _log.exception('cannot enable <plpgsql.extra_warnings>') finally: curs.close() conn.commit() curs = conn.cursor() try: curs.execute("SET plpgsql.extra_errors TO 'all'") except Exception: _log.exception('cannot enable <plpgsql.extra_errors>') finally: curs.close() conn.commit() _log.debug('enabling auto_explain') curs = conn.cursor() try: curs.execute("SELECT gm.load_auto_explain(3000)") except Exception: _log.exception('cannot enable auto_explain') finally: curs.close() conn.commit() return conn #-------------------------------------------------- def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection: """Return a connection for the database owner. Will not touch the pool. """ dbo_creds = cPGCredentials() dbo_creds.user = dbo_account dbo_creds.password = dbo_password dbo_creds.database = self.__creds.database dbo_creds.host = self.__creds.host dbo_creds.port = self.__creds.port return self.get_connection ( pooled = False, readonly = readonly, verbose = verbose, connection_name = connection_name, autocommit = autocommit, credentials = dbo_creds ) #-------------------------------------------------- def discard_pooled_connection_of_thread(self): """Discard from pool the connection of the current thread.""" try: conn = self.__ro_conn_pool[self.pool_key] except KeyError: _log.debug('no connection pooled for thread [%s]', self.pool_key) return del self.__ro_conn_pool[self.pool_key] if conn.closed: return conn.close = conn.original_close conn.close() #-------------------------------------------------- def shutdown(self): """Close and discard all pooled connections.""" for conn_key in self.__ro_conn_pool: conn = self.__ro_conn_pool[conn_key] if conn.closed: continue _log.debug('closing open database connection, pool key: %s', conn_key) log_conn_state(conn) conn.close = conn.original_close conn.close() del self.__ro_conn_pool #-------------------------------------------------- # utility functions #-------------------------------------------------- def __log_on_first_contact(self, conn:psycopg2.extras.DictConnection): global postgresql_version if postgresql_version is not None: return _log.debug('_\\\\// heed Prime Directive _\\\\//') # FIXME: verify PG version curs = conn.cursor() curs.execute (""" SELECT substring(setting, E'^\\\\d{1,2}\\\\.\\\\d{1,2}')::numeric AS version FROM pg_settings WHERE name = 'server_version'""" ) postgresql_version = curs.fetchone()['version'] _log.info('PostgreSQL version (numeric): %s' % postgresql_version) try: curs.execute("SELECT pg_size_pretty(pg_database_size(current_database()))") _log.info('database size: %s', curs.fetchone()[0]) except Exception: _log.exception('cannot get database size') finally: curs.close() conn.commit() curs = conn.cursor() log_pg_settings(curs = curs) log_role_permissions(curs) curs.close() conn.commit() _log.debug('done') #-------------------------------------------------- def __log_auth_environment(self): pgpass_file = os.path.expanduser(os.path.join('~', '.pgpass')) if os.path.exists(pgpass_file): _log.debug('standard .pgpass (%s) exists', pgpass_file) else: _log.debug('standard .pgpass (%s) not found', pgpass_file) pgpass_var = os.getenv('PGPASSFILE') if pgpass_var is None: _log.debug('$PGPASSFILE not set') else: if os.path.exists(pgpass_var): _log.debug('$PGPASSFILE=%s -> file exists', pgpass_var) else: _log.debug('$PGPASSFILE=%s -> file not found') #-------------------------------------------------- def __detect_client_timezone(self, conn:psycopg2.extras.DictConnection): """This is run on the very first connection.""" if self.__client_timezone is not None: return _log.debug('trying to detect timezone from system') # we need gmDateTime to be initialized if gmDateTime.current_local_iso_numeric_timezone_string is None: gmDateTime.init() tz_candidates = [gmDateTime.current_local_timezone_name] try: tz_candidates.append(os.environ['TZ']) except KeyError: pass expanded_tzs = [] for tz in tz_candidates: expanded = self.__expand_timezone(conn, timezone = tz) if expanded != tz: expanded_tzs.append(expanded) tz_candidates.extend(expanded_tzs) _log.debug('candidates: %s', tz_candidates) # find best among candidates found = False for tz in tz_candidates: if self.__validate_timezone(conn = conn, timezone = tz): self.__client_timezone = tz self.__SQL_set_client_timezone = 'SET timezone TO %(tz)s' found = True break if not found: self.__client_timezone = gmDateTime.current_local_iso_numeric_timezone_string self.__SQL_set_client_timezone = 'set time zone interval %(tz)s hour to minute' _log.info('client system timezone detected as equivalent to [%s]', self.__client_timezone) # FIXME: check whether server.timezone is the same # FIXME: value as what we eventually detect #-------------------------------------------------- def __expand_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str): """Some timezone defs are abbreviations so try to expand them because "set time zone" doesn't take abbreviations""" cmd = _SQL_expand_tz_name args = {'tz': timezone} conn.commit() curs = conn.cursor() result = timezone try: curs.execute(cmd, args) rows = curs.fetchall() except Exception: _log.exception('cannot expand timezone abbreviation [%s]', timezone) finally: curs.close() conn.rollback() if rows: result = rows[0]['name'] _log.debug('[%s] maps to [%s]', timezone, result) return result #--------------------------------------------------- def __validate_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str) -> bool: _log.debug('validating timezone [%s]', timezone) cmd = 'SET timezone TO %(tz)s' args = {'tz': timezone} curs = conn.cursor() try: curs.execute(cmd, args) except psycopg2.DataError: _log.warning('timezone [%s] is not settable', timezone) return False except Exception: _log.exception('failed to set timezone to [%s]', timezone) return False finally: conn.rollback() _log.info('time zone [%s] is settable', timezone) # can we actually use it, though ? SQL = "SELECT '1931-03-26 11:11:11+0'::timestamp with time zone" try: curs.execute(SQL) curs.fetchone() except Exception: _log.exception('error using timezone [%s]', timezone) return False finally: curs.close() conn.rollback() _log.info('timezone [%s] is usable', timezone) return True #-------------------------------------------------- # properties #-------------------------------------------------- def _get_credentials(self) -> cPGCredentials: return self.__creds def _set_credentials(self, creds:cPGCredentials=None): if self.__creds is None: self.__creds = creds return _log.debug('invalidating pooled connections on credentials change') pool_key_start_from_curr_creds = self.__creds.formatted_credentials + '::thread=' for pool_key in self.__ro_conn_pool: if not pool_key.startswith(pool_key_start_from_curr_creds): continue conn = self.__ro_conn_pool[pool_key] del self.__ro_conn_pool[pool_key] if conn.closed: del conn continue _log.debug('closing open database connection, pool key: %s', pool_key) log_conn_state(conn) conn.original_close() # type: ignore [attr-defined] del conn self.__creds = creds credentials = property(_get_credentials, _set_credentials) #-------------------------------------------------- def _get_pool_key(self) -> str: return '%s::thread=%s' % ( self.__creds.formatted_credentials, threading.current_thread().ident ) pool_key = property(_get_pool_key) #-------------------------------------------------- def __is_auth_fail_msg(self, msg:str) -> bool: if 'fe_sendauth' in msg: return True if regex.search(r'user ".*" does not exist', msg) is not None: return True if 'uthenti' in msg: return True if (( (regex.search(r'user ".*"', msg) is not None) or (regex.search(r'(R|r)ol{1,2}e', msg) is not None) ) and ('exist' in msg) and (regex.search(r'n(o|ich)t', msg) is not None) ): return True # to the best of our knowledge return False
The Singleton connection pool class.
Any normal connection from GNUmed to PostgreSQL should go through this pool. It needs credentials to be provided via .credentials =
. Ancestors
Instance variables
prop credentials : cPGCredentials
-
Expand source code
def _get_credentials(self) -> cPGCredentials: return self.__creds
prop pool_key : str
-
Expand source code
def _get_pool_key(self) -> str: return '%s::thread=%s' % ( self.__creds.formatted_credentials, threading.current_thread().ident )
Methods
def discard_pooled_connection_of_thread(self)
-
Expand source code
def discard_pooled_connection_of_thread(self): """Discard from pool the connection of the current thread.""" try: conn = self.__ro_conn_pool[self.pool_key] except KeyError: _log.debug('no connection pooled for thread [%s]', self.pool_key) return del self.__ro_conn_pool[self.pool_key] if conn.closed: return conn.close = conn.original_close conn.close()
Discard from pool the connection of the current thread.
def get_connection(self,
readonly: bool = True,
verbose: bool = False,
pooled: bool = True,
connection_name: str = None,
autocommit: bool = False,
credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection-
Expand source code
def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection: """Provide a database connection. Readonly connections can be pooled. If there is no suitable connection in the pool a new one will be created and stored. The pool is per-thread and per-credentials. Args: readonly: make connection read only verbose: make connection log more things pooled: return a pooled connection, if possible connection_name: a human readable name for the connection, avoid spaces autocommit: whether to autocommit credentials: use for getting a connection with other credentials different from what the pool was set to before Returns: a working connection to a PostgreSQL database """ # if _DISABLE_CONNECTION_POOL: # pooled = False if credentials is not None: pooled = False conn = None if readonly and pooled: try: conn = self.__ro_conn_pool[self.pool_key] except KeyError: _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key) if conn is not None: #if verbose: # _log.debug('using pooled conn [%s]', self.pool_key) return conn if conn is None: conn = self.get_raw_connection ( verbose = verbose, readonly = readonly, connection_name = connection_name, autocommit = autocommit, credentials = credentials ) if readonly and pooled: # monkey patch close() for pooled RO connections conn.original_close = conn.close # type: ignore [attr-defined] conn.close = _raise_exception_on_pooled_ro_conn_close # type: ignore [assignment] # set connection properties # - client encoding encoding = 'UTF8' _log.debug('setting client (wire) encoding to %s', encoding) conn.set_client_encoding(encoding) # - transaction isolation level if not readonly: conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) # - client time zone _log.debug('client timezone [%s]', self.__client_timezone) curs = conn.cursor() curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone}) curs.close() conn.commit() if readonly and pooled: _log.debug('putting RO conn with key [%s] into pool', self.pool_key) self.__ro_conn_pool[self.pool_key] = conn if verbose: log_conn_state(conn) return conn
Provide a database connection.
Readonly connections can be pooled. If there is no suitable connection in the pool a new one will be created and stored. The pool is per-thread and per-credentials.
Args
readonly
- make connection read only
verbose
- make connection log more things
pooled
- return a pooled connection, if possible
connection_name
- a human readable name for the connection, avoid spaces
autocommit
- whether to autocommit
credentials
- use for getting a connection with other credentials different from what the pool was set to before
Returns
a working connection to a PostgreSQL database
def get_dbowner_connection(self,
readonly: bool = True,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False,
dbo_password: str = None,
dbo_account: str = 'gm-dbo') ‑> psycopg2.extras.DictConnection-
Expand source code
def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection: """Return a connection for the database owner. Will not touch the pool. """ dbo_creds = cPGCredentials() dbo_creds.user = dbo_account dbo_creds.password = dbo_password dbo_creds.database = self.__creds.database dbo_creds.host = self.__creds.host dbo_creds.port = self.__creds.port return self.get_connection ( pooled = False, readonly = readonly, verbose = verbose, connection_name = connection_name, autocommit = autocommit, credentials = dbo_creds )
Return a connection for the database owner.
Will not touch the pool.
def get_raw_connection(self,
verbose: bool = False,
readonly: bool = True,
connection_name: str = None,
autocommit: bool = False,
credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection-
Expand source code
def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection: """Get a raw, unadorned connection. This will not set any parameters such as encoding, timezone, or datestyle, hence it can be used for "service" connections for verifying encodings etc """ # # FIXME: support verbose if credentials is None: creds2use = self.__creds else: creds2use = credentials creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name) try: # DictConnection now _is_ a real dictionary conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs) except psycopg2.OperationalError as e: _log.error('failed to establish connection [%s]', creds2use.formatted_credentials) t, v, tb = sys.exc_info() try: msg = e.args[0] except (AttributeError, IndexError, TypeError): raise if not self.__is_auth_fail_msg(msg): raise raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb) _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid()) # safe-guard conn._original_rollback = conn.rollback conn.rollback = types.MethodType(_safe_transaction_rollback, conn) # - inspect server self.__log_on_first_contact(conn) # - verify PG understands client time zone self.__detect_client_timezone(conn) # - set access mode if readonly: _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>') autocommit = True else: _log.debug('autocommit is desired to be: %s', autocommit) conn.commit() conn.autocommit = autocommit conn.readonly = readonly # - assume verbose=True to mean we want debugging in the database, too if verbose or _VERBOSE_PG_LOG: _log.debug('enabling <plpgsql.extra_warnings/_errors>') curs = conn.cursor() try: curs.execute("SET plpgsql.extra_warnings TO 'all'") except Exception: _log.exception('cannot enable <plpgsql.extra_warnings>') finally: curs.close() conn.commit() curs = conn.cursor() try: curs.execute("SET plpgsql.extra_errors TO 'all'") except Exception: _log.exception('cannot enable <plpgsql.extra_errors>') finally: curs.close() conn.commit() _log.debug('enabling auto_explain') curs = conn.cursor() try: curs.execute("SELECT gm.load_auto_explain(3000)") except Exception: _log.exception('cannot enable auto_explain') finally: curs.close() conn.commit() return conn
Get a raw, unadorned connection.
This will not set any parameters such as encoding, timezone, or datestyle, hence it can be used for "service" connections for verifying encodings etc
def get_ro_conn(self,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection-
Expand source code
def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection: return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def get_rw_conn(self,
verbose: bool = False,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection-
Expand source code
def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection: return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def shutdown(self)
-
Expand source code
def shutdown(self): """Close and discard all pooled connections.""" for conn_key in self.__ro_conn_pool: conn = self.__ro_conn_pool[conn_key] if conn.closed: continue _log.debug('closing open database connection, pool key: %s', conn_key) log_conn_state(conn) conn.close = conn.original_close conn.close() del self.__ro_conn_pool
Close and discard all pooled connections.