Module Gnumed.pycommon.gmPG2
GNUmed PostgreSQL connection handling.
TODO: iterator/generator batch fetching:
- http://groups-beta.google.com/group/comp.lang.python/msg/7ff516d7d9387dad
- search Google for "Geneator/Iterator Nesting Problem - Any Ideas? 2.4"
winner:
def resultset_functional_batchgenerator(cursor, size=100):
    for results in iter(lambda: cursor.fetchmany(size), []):
        for rec in results:
            yield rec
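A minimal usage sketch of that pattern; <cur> and <handle_row> are hypothetical stand-ins for an already opened psycopg2 cursor and the real per-row processing:
# usage sketch for the batch generator above (names are placeholders)
cur.execute('SELECT generate_series(1, 10000)')
for row in resultset_functional_batchgenerator(cur, size = 500):
    handle_row(row)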
Functions
def bytea2file(data_query: dict = None,
filename: str = None,
chunk_size: int = 0,
data_size: int = None,
data_size_query: dict = None,
conn=None,
link2cached: bool = True) ‑> bool
def bytea2file ( data_query:dict=None, filename:str=None, chunk_size:int=0, data_size:int=None, data_size_query:dict=None, conn=None, link2cached:bool=True ) -> bool: """Store data from a bytea field into a file. Args: data_query: * data_query['sql']:str, SQL to retrieve the BYTEA column (say, <data>), must contain '... SUBSTRING(data FROM %(start)s FOR %(size)s) ...', must return one row with one field of type bytea * data_query['args']:dict, must contain selectors for the BYTEA row filename: the file to store into data_size: total size of the expected data, or None data_size_query: * only used when data_size is None * dict {'sql': ..., 'args': ...} * must return one row with one field with the octet_length() of the data field link2cached: if the bytea data is found in the cache, whether to return a link to the cache file or to create a copy thereof Returns: True/False based on success. Exception on errors. """ if data_size == 0: open(filename, 'wb').close() return True if data_size is None: rows = run_ro_queries(link_obj = conn, queries = [data_size_query]) if not rows: _log.error('cannot determine size: %s', data_size_query) return False data_size = rows[0][0] if data_size is None: _log.error('cannot determine size: %s', data_size_query) return False if data_size == 0: open(filename, 'wb').close() return True if conn is None: conn = gmConnectionPool.gmConnectionPool().get_connection() cache_key_data = '%s::%s' % (conn.dsn, data_query) found_in_cache = __get_file_from_cache(filename, cache_key_data = cache_key_data, data_size = data_size, link2cached = link2cached) if found_in_cache: # FIXME: start thread checking cache staleness on file return True with open(filename, 'wb') as outfile: result = bytea2file_object ( data_query = data_query, file_obj = outfile, chunk_size = chunk_size, data_size = data_size, data_size_query = data_size_query, conn = conn ) __store_file_in_cache(filename, cache_key_data) return result
Store data from a bytea field into a file.
Args
data_query:
- data_query['sql']:str, SQL to retrieve the BYTEA column (say, <data>), must contain '… SUBSTRING(data FROM %(start)s FOR %(size)s) …', must return one row with one field of type bytea
- data_query['args']:dict, must contain selectors for the BYTEA row
filename
- the file to store into
data_size
- total size of the expected data, or None
data_size_query:
- only used when data_size is None
- dict {'sql': …, 'args': …}
- must return one row with one field with the octet_length() of the data field
link2cached
- if the bytea data is found in the cache, whether to return a link to the cache file or to create a copy thereof
Returns
True/False based on success. Exception on errors.
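For illustration, a hedged sketch of how the query dicts might be shaped when exporting a stored document object; the table (blobs.doc_obj), column (data) and pk value are examples only:
from Gnumed.pycommon import gmPG2

# illustrative query dicts; table/column/pk are placeholders
data_query = {
    'sql': 'SELECT SUBSTRING(data FROM %(start)s FOR %(size)s) FROM blobs.doc_obj WHERE pk = %(pk)s',
    'args': {'pk': 12}
}
size_query = {
    'sql': 'SELECT octet_length(data) FROM blobs.doc_obj WHERE pk = %(pk)s',
    'args': {'pk': 12}
}
success = gmPG2.bytea2file (
    data_query = data_query,
    filename = '/tmp/page.dat',
    data_size_query = size_query
)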
def bytea2file_object(data_query: dict = None,
file_obj=None,
chunk_size: int = 0,
data_size: int = None,
data_size_query: dict = None,
conn: psycopg2.extras.DictConnection | None = None) ‑> bool
def bytea2file_object ( data_query:dict=None, file_obj=None, chunk_size:int=0, data_size:int=None, data_size_query:dict=None, conn:dbapi.extras.DictConnection|None=None ) -> bool: """Stream data from a bytea field into a file-like object. Args: data_query: * data_query['sql']:str, SQL to retrieve the BYTEA column (say, <data>), must contain '... SUBSTRING(data FROM %(start)s FOR %(size)s) ...', must return one row with one field of type bytea * data_query['args']:dict, must contain selectors for the BYTEA row file_obj: a file-like Python object data_size: total size of the expected data, or None data_size_query: * only used when data_size is None * dict {'sql': ..., 'args': ...} * must return one row with one field with the octet_length() of the data field Returns: True on success. Exception on errors. """ if data_size == 0: return True # If the client sets an encoding other than the default we # will receive encoding-parsed data which isn't the binary # content we want. Hence we need to get our own connection. # It must be a read-write one so that we don't affect the # encoding for other users of the shared read-only # connections. # Actually, encodings shouldn't be applied to binary data # (eg. bytea types) in the first place but that is only # reported to be fixed > v7.4. # further tests reveal that at least on PG 8.0 this bug still # manifests itself if conn is None: conn = get_raw_connection(readonly = True) if data_size is None: rows = run_ro_queries(link_obj = conn, queries = [data_size_query]) data_size = rows[0][0] if data_size in [None, 0]: conn.rollback() return True max_chunk_size = 1024 * 1024 * 20 # 20 MB, works for typical CR DICOMs if chunk_size == 0: chunk_size = min(data_size, max_chunk_size) _log.debug('expecting %s bytes of BYTEA data in chunks of %s bytes', data_size, chunk_size) # Windoze sucks: it can't transfer objects of arbitrary size, # anyways, we need to split the transfer, # however, only possible if postgres >= 7.2 needed_chunks, remainder = divmod(data_size, chunk_size) _log.debug('%s chunk(s), %s byte(s) remainder', needed_chunks, remainder) # retrieve chunks, skipped if data size < chunk size, # does this not carry the danger of cutting up multi-byte escape sequences ? # no, since bytea is binary, # yes, since in bytea there are *some* escaped values, still # no, since those are only escaped during *transfer*, not on-disk, hence # only complete escape sequences are put on the wire for chunk_id in range(needed_chunks): chunk_start = (chunk_id * chunk_size) + 1 data_query['args']['start'] = chunk_start data_query['args']['size'] = chunk_size try: rows = run_ro_queries(link_obj=conn, queries=[data_query]) except Exception: _log.error('cannot retrieve chunk [%s/%s], size [%s], try decreasing chunk size' % (chunk_id+1, needed_chunks, chunk_size)) conn.rollback() raise # it would be a fatal error to see more than one result as ids are supposed to be unique file_obj.write(rows[0][0]) # retrieve remainder if remainder > 0: chunk_start = (needed_chunks * chunk_size) + 1 data_query['args']['start'] = chunk_start data_query['args']['size'] = remainder try: rows = run_ro_queries(link_obj=conn, queries=[data_query]) except Exception: _log.error('cannot retrieve remaining [%s] bytes' % remainder) conn.rollback() raise # it would be a fatal error to see more than one result as ids are supposed to be unique file_obj.write(rows[0][0]) conn.rollback() return True
Stream data from a bytea field into a file-like object.
Args
data_query:
- data_query['sql']:str, SQL to retrieve the BYTEA column (say, <data>), must contain '… SUBSTRING(data FROM %(start)s FOR %(size)s) …', must return one row with one field of type bytea
- data_query['args']:dict, must contain selectors for the BYTEA row
file_obj
- a file-like Python object
data_size
- total size of the expected data, or None
data_size_query:
- only used when data_size is None
- dict {'sql': …, 'args': …}
- must return one row with one field with the octet_length() of the data field
Returns
True on success. Exception on errors.
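A sketch of streaming the same kind of data into an in-memory buffer rather than a file on disk; the query dicts are the illustrative ones assumed in the bytea2file() sketch above:
import io
from Gnumed.pycommon import gmPG2

# stream BYTEA content into a BytesIO (query dicts as assumed above)
buf = io.BytesIO()
gmPG2.bytea2file_object (
    data_query = data_query,
    file_obj = buf,
    data_size_query = size_query
)
raw_bytes = buf.getvalue()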
def check_fk_encounter_fk_episode_x_ref()
def check_fk_encounter_fk_episode_x_ref(): aggregate_result = 0 fks_linking2enc = get_foreign_keys2column(schema = 'clin', table = 'encounter', column = 'pk') tables_linking2enc = set([ r['referencing_table'] for r in fks_linking2enc ]) fks_linking2epi = get_foreign_keys2column(schema = 'clin', table = 'episode', column = 'pk') tables_linking2epi = [ r['referencing_table'] for r in fks_linking2epi ] tables_linking2both = tables_linking2enc.intersection(tables_linking2epi) tables_linking2enc = {} for fk in fks_linking2enc: table = fk['referencing_table'] tables_linking2enc[table] = fk tables_linking2epi = {} for fk in fks_linking2epi: table = fk['referencing_table'] tables_linking2epi[table] = fk for t in tables_linking2both: table_file_name = 'x-check_enc_epi_xref-%s.log' % t table_file = open(table_file_name, 'w+', encoding = 'utf8') # get PK column args = {'table': t} rows = run_ro_queries(queries = [{'sql': SQL_get_pk_col_def, 'args': args}]) pk_col = rows[0][0] print("checking table:", t, '- pk col:', pk_col) print(' =>', table_file_name) table_file.write('table: %s\n' % t) table_file.write('PK col: %s\n' % pk_col) # get PKs cmd = 'select %s from %s' % (pk_col, t) rows = run_ro_queries(queries = [{'sql': cmd}]) pks = [ r[0] for r in rows ] for pk in pks: args = {'pk': pk, 'tbl': t} enc_cmd = "select fk_patient from clin.encounter where pk = (select fk_encounter from %s where %s = %%(pk)s)" % (t, pk_col) epi_cmd = "select fk_patient from clin.encounter where pk = (select fk_encounter from clin.episode where pk = (select fk_episode from %s where %s = %%(pk)s))" % (t, pk_col) enc_rows = run_ro_queries(queries = [{'sql': enc_cmd, 'args': args}]) epi_rows = run_ro_queries(queries = [{'sql': epi_cmd, 'args': args}]) enc_pat = enc_rows[0][0] epi_pat = epi_rows[0][0] args['pat_enc'] = enc_pat args['pat_epi'] = epi_pat if epi_pat != enc_pat: print(' mismatch: row pk=%s, enc pat=%s, epi pat=%s' % (pk, enc_pat, epi_pat)) aggregate_result = -2 table_file.write('--------------------------------------------------------------------------------\n') table_file.write('mismatch on row with pk: %s\n' % pk) table_file.write('\n') table_file.write('journal entry:\n') cmd = 'SELECT * from clin.v_emr_journal where src_table = %(tbl)s AND src_pk = %(pk)s' rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n\n') table_file.write('row data:\n') cmd = 'SELECT * from %s where %s = %%(pk)s' % (t, pk_col) rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n\n') table_file.write('episode:\n') cmd = 'SELECT * from clin.v_pat_episodes WHERE pk_episode = (select fk_episode from %s where %s = %%(pk)s)' % (t, pk_col) rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n\n') table_file.write('patient of episode:\n') cmd = 'SELECT * FROM dem.v_persons WHERE pk_identity = %(pat_epi)s' rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n\n') table_file.write('encounter:\n') cmd = 'SELECT * from clin.v_pat_encounters WHERE pk_encounter = (select 
fk_encounter from %s where %s = %%(pk)s)' % (t, pk_col) rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n\n') table_file.write('patient of encounter:\n') cmd = 'SELECT * FROM dem.v_persons WHERE pk_identity = %(pat_enc)s' rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) table_file.write(gmTools.format_dict_like(rows[0], left_margin = 1, tabular = False, value_delimiters = None)) table_file.write('\n') table_file.write('done\n') table_file.close() return aggregate_result
def create_group_role(group_role: str = None, admin_role: str = None, link_obj=None) ‑> bool
def create_group_role(group_role:str=None, admin_role:str=None, link_obj=None) -> bool:
    if not create_role(role = group_role, link_obj = link_obj):
        return False
    if not admin_role:
        return True
    _log.debug('adding admin "%s" to group role "%s"', admin_role, group_role)
    SQL = 'GRANT "%s" to "%s" WITH ADMIN OPTION;' % (group_role, admin_role)
    run_rw_query(link_obj = link_obj, sql = SQL, return_data = False)
    return True
def create_role(role: str = None, password: str = None, link_obj=None) ‑> bool
def create_role(role:str=None, password:str=None, link_obj=None) -> bool:
    if role_exists(role = role, link_obj = link_obj):
        return True
    if password:
        _log.debug('creating role "%s" with password', role)
        SQL = 'CREATE ROLE "%s" WITH ENCRYPTED PASSWORD \'%s\'' % (role, password)
    else:
        _log.debug('creating role "%s" withOUT password', role)
        SQL = 'CREATE ROLE "%s"' % role
    run_rw_query(link_obj = link_obj, sql = SQL, return_data = False)
    return role_exists(role = role, link_obj = link_obj)
def create_user_role(user_role: str = None, password: str = None, link_obj=None) ‑> bool
def create_user_role(user_role:str=None, password:str=None, link_obj=None) -> bool:
    if role_exists(role = user_role, link_obj = link_obj):
        # make sure it is a user role (has LOGIN)
        SQL = 'ALTER ROLE "%s" WITH LOGIN' % user_role
        run_rw_query(link_obj = link_obj, sql = SQL, return_data = False)
        return True
    _log.debug('creating user (can_login) role "%s"', user_role)
    # implies LOGIN
    SQL = 'CREATE USER "%s" WITH ENCRYPTED PASSWORD \'%s\'' % (user_role, password)
    run_rw_query(link_obj = link_obj, sql = SQL, return_data = False)
    return role_exists(role = user_role, link_obj = link_obj)
def database_schema_compatible(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
version: int = None,
verbose: bool = True) ‑> bool
def database_schema_compatible(link_obj:_TLnkObj=None, version:int=None, verbose:bool=True) -> bool:
    expected_hash = known_schema_hashes[version]
    ver = 9999 if version == 0 else version
    md5_db = get_schema_hash(link_obj = link_obj, version = ver)
    if md5_db == expected_hash:
        _log.info('detected schema version [%s], hash [%s]' % (map_schema_hash2version[md5_db], md5_db))
        return True
    _log.error('database schema version mismatch')
    _log.error('expected: %s (%s)' % (version, expected_hash))
    try:
        _log.error('detected: %s (%s)', map_schema_hash2version[md5_db], md5_db)
    except KeyError:
        _log.error('detected: <unknown> (%s)', md5_db)
    if verbose:
        log_schema_structure(link_obj = link_obj)
        log_schema_revision_history(link_obj = link_obj)
    return False
def delete_translation_from_database(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
language=None,
original=None)
def delete_translation_from_database(link_obj:_TLnkObj=None, language=None, original=None):
    cmd = 'DELETE FROM i18n.translations WHERE lang = %(lang)s AND orig = %(orig)s'
    args = {'lang': language, 'orig': original}
    run_rw_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}], return_data = False, end_tx = True)
    return True
def discard_pooled_connection_of_thread()
def discard_pooled_connection_of_thread():
    gmConnectionPool.gmConnectionPool().discard_pooled_connection_of_thread()
def export_translations_from_database(filename=None)
def export_translations_from_database(filename=None):
    tx_file = open(filename, mode = 'wt', encoding = 'utf8')
    tx_file.write('-- GNUmed database string translations exported %s\n' % gmDateTime.pydt_now_here().strftime('%Y-%m-%d %H:%M'))
    tx_file.write('-- - contains translations for each of [%s]\n' % ', '.join(get_translation_languages()))
    tx_file.write('-- - user database language is set to [%s]\n\n' % get_current_user_language())
    tx_file.write('-- Please email this file to <gnumed-devel@gnu.org>.\n')
    tx_file.write('-- ----------------------------------------------------------------------------------------------\n\n')
    tx_file.write('set default_transaction_read_only to off;\n\n')
    tx_file.write("set client_encoding to 'utf-8';\n\n")
    tx_file.write('\\unset ON_ERROR_STOP\n\n')
    cmd = 'SELECT lang, orig, trans FROM i18n.translations ORDER BY lang, orig'
    rows = run_ro_queries(queries = [{'sql': cmd}])
    for row in rows:
        line = "select i18n.upd_tx(E'%s', E'%s', E'%s');\n" % (
            row['lang'].replace("'", "\\'"),
            row['orig'].replace("'", "\\'"),
            row['trans'].replace("'", "\\'")
        )
        tx_file.write(line)
    tx_file.write('\n')
    tx_file.write('\set ON_ERROR_STOP 1\n')
    tx_file.close()
    return True
def file2bytea(query: str = None,
filename: str = None,
args: dict = None,
conn=None,
file_md5: str = None) ‑> bool
def file2bytea(query:str=None, filename:str=None, args:dict=None, conn=None, file_md5:str=None) -> bool: """Store data from a file into a bytea field. Args: query: SQL, * INSERT or UPDATE * must contain a format spec named '%(data)s' for the BYTEA data, say '... <BYTEA data column> = %(data)s::BYTEA' * if UPDATE, must contain a format spec identifying the row (eg a primary key), say '... AND pk_column = %(pk_val)s' * can contain a '... RETURNING md5(<BYTEA data column>) AS md5' args: if UPDATE, must contain primary key placeholder matching format spec in <query>, say {'pk_val': pk_value, ...} file_md5: * md5 sum of the file in "filename" * if given, and <query> RETURNs a column 'md5', the returned value is compared to the given value Returns: Whether operation seems to have succeeded. """ attempt = 0 infile = None while attempt < 3: attempt += 1 try: infile = open(filename, "rb") except (BlockingIOError, FileNotFoundError, PermissionError): _log.exception('#%s: cannot open [%s]', attempt, filename) _log.error('retrying after 100ms') time.sleep(0.1) break if infile is None: return False data_as_byte_string = infile.read() infile.close() if args is None: args = {} args['data'] = memoryview(data_as_byte_string) # really still needed for BYTEA input ? del(data_as_byte_string) if conn is None: conn = get_raw_connection(readonly = False) conn_close = conn.close else: conn_close = lambda *x: None rows = run_rw_queries ( link_obj = conn, queries = [{'sql': query, 'args': args}], end_tx = False, return_data = (file_md5 is not None) ) if file_md5 is None: conn.commit() conn_close() return True db_md5 = rows[0]['md5'] if file_md5 == db_md5: conn.commit() conn_close() _log.debug('MD5 sums of data file and database BYTEA field match: [file::%s] = [DB::%s]', file_md5, db_md5) return True conn.rollback() conn_close() _log.error('MD5 sums of data file and database BYTEA field do not match: [file::%s] <> [DB::%s]', file_md5, db_md5) return False
Store data from a file into a bytea field.
Args
query
- SQL, INSERT or UPDATE
- must contain a format spec named '%(data)s' for the BYTEA data, say '… <BYTEA data column> = %(data)s::BYTEA'
- if UPDATE, must contain a format spec identifying the row (eg a primary key), say '… AND pk_column = %(pk_val)s'
- can contain a '… RETURNING md5(<BYTEA data column>) AS md5'
args
- if UPDATE, must contain a primary key placeholder matching the format spec in <query>, say {'pk_val': pk_value, …}
file_md5
- md5 sum of the file in "filename"
- if given, and <query> RETURNs a column 'md5', the returned value is compared to the given value
Returns
Whether operation seems to have succeeded.
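A hedged example of an UPDATE-style call that lets the database verify the stored data against a locally computed MD5; table, column and pk are placeholders only:
import hashlib
from Gnumed.pycommon import gmPG2

# illustrative UPDATE with RETURNING md5(...); blobs.doc_obj/data/pk are placeholders
fname = '/tmp/scan.pdf'
with open(fname, 'rb') as f:
    local_md5 = hashlib.md5(f.read()).hexdigest()
SQL = """
    UPDATE blobs.doc_obj
    SET data = %(data)s::BYTEA
    WHERE pk = %(pk_val)s
    RETURNING md5(data) AS md5
"""
ok = gmPG2.file2bytea (
    query = SQL,
    filename = fname,
    args = {'pk_val': 12},
    file_md5 = local_md5
)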
def file2lo(filename=None, conn=None, check_md5=False, file_md5=None)
def file2lo(filename=None, conn=None, check_md5=False, file_md5=None):
    # 1 GB limit unless 64 bit Python build ...
    file_size = os.path.getsize(filename)
    if file_size > (1024 * 1024) * 1024:
        _log.debug('file size of [%s] > 1 GB, supposedly not supported by psycopg2 large objects (but seems to work anyway ?)', file_size)
        # return -1
    if conn is None:
        conn = get_raw_connection(readonly = False)
        close_conn = conn.close
    else:
        close_conn = lambda *x: None
    _log.debug('[%s] -> large object', filename)
    # insert the data
    lo = conn.lobject(0, 'w', 0, filename)
    lo_oid = lo.oid
    lo.close()
    _log.debug('large object OID: %s', lo_oid)
    # verify
    if file_md5 is None:
        conn.commit()
        close_conn()
        return lo_oid

    cmd = 'SELECT md5(lo_get(%(loid)s::oid))'
    args = {'loid': lo_oid}
    rows = run_ro_queries(link_obj = conn, queries = [{'sql': cmd, 'args': args}])
    db_md5 = rows[0][0]
    if file_md5 == db_md5:
        conn.commit()
        close_conn()
        _log.debug('MD5 sums of data file and database large object match: [file::%s] = [DB::%s]', file_md5, db_md5)
        return lo_oid

    conn.rollback()
    close_conn()
    _log.error('MD5 sums of data file and database large object [%s] do not match: [file::%s] <> [DB::%s]', lo_oid, file_md5, db_md5)
    return -1
def force_user_language(language=None)
def force_user_language(language=None):
    """Set the user language in the database.

    - regardless of whether there is any translation available.
    - only for the current user
    """
    _log.info('forcing database language for current db user to [%s]', language)
    run_rw_queries(queries = [{
        'sql': 'select i18n.force_curr_lang(%(lang)s)',
        'args': {'lang': language}
    }])
Set the user language in the database.
- regardless of whether there is any translation available.
- only for the current user
def function_exists(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
schema=None,
function=None)
def function_exists(link_obj:_TLnkObj=None, schema=None, function=None):
    cmd = """
        SELECT EXISTS (
            SELECT 1 FROM pg_proc
            WHERE
                proname = %(func)s
                    AND
                pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = %(schema)s)
        )
    """
    args = {
        'func': function,
        'schema': schema
    }
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}])
    return rows[0][0]
def get_child_tables(schema='public', table=None, link_obj=None)
def get_child_tables(schema='public', table=None, link_obj=None):
    """Return child tables of <table>."""
    cmd = """
        select pgn.nspname as namespace, pgc.relname as table
        from pg_namespace pgn, pg_class pgc
        where
            pgc.relnamespace = pgn.oid
            and pgc.oid in (
                select inhrelid from pg_inherits where inhparent = (
                    select oid from pg_class
                    where
                        relnamespace = (select oid from pg_namespace where nspname = %(schema)s)
                        and relname = %(table)s
                )
            )"""
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': {'schema': schema, 'table': table}}])
    return rows
Return child tables of <table>.
def get_col_defs(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
schema='public',
table=None)
def get_col_defs(link_obj:_TLnkObj=None, schema='public', table=None):
    args = {'schema': schema, 'table': table}
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': SQL__col_defs4table, 'args': args}])
    col_names = []
    col_type = {}
    for row in rows:
        col_names.append(row[0])
        # map array types
        if row[1].startswith('_'):
            col_type[row[0]] = row[1][1:] + '[]'
        else:
            col_type[row[0]] = row[1]
    col_defs = []
    col_defs.append(col_names)
    col_defs.append(col_type)    # type: ignore [arg-type]
    return col_defs
def get_col_indices(cursor=None)
def get_col_indices(cursor = None):
    if cursor.description is None:
        _log.error('no result description available: unused cursor or last query did not select rows')
        return None

    col_indices = {}
    col_index = 0
    for col_desc in cursor.description:
        col_name = col_desc[0]
        # a query like "select 1,2;" will return two columns of the same name !
        # hence adjust to that, note, however, that dict-style access won't work
        # on results of such queries ...
        if col_name in col_indices:
            col_name = '%s_%s' % (col_name, col_index)
        col_indices[col_name] = col_index
        col_index += 1
    return col_indices
def get_col_names(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
schema='public',
table=None)
def get_col_names(link_obj:_TLnkObj=None, schema='public', table=None):
    """Return column attributes of table"""
    args = {'schema': schema, 'table': table}
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': SQL__cols4table, 'args': args}])
    return [ row[0] for row in rows ]
Return column attributes of table
def get_connection(readonly: bool = True,
verbose: bool = False,
pooled: bool = True,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection
def get_connection(readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False) -> dbapi.extras.DictConnection:
    return gmConnectionPool.gmConnectionPool().get_connection (
        readonly = readonly,
        verbose = verbose,
        connection_name = connection_name,
        autocommit = autocommit,
        pooled = pooled
    )
def get_current_user() ‑> str
def get_current_user() -> str:
    rows = run_ro_query(sql = 'SELECT CURRENT_USER')
    return rows[0][0]
def get_current_user_language()
def get_current_user_language():
    cmd = 'select i18n.get_curr_lang()'
    rows = run_ro_queries(queries = [{'sql': cmd}])
    return rows[0][0]
def get_database_translations(language=None, order_by=None)
def get_database_translations(language=None, order_by=None): args = {'lang': language} _log.debug('language [%s]', language) if order_by is None: order_by = 'ORDER BY %s' % order_by else: order_by = 'ORDER BY lang, orig' if language is None: cmd = """ SELECT DISTINCT ON (orig, lang) lang, orig, trans FROM (( -- strings stored as translation keys whether translated or not SELECT NULL as lang, ik.orig, NULL AS trans FROM i18n.keys ik ) UNION ALL ( -- already translated strings SELECT it.lang, it.orig, it.trans FROM i18n.translations it )) as translatable_strings %s""" % order_by else: cmd = """ SELECT DISTINCT ON (orig, lang) lang, orig, trans FROM (( -- strings stored as translation keys whether translated or not SELECT %%(lang)s as lang, ik.orig, i18n._(ik.orig, %%(lang)s) AS trans FROM i18n.keys ik ) UNION ALL ( -- already translated strings SELECT %%(lang)s as lang, it.orig, i18n._(it.orig, %%(lang)s) AS trans FROM i18n.translations it )) AS translatable_strings %s""" % order_by rows = run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if rows is None: _log.error('no translatable strings found') else: _log.debug('%s translatable strings found', len(rows)) return rows
def get_db_fingerprint(conn=None, fname: str = None, with_dump: bool = False, eol: str = None)
def get_db_fingerprint(conn=None, fname:str=None, with_dump:bool=False, eol:str=None): """Get a fingerprint for a GNUmed database. A "fingerprint" is a collection of settings and typical row counts. Args: conn: a database connection fname: name of file to write fingerprint to, *None* = return text with_dump: include dump of schema structure (tables, views, ...) eol: concatenate list by this string when returning text (rather than when writing to a file) Returns: * if "fname" is not None: filename * if "eol" is None: list of lines with fingerprint data * if "eol" is not None: lines with fingerprint data joined with "eol" """ queries = [ ("Version (PG)", "SELECT setting FROM pg_settings WHERE name = 'server_version'"), ('Encoding (PG)', "SELECT setting FROM pg_settings WHERE name = 'server_encoding'"), ('LC_COLLATE (PG)', "SELECT setting FROM pg_settings WHERE name = 'lc_collate'"), ('pg_database.datcollate (PG)', "SELECT datcollate FROM pg_database WHERE datname = current_database()"), ('LC_CTYPE (PG)', "SELECT setting FROM pg_settings WHERE name = 'lc_ctype'"), ('pg_database.datctype (PG)', "SELECT datctype FROM pg_database WHERE datname = current_database()"), ('Patients', "SELECT COUNT(*) FROM dem.identity"), ('Contacts', "SELECT COUNT(*) FROM clin.encounter"), ('Episodes', "SELECT COUNT(*) FROM clin.episode"), ('Issues', "SELECT COUNT(*) FROM clin.health_issue"), ('Results', "SELECT COUNT(*) FROM clin.test_result"), ('Vaccinations', "SELECT COUNT(*) FROM clin.vaccination"), ('Documents', "SELECT COUNT(*) FROM blobs.doc_med"), ('Objects', "SELECT COUNT(*) FROM blobs.doc_obj"), ('Organizations', "SELECT COUNT(*) FROM dem.org"), ("Organizational units", "SELECT COUNT(*) FROM dem.org_unit"), (' Earliest .modified_when', "SELECT min(modified_when) FROM audit.audit_fields"), ('Most recent .modified_when', "SELECT max(modified_when) FROM audit.audit_fields"), (' Earliest .audit_when', "SELECT min(audit_when) FROM audit.audit_trail"), (' Most recent .audit_when', "SELECT max(audit_when) FROM audit.audit_trail") ] if conn is None: conn = get_connection(readonly = True) database = conn.get_dsn_parameters()['dbname'] lines = [ 'Fingerprinting GNUmed database ...', '', '%20s: %s' % ('Name (DB)', database) ] curs = conn.cursor() # get size cmd = "SELECT pg_size_pretty(pg_database_size('%s'))" % database curs.execute(cmd) rows = curs.fetchall() lines.append('%20s: %s' % ('Size (DB)', rows[0][0])) # get hash md5_sum = get_schema_hash(link_obj = curs) try: lines.append('%20s: %s (v%s)' % ('Schema hash', md5_sum, map_schema_hash2version[md5_sum])) except KeyError: lines.append('%20s: %s' % ('Schema hash', md5_sum)) for label, cmd in queries: try: curs.execute(cmd) rows = curs.fetchall() if rows: val = rows[0][0] else: val = '<not found>' except PG_ERROR_EXCEPTION as pg_exc: if pg_exc.pgcode != PG_error_codes.INSUFFICIENT_PRIVILEGE: raise if pg_exc.pgerror is None: val = '[%s]: insufficient privileges' % pg_exc.pgcode else: val = '[%s]: %s' % (pg_exc.pgcode, pg_exc.pgerror) lines.append('%20s: %s' % (label, val)) if with_dump: lines.append('') lines.append(str(get_schema_structure(link_obj = curs))) curs.close() if fname is None: if eol is None: return lines return eol.join(lines) outfile = open(fname, mode = 'wt', encoding = 'utf8') outfile.write('\n'.join(lines)) outfile.close() return fname
Get a fingerprint for a GNUmed database.
A "fingerprint" is a collection of settings and typical row counts.
Args
conn
- a database connection
fname
- name of file to write fingerprint to, None = return text
with_dump
- include dump of schema structure (tables, views, …)
eol
- concatenate list by this string when returning text (rather than when writing to a file)
Returns
- if "fname" is not None: filename
- if "eol" is None: list of lines with fingerprint data
- if "eol" is not None: lines with fingerprint data joined with "eol"
def get_foreign_key_names(src_schema=None,
src_table=None,
src_column=None,
target_schema=None,
target_table=None,
target_column=None,
link_obj=None)
def get_foreign_key_names(src_schema=None, src_table=None, src_column=None, target_schema=None, target_table=None, target_column=None, link_obj=None):
    args = {
        'src_schema': src_schema,
        'src_tbl': src_table,
        'src_col': src_column,
        'target_schema': target_schema,
        'target_tbl': target_table,
        'target_col': target_column
    }
    rows = run_ro_queries (
        link_obj = link_obj,
        queries = [{'sql': SQL_foreign_key_name, 'args': args}]
    )
    return rows
def get_foreign_keys2column(schema='public', table=None, column=None, link_obj=None)
def get_foreign_keys2column(schema='public', table=None, column=None, link_obj=None): """Get the foreign keys pointing to schema.table.column. Does not properly work with multi-column FKs. GNUmed doesn't use any, however. """ args = { 'schema': schema, 'tbl': table, 'col': column } cmd = """ SELECT %(schema)s AS referenced_schema, %(tbl)s AS referenced_table, %(col)s AS referenced_column, pgc.confkey AS referenced_column_list, pgc.conrelid::regclass AS referencing_table, pgc.conkey AS referencing_column_list, (select attname from pg_attribute where attnum = pgc.conkey[1] and attrelid = pgc.conrelid) AS referencing_column FROM pg_constraint pgc WHERE pgc.contype = 'f' AND pgc.confrelid = ( select oid from pg_class where relname = %(tbl)s and relnamespace = ( select oid from pg_namespace where nspname = %(schema)s ) ) and ( select attnum from pg_attribute where attrelid = (select oid from pg_class where relname = %(tbl)s and relnamespace = ( select oid from pg_namespace where nspname = %(schema)s )) and attname = %(col)s ) = any(pgc.confkey) """ rows = run_ro_queries ( link_obj = link_obj, queries = [ {'sql': cmd, 'args': args} ] ) return rows
Get the foreign keys pointing to schema.table.column.
Does not properly work with multi-column FKs. GNUmed doesn't use any, however.
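A sketch of listing the tables referencing clin.encounter.pk, mirroring the use made of this helper in check_fk_encounter_fk_episode_x_ref() above:
from Gnumed.pycommon import gmPG2

# which tables carry a foreign key pointing at clin.encounter.pk ?
fks = gmPG2.get_foreign_keys2column(schema = 'clin', table = 'encounter', column = 'pk')
for fk in fks:
    print(fk['referencing_table'], '->', fk['referencing_column'])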
def get_index_name(indexed_table=None, indexed_column=None, link_obj=None)
def get_index_name(indexed_table=None, indexed_column=None, link_obj=None):
    args = {
        'idx_tbl': indexed_table,
        'idx_col': indexed_column
    }
    rows = run_ro_query(link_obj = link_obj, sql = SQL_get_index_name, args = args)
    return rows
def get_raw_connection(verbose: bool = False,
readonly: bool = True,
connection_name: str = None,
autocommit: bool = False) ‑> psycopg2.extras.DictConnection
def get_raw_connection(verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False) -> dbapi.extras.DictConnection:
    """Get a raw, unadorned connection.

    * this will not set any parameters such as encoding, timezone, datestyle
    * the only requirement is valid connection parameters having been passed to the connection pool
    * hence it can be used for "service" connections for verifying encodings etc
    """
    return gmConnectionPool.gmConnectionPool().get_raw_connection (
        readonly = readonly,
        verbose = verbose,
        connection_name = connection_name,
        autocommit = autocommit
    )
Get a raw, unadorned connection.
- this will not set any parameters such as encoding, timezone, datestyle
- the only requirement is valid connection parameters having been passed to the connection pool
- hence it can be used for "service" connections for verifying encodings etc
def get_schema_hash(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
version=None) ‑> str
def get_schema_hash(link_obj:_TLnkObj=None, version=None) -> str:
    md5_db = __get_schema_hash_by_gm_func(link_obj = link_obj, version = version)
    if not md5_db:
        _log.debug('retrying with temporary function')
        md5_db = __get_schema_hash_by_pg_temp_func()
    return md5_db
def get_schema_revision_history(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None) ‑> list[psycopg2.extras.DictRow]
def get_schema_revision_history(link_obj:_TLnkObj=None) -> list[_TRow]:
    if table_exists(link_obj = link_obj, schema = 'gm', table = 'schema_revision'):
        cmd = """
            SELECT imported::text, version, filename
            FROM gm.schema_revision
            ORDER BY imported"""
    elif table_exists(link_obj = link_obj, schema = 'public', table = 'gm_schema_revision'):
        cmd = """
            SELECT imported::text, version, filename
            FROM public.gm_schema_revision
            ORDER BY imported"""
    else:
        return []

    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd}])
    return rows
def get_schema_structure(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None) ‑> str
def get_schema_structure(link_obj:_TLnkObj=None) -> str:
    schema_struct = __get_schema_structure_by_gm_func(link_obj = link_obj)
    if not schema_struct:
        _log.debug('retrying with temporary function')
        schema_struct = __get_schema_structure_by_pg_temp_func()
    return schema_struct
def get_schema_version(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None) ‑> str | int
def get_schema_version(link_obj:_TLnkObj=None) -> int|str:
    md5_db = get_schema_hash(link_obj = link_obj)
    if not md5_db:
        _log.error('cannot determine schema version')
        return None

    try:
        return map_schema_hash2version[md5_db]
    except KeyError:
        return 'unknown database schema version, MD5 hash is [%s]' % md5_db
def get_translation_languages()
def get_translation_languages():
    rows = run_ro_queries (
        queries = [{'sql': 'select distinct lang from i18n.translations'}]
    )
    return [ r[0] for r in rows ]
def group_role_exists(group_role: str = None, link_obj=None) ‑> bool
def group_role_exists(group_role:str=None, link_obj=None) -> bool:
    return role_exists(role = group_role, link_obj = link_obj)
def is_beginning_of_time(dt: datetime.datetime) ‑> bool
def is_beginning_of_time(dt:pydt.datetime) -> bool:
    global PG_BEGINNING_OF_TIME
    if not PG_BEGINNING_OF_TIME:
        SQL = "SELECT '-infinity'::TIMESTAMP WITH TIME ZONE AT TIME ZONE 'UTC' AS big_bang"
        rows = run_ro_query(sql = SQL)
        PG_BEGINNING_OF_TIME = rows[0]['big_bang']
        _log.debug("psycopg2 puts PG's Big Bang at: %s ('-infinity' at UTC)", PG_BEGINNING_OF_TIME)
        pydt_bing_bang = pydt.datetime(1,1,1)
        if pydt_bing_bang == PG_BEGINNING_OF_TIME:
            _log.debug('Python and PostgreSQL (via psycopg2) agree on the beginning of time')
        else:
            _log.error('Python3 does not agree, it thinks: %s (datetime(1,1,1))', pydt_bing_bang.isoformat())
    return dt == PG_BEGINNING_OF_TIME
def is_pg_interval(candidate: str = None) ‑> bool
def is_pg_interval(candidate:str=None) -> bool:
    cmd = 'SELECT %(candidate)s::interval'
    try:
        run_ro_queries(queries = [{'sql': cmd, 'args': {'candidate': candidate}}])
    except Exception:
        cmd = 'SELECT %(candidate)s::text::interval'
        try:
            run_ro_queries(queries = [{'sql': cmd, 'args': {'candidate': candidate}}])
        except Exception:
            return False
    return True
def lock_row(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
table: str = None,
pk: int = None,
exclusive: bool = False) ‑> bool
def lock_row(link_obj:_TLnkObj=None, table:str=None, pk:int=None, exclusive:bool=False) -> bool:
    """Get advisory lock on a table row.

    Uses pg_advisory(_shared). Technically, <table> and <pk> are just
    conventions for reproducibly generating the lock token.

    Locks stack upon each other and need one unlock per lock.

    Same connection: all locks succeed

    Different connections:
    - shared + shared: succeeds
    - shared + exclusive: fails

    Args:
        link_obj: None/connection/cursor
        table: the table in which to lock a row
        pk: the PK of the row to lock.
        exclusive: whether or not to lock _shared

    Returns:
        Whether lock was obtained or not.
    """
    _log.debug('locking row: [%s] [%s] (exclusive: %s)', table, pk, exclusive)
    if exclusive:
        cmd = """SELECT pg_try_advisory_lock('%s'::regclass::oid::int, %s)""" % (table, pk)
    else:
        cmd = """SELECT pg_try_advisory_lock_shared('%s'::regclass::oid::int, %s)""" % (table, pk)
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd}])
    if rows[0][0]:
        return True

    _log.warning('cannot lock row: [%s] [%s] (exclusive: %s)', table, pk, exclusive)
    return False
Get advisory lock on a table row.
Uses pg_advisory(_shared). Technically, <table> and <pk> are just conventions for reproducibly generating the lock token. Locks stack upon each other and need one unlock per lock.
Same connection: all locks succeed.
Different connections:
- shared + shared: succeeds
- shared + exclusive: fails
Args
link_obj
- None/connection/cursor
table
- the table in which to lock a row
pk
- the PK of the row to lock.
exclusive
- whether or not to lock _shared
Returns
Whether lock was obtained or not.
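A sketch of taking and checking a shared advisory lock; the table and pk are placeholders, and note that every successful lock_row() needs a matching unlock (the unlock counterpart is not shown in this excerpt):
from Gnumed.pycommon import gmPG2

conn = gmPG2.get_connection(readonly = False, pooled = False)
# shared advisory lock on a hypothetical row (clin.episode, pk 42)
got_lock = gmPG2.lock_row(link_obj = conn, table = 'clin.episode', pk = 42, exclusive = False)
if got_lock:
    # the advisory lock is now visible in pg_locks
    print(gmPG2.row_is_locked(table = 'clin.episode', pk = 42))
    # remember: one unlock per successful lock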
def log_database_access(action=None)
def log_database_access(action=None):
    args = {'action': action}
    SQL = "INSERT INTO gm.access_log (user_action) VALUES (%(action)s)"
    run_rw_queries(queries = [{'sql': SQL, 'args': args}])
def log_pg_exception(exc: Exception, msg: str = None)
def log_pg_exception(exc:Exception, msg:str=None):
    gmConnectionPool.log_pg_exception_details(exc)
    _log.exception(msg)
def log_schema_revision_history(link_obj=None)
def log_schema_revision_history(link_obj=None):
    _log.debug('schema revision history dump:')
    for line in get_schema_revision_history(link_obj = link_obj):
        _log.debug(' - '.join(line))
def log_schema_structure(link_obj=None)
def log_schema_structure(link_obj=None):
    _log.debug('schema structure dump:')
    schema_struct = get_schema_structure(link_obj = link_obj)
    if not schema_struct:
        _log.error('cannot determine schema structure')
        return

    for line in schema_struct.split():
        _log.debug(line)
def read_all_rows_of_table(schema=None, table=None)
def read_all_rows_of_table(schema=None, table=None): if schema is None: schema = prompted_input(prompt = 'schema for table to dump', default = None) if schema is None: _log.debug('aborted by user (no schema entered)') return None if table is None: table = prompted_input(prompt = 'table to dump (in schema %s.)' % schema, default = None) if table is None: _log.debug('aborted by user (no table entered)') return None _log.debug('dumping <%s.%s>', schema, table) conn = get_connection(readonly=True, verbose = False, pooled = True, connection_name = 'read_all_rows_of_table') # get pk column name rows = run_ro_queries(link_obj = conn, queries = [{'sql': SQL_get_primary_key_name, 'args': {'schema': schema, 'table': table}}]) if rows: _log.debug('primary key def: %s', rows) if len(rows) > 1: _log.error('cannot handle multi-column primary key') return False pk_name = rows[0][0] else: _log.debug('cannot determine primary key, asking user') pk_name = prompted_input(prompt = 'primary key name for %s.%s' % (schema, table), default = None) if pk_name is None: _log.debug('aborted by user (no primary key name entered)') return None # get PK values qualified_table = '%s.%s' % (schema, table) qualified_pk_name = '%s.%s.%s' % (schema, table, pk_name) cmd = PG_SQL.SQL('SELECT {schema_table_pk} FROM {schema_table} ORDER BY 1'.format ( schema_table_pk = qualified_pk_name, schema_table = qualified_table )) rows = run_ro_queries(link_obj = conn, queries = [{'sql': cmd}]) if not rows: _log.debug('no rows to dump') return True # dump table rows _log.debug('dumping %s rows', len(rows)) cmd = PG_SQL.SQL('SELECT * FROM {schema_table} WHERE {schema_table_pk} = %(pk_val)s'.format ( schema_table = qualified_table, schema_table_pk = qualified_pk_name )) found_errors = False idx = 0 for row in rows: idx += 1 args = {'pk_val': row[0]} _log.debug('dumping row #%s with pk [%s]', idx, row[0]) try: run_ro_queries(link_obj = conn, queries = [{'sql': cmd, 'args': args}]) except dbapi.InternalError: found_errors = True _log.exception('error dumping row') print('ERROR: cannot dump row %s of %s with pk %s = %s', idx, len(rows), qualified_pk_name, rows[0]) return found_errors is False
def refresh_collations_version_information(conn=None, use_the_source_luke=False) ‑> bool
def refresh_collations_version_information(conn=None, use_the_source_luke=False) -> bool: """Update the recorded versions in pg_collations. Needs to be run by the owner of the collations stored in pg_collation, typically the database owner. Args: conn: a psycopg2 connection to the database intended to be updated use_the_source_luke: do as you are told Returns: False: cannot refresh collations True: collations refreshed None: collations refresh function missing """ if not use_the_source_luke: _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information') return False if __MIND_MELD not in use_the_source_luke: _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information') return False if __LLAP not in use_the_source_luke: _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information') return False _log.debug('Kelvin: refreshing pg_collations row version information') # https://www.postgresql.org/message-id/9aec6e6d-318e-4a36-96a4-3b898c3600c9%40manitou-mail.org SQL = 'SELECT gm.update_pg_collations();' try: try: run_rw_queries(link_obj = conn, queries = [{'sql': SQL}]) except dbapi.errors.UndefinedFunction as exc: # type-x: ignore [attr-defined] # pylint: disable=no-member if 'gm.update_pg_collations() does not exist' in exc.pgerror: _log.error('gm.update_pg_collations() does not exist') return None except dbapi.errors.InvalidSchemaName as exc: # type-x: ignore [attr-defined] # pylint: disable=no-member if 'schema "gm" does not exist' in exc.pgerror: _log.error('schema "gm" does not exist, cannot run gm.update_pg_collations()') return None raise except Exception: _log.exception('failure to update collations version information') return False return True
Update the recorded versions in pg_collations.
Needs to be run by the owner of the collations stored in pg_collation, typically the database owner.
Args
conn
- a psycopg2 connection to the database intended to be updated
use_the_source_luke
- do as you are told
Returns
False
- cannot refresh collations
True
- collations refreshed
None
- collations refresh function missing
def refresh_database_default_collation_version_information(conn=None, use_the_source_luke=False) ‑> bool
def refresh_database_default_collation_version_information(conn=None, use_the_source_luke=False) -> bool:
    """Update the recorded version of the database default collation.

    Args:
        conn: a psycopg2 connection for the database intended to be updated
        use_the_source_luke: do as you are told
    """
    if not use_the_source_luke:
        _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information')
        return False
    if __MIND_MELD not in use_the_source_luke:
        _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information')
        return False
    if __LLAP not in use_the_source_luke:
        _log.error('REINDEX and VALIDATE CONSTRAINT must have been run before updating collation version information')
        return False

    _log.debug('Kelvin: refreshing database default collation version information')
    SQL = PG_SQL.SQL('ALTER DATABASE {} REFRESH COLLATION VERSION').format(PG_SQL.Identifier(conn.info.dbname))
    try:
        run_rw_queries(link_obj = conn, queries = [{'sql': SQL}])
    except Exception:
        _log.exception('failure to update default collation version information')
        return False

    return True
Update the recorded version of the database default collation.
Args
conn
- a psycopg2 connection for the database intended to be updated
use_the_source_luke
- do as you are told
def reindex_database(conn=None) ‑> str | bool
def reindex_database(conn=None) -> str | bool:
    """Reindex the database "conn" is connected to.

    Args:
        conn: a read-write connection in autocommit mode with sufficient
            PG level permissions for reindexing, say, "postgres" or the
            database owner

    Returns:
        False on error, magic cookie on success.
    """
    assert conn, '<conn> must be given'

    dbname = conn.get_dsn_parameters()['dbname']
    _log.debug('rebuilding all indices in database [%s]', dbname)
    SQL = PG_SQL.SQL('REINDEX (VERBOSE) DATABASE {}').format(PG_SQL.Identifier(dbname))
    # REINDEX must be run outside transactions
    conn.commit()
    conn.set_session(readonly = False, autocommit = True)
    curs = conn.cursor()
    try:
        run_rw_queries(link_obj = curs, queries = [{'sql': SQL}], end_tx = True)
        return __MIND_MELD

    except Exception:
        _log.exception('reindexing failed')
        return False

    finally:
        curs.close()
        conn.commit()

    # should never get here
    return False
Reindex the database "conn" is connected to.
Args
conn
- a read-write connection in autocommit mode with sufficient PG level permissions for reindexing, say, "postgres" or the database owner
Returns
False on error, magic cookie on success.
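A sketch of how such a call might be prepared; since the function itself switches the session to autocommit, a plain unpooled read-write connection with sufficient privileges should do (connection parameters as described above):
from Gnumed.pycommon import gmPG2

# needs a read-write connection with sufficient privileges (say, the database owner)
conn = gmPG2.get_connection(readonly = False, pooled = False)
cookie = gmPG2.reindex_database(conn = conn)
if cookie is False:
    print('reindexing failed, check the log')
else:
    # the magic cookie is later handed to the collation refresh helpers
    print('reindexing done')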
def request_login_params(setup_pool: bool = False, force_tui: bool = False, user: str = None) ‑> tuple[LoginInfo, cPGCredentials]
def request_login_params (
    setup_pool:bool=False,
    force_tui:bool=False,
    user:str=None
) -> tuple[gmLoginInfo.LoginInfo, gmConnectionPool.cPGCredentials]:
    """Request login parameters for database connection.

    Args:
        setup_pool: initialize connection pool
        force_tui: do not attempt to use wxPython as UI

    Returns:
        A tuple with login info.
    """
    # are we inside X ?
    # if we aren't wxGTK would crash hard at the C-level with "can't open Display"
    if 'DISPLAY' in os.environ and not force_tui:
        try:
            # try wxPython GUI
            login, creds = __request_login_params_gui_wx()
        except Exception:
            _log.exception('cannot request creds via wxPython')
        if setup_pool:
            pool = gmConnectionPool.gmConnectionPool()
            pool.credentials = creds
        return login, creds

    # well, either we are on the console or
    # wxPython does not work, use text mode
    login, creds = __request_login_params_tui(user = user)
    if setup_pool:
        pool = gmConnectionPool.gmConnectionPool()
        pool.credentials = creds
    return login, creds
Request login parameters for database connection.
Args
setup_pool
- initialize connection pool
force_tui
- do not attempt to use wxPython as UI
Returns
A tuple with login info.
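A sketch of bootstrapping the connection pool from user-supplied credentials, per the argument description above:
from Gnumed.pycommon import gmPG2

# ask for credentials on the console and prime the connection pool
login, creds = gmPG2.request_login_params(setup_pool = True, force_tui = True)
conn = gmPG2.get_connection(readonly = True)
print(gmPG2.get_current_user())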
def revalidate_constraints(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None) ‑> str | bool
def revalidate_constraints(link_obj:_TLnkObj=None) -> str | bool:
    """Revalidate all database constraints.

    This needs a gm-dbo connection.

    Note that reindexing should have been run *before* this if fixing collations.

    Returns:
        Magic cookie on success.
    """
    _log.debug('revalidating all constraints in database')
    SQL = 'SELECT gm.revalidate_all_constraints();'
    try:
        try:
            run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL}])
        except dbapi.errors.UndefinedFunction as exc:    # type-x: ignore [attr-defined]    # pylint: disable=no-member
            if 'gm.revalidate_all_constraints() does not exist' in exc.pgerror:
                _log.error('gm.revalidate_all_constraints() does not exist')
                return None

        except dbapi.errors.InvalidSchemaName as exc:    # type-x: ignore [attr-defined]    # pylint: disable=no-member
            if 'schema "gm" does not exist' in exc.pgerror:
                _log.error('schema "gm" does not exist, cannot run gm.revalidate_all_constraints()')
                return None

            raise

    except Exception:
        _log.exception('failure to revalidate constraints')
        return False

    return __LLAP
Revalidate all database constraints.
This needs a gm-dbo connection.
Note that reindexing should have been run before this if fixing collations.
Returns
Magic cookie on success.
def role_exists(role: str = None, link_obj=None) ‑> bool
def role_exists(role:str=None, link_obj=None) -> bool:
    SQL = 'SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname = %(role)s)'
    args = {'role': role}
    rows = run_ro_query(link_obj = link_obj, sql = SQL, args = args)
    if rows[0][0]:
        _log.debug('role [%s] exists', role)
        return True

    _log.info("role [%s] does not exist" % role)
    return False
def row_is_locked(table=None, pk=None) ‑> bool
def row_is_locked(table=None, pk=None) -> bool:
    """Checks pg_locks for (ADVISORY only) locks on the row identified by table and pk.

    - does not take into account locks other than 'advisory', however
    """
    cmd = """SELECT EXISTS (
        SELECT 1 FROM pg_locks WHERE
            classid = '%s'::regclass::oid::int
                AND
            objid = %s
                AND
            locktype = 'advisory'
    )""" % (table, pk)
    rows = run_ro_queries(queries = [{'sql': cmd}])
    if rows[0][0]:
        _log.debug('row is locked: [%s] [%s]', table, pk)
        return True

    _log.debug('row is NOT locked: [%s] [%s]', table, pk)
    return False
Checks pg_locks for (ADVISORY only) locks on the row identified by table and pk.
- does not take into account locks other than 'advisory', however
def run_collations_tool() ‑> int
def run_collations_tool() -> int: print('Fixing database collations version mismatches.') print('----------------------------------------------') if os.getuid() != 0: print('Not running as root. Aborting.') return -2 pg_demon_user_passwd_line = None try: pg_demon_user_passwd_line = pwd.getpwnam('postgres') except KeyError: try: pg_demon_user_passwd_line = pwd.getpwnam('pgsql') except KeyError: print('cannot identify postgres superuser account') return -2 _log.debug('PG demon user: %s', pg_demon_user_passwd_line) if os.getuid() != pg_demon_user_passwd_line[2]: os.setuid(pg_demon_user_passwd_line[2]) if os.getuid() != pg_demon_user_passwd_line[2]: print('Failed to become database superuser [%s]' % pg_demon_user_passwd_line[0]) return -2 request_login_params ( setup_pool = True, force_tui = True, user = pg_demon_user_passwd_line[0] ) conn = get_connection(readonly = False) default_collation_valid = sanity_check_database_default_collation_version(conn = conn) other_collations_valid = sanity_check_collation_versions(conn = conn) if default_collation_valid and other_collations_valid: print('All collations valid.') return 0 llap = [] llap.append(revalidate_constraints(link_obj = conn)) llap.append(reindex_database(conn = conn)) if not default_collation_valid: print('Refreshing database default collation version.') if not refresh_database_default_collation_version_information(conn = conn, use_the_source_luke = llap): print('Failed. Aborting.') conn.rollback() conn.close() return -2 if not other_collations_valid: print('Refreshing general collation versions.') if not refresh_collations_version_information(conn = conn, use_the_source_luke = llap): print('Failed. Aborting.') conn.rollback() conn.close() return -2 conn.commit() conn.close() print('All collation versions refreshed.') return 0
def run_fingerprint_tool() ‑> int
def run_fingerprint_tool() -> int:
    fname = 'db-fingerprint.txt'
    result = get_db_fingerprint(fname = fname, with_dump = True)
    if result == fname:
        print('Success: %s' % fname)
        return 0

    print('Failed. Check the log for details.')
    return -2
def run_ro_queries(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
queries: list[dict] = None,
verbose: bool = False,
return_data: bool = True) ‑> list[psycopg2.extras.DictRow] | None
def run_ro_queries ( link_obj:_TLnkObj=None, #queries:list[_TQueryWithArgs]=None, queries:list[dict]=None, verbose:bool=False, return_data:bool=True ) -> list[_TRow] | None: """Run read-only queries. Args: link_obj: a psycopg2 cursor or connection, can be used to continue transactions, or None queries: a list of dicts: [ {'sql': <SQL string with %(name)s placeholders>, 'args': <dict>}, {...}, ... ] return_data: attempt to fetch data produced by the last query and return that Returns: list of query results as psycopg2 rows """ assert queries is not None, '<queries> must not be None' assert isinstance(link_obj, (dbapi._psycopg.connection, dbapi._psycopg.cursor, type(None))), '<link_obj> must be None, a cursor, or a connection, but [%s] is of type (%s)' % (link_obj, type(link_obj)) if link_obj is None: conn = get_connection(readonly = True, verbose = verbose) curs = conn.cursor() curs_close = curs.close tx_rollback = conn.rollback readonly_rollback_just_in_case = conn.rollback else: curs_close = lambda *x: None tx_rollback = lambda *x: None readonly_rollback_just_in_case = lambda *x: None if isinstance(link_obj, dbapi._psycopg.cursor): curs = link_obj elif isinstance(link_obj, dbapi._psycopg.connection): curs = link_obj.cursor() curs_close = curs.close tx_rollback = link_obj.rollback if link_obj.autocommit is True: # readonly connection ? readonly_rollback_just_in_case = link_obj.rollback # do NOT rollback readonly queries on passed-in readwrite # connections just in case because they may have already # seen fully legitimate write action which would get lost if verbose: _log.debug('cursor: %s', curs) for query in queries: try: args = query['args'] except KeyError: args = None if isinstance(args, list): _log.debug('arguments-as-list depreciated:') _log.debug(query['sql']) try: SQL = query['sql'] except KeyError: SQL = query['cmd'] #_log.debug("depreciated: SQL keyed as ['cmd'] rather than ['sql']: %s", SQL) try: curs.execute(SQL, args) except PG_ERROR_EXCEPTION as pg_exc: _log.error('query failed in RO connection') gmConnectionPool.log_pg_exception_details(pg_exc) __safely_close_cursor_and_rollback_close_conn ( close_cursor = curs_close, rollback_tx = tx_rollback, # rollback so any ABORT state isn't preserved in pooled connections close_conn = False # do not close connection, RO connections are pooled ) __perhaps_reraise_as_permissions_error(pg_exc, curs) raise except Exception: _log.exception('error during query run in RO connection') gmConnectionPool.log_cursor_state(curs) __safely_close_cursor_and_rollback_close_conn ( close_cursor = curs_close, rollback_tx = tx_rollback, # rollback so any ABORT state isn't preserved in pooled connections close_conn = False # do not close connection, RO connections are pooled ) raise if verbose: gmConnectionPool.log_cursor_state(curs) if not return_data: __safely_close_cursor_and_rollback_close_conn ( close_cursor = curs_close, # rollback just-in-case so we can see data committed meanwhile if # the link object had been passed in and thusly might be part of # a long-running transaction -- but only if its a readonly framing # transaction, do not rollback framing readwrite connections rollback_tx = readonly_rollback_just_in_case, close_conn = False # do not close connection, RO connections are pooled ) return None data = curs.fetchall() if verbose: _log.debug('last query returned [%s (%s)] rows', curs.rowcount, len(data)) _log.debug('cursor description: %s', curs.description) __safely_close_cursor_and_rollback_close_conn ( close_cursor = curs_close, # rollback 
just-in-case so we can see data committed meanwhile if # the link object had been passed in and thusly might be part of # a long-running transaction -- but only if its a readonly framing # transaction, do not rollback framing readwrite connections rollback_tx = readonly_rollback_just_in_case, close_conn = False # do not close connection, RO connections are pooled ) return data
Run read-only queries.
Args
link_obj
- a psycopg2 cursor or connection, can be used to continue transactions, or None
queries
- a list of dicts:
  [
    {'sql': <SQL string with %(name)s placeholders>, 'args': <dict>},
    {...},
    ...
  ]
return_data
- attempt to fetch data produced by the last query and return that
Returns
list of query results as psycopg2 rows
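For illustration, a minimal usage sketch; the schema, table, and column names below are made up, only run_ro_queries() itself comes from this module:

# hypothetical example -- table/column names are illustrative only
from Gnumed.pycommon import gmPG2

queries = [
    {'sql': 'SELECT pk, description FROM clin.episode WHERE fk_patient = %(pat)s', 'args': {'pat': 12}},
    {'sql': 'SELECT count(1) FROM clin.encounter WHERE fk_patient = %(pat)s', 'args': {'pat': 12}}
]
# only the rows produced by the *last* query are returned
rows = gmPG2.run_ro_queries(queries = queries, return_data = True)
if rows:
    print(rows[0][0])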
def run_ro_query(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
sql: str | psycopg2.sql.Composed = None,
args: dict = None,
verbose: bool = False,
return_data: bool = True) ‑> list[psycopg2.extras.DictRow] | None
Expand source code
def run_ro_query(link_obj:_TLnkObj=None, sql:_TSQL=None, args:dict=None, verbose:bool=False, return_data:bool=True) -> list[_TRow] | None:
    """Run one read-only query via run_ro_queries()."""
    return run_ro_queries (
        link_obj = link_obj,
        queries = [{'sql': sql, 'args': args}],
        verbose = verbose,
        return_data = return_data
    )
Run one read-only query via run_ro_queries().
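Since the signature also accepts a psycopg2.sql.Composed, dynamically composed identifiers can be passed as well. A small sketch (the table and column names are illustrative, not part of this module):

# hypothetical example -- 'dem.staff' and its columns are illustrative only
import psycopg2.sql
from Gnumed.pycommon import gmPG2

SQL = psycopg2.sql.SQL('SELECT short_alias FROM {tbl} WHERE db_user = %(usr)s').format (
    tbl = psycopg2.sql.Identifier('dem', 'staff')
)
rows = gmPG2.run_ro_query(sql = SQL, args = {'usr': 'any-doc'})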
def run_rw_queries(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
queries: list[dict] = None,
end_tx: bool = False,
return_data: bool = None,
verbose: bool = False) ‑> list[psycopg2.extras.DictRow] | None
Expand source code
def run_rw_queries (
    link_obj:_TLnkObj=None,
    #queries:_TQueries=None,
    queries:list[dict]=None,
    end_tx:bool=False,
    return_data:bool=None,
    verbose:bool=False
) -> list[_TRow] | None:
    """Convenience function for running read-write queries.

    Typically (part of) a transaction.

    Args:
        link_obj: None, cursor, connection
        queries:
            * a list of dicts [{'sql': <SQL string>, 'args': <dict>)
            * to be executed as a single transaction
            * the last query may usefully return rows, such as:
                SELECT currval('some_sequence');
                or
                INSERT/UPDATE ... RETURNING some_value;
        end_tx:
            * controls whether the transaction is finalized (eg. COMMITted/ROLLed BACK) or not,
              this allows the call to run_rw_queries() to be part of a framing transaction
            * if link_obj is a *connection* then "end_tx" will default to False unless it is
              explicitly set to True which is taken to mean "yes, you do have full control over
              the transaction" in which case the transaction is properly finalized
            * if link_obj is a *cursor* we CANNOT finalize the transaction because we would
              need the connection for that
            * if link_obj is *None* "end_tx" will, of course, always be True, because we always
              have full control over the connection, not ending the transaction would be pointless
        return_data:
            * if true, the returned data will include the rows the last query selected
            * if false, it returns None instead

    Returns:
        * None if last query did not return rows
        * "fetchall() result" if last query returned any rows and "return_data" was True
    """
    assert queries is not None, '<queries> must not be None'
    assert isinstance(link_obj, (dbapi._psycopg.connection, dbapi._psycopg.cursor, type(None))), '<link_obj> must be None, a cursor, or a connection, but [%s] is of type (%s)' % (link_obj, type(link_obj))

    if link_obj is None:
        conn = get_connection(readonly = False)
        curs = conn.cursor()
        conn_close = conn.close
        tx_commit = conn.commit
        tx_rollback = conn.rollback
        curs_close = curs.close
        notices_accessor = conn
    else:
        conn_close = lambda *x: None
        tx_commit = lambda *x: None
        tx_rollback = lambda *x: None
        curs_close = lambda *x: None
        if isinstance(link_obj, dbapi._psycopg.cursor):
            curs = link_obj
            notices_accessor = curs.connection
        elif isinstance(link_obj, dbapi._psycopg.connection):
            curs = link_obj.cursor()
            curs_close = curs.close
            notices_accessor = link_obj
            if end_tx:
                tx_commit = link_obj.commit
                tx_rollback = link_obj.rollback

    for query in queries:
        try:
            args = query['args']
        except KeyError:
            args = None
        try:
            SQL = query['sql']
        except KeyError:
            SQL = query['cmd']
            #_log.debug("depreciated: SQL keyed as ['cmd'] rather than ['sql']: %s", SQL)
        try:
            curs.execute(SQL, args)
        except dbapi.Error as pg_exc:
            # DB related exceptions
            _log.error('query failed in RW connection')
            gmConnectionPool.log_pg_exception_details(pg_exc)
            __safely_close_cursor_and_rollback_close_conn (
                curs_close,
                tx_rollback,
                conn_close
            )
            __perhaps_reraise_as_permissions_error(pg_exc, curs)
            gmLog2.log_stack_trace()
            raise
        except Exception:
            # other exceptions
            _log.exception('error running query in RW connection')
            gmConnectionPool.log_cursor_state(curs)
            gmLog2.log_stack_trace()
            __safely_close_cursor_and_rollback_close_conn (
                curs_close,
                tx_rollback,
                conn_close
            )
            raise
        if verbose:
            gmConnectionPool.log_cursor_state(curs)
        __log_notices(notices_accessor)

    if not return_data:
        curs_close()
        tx_commit()
        conn_close()
        #return (None, None)
        return None

    data = None
    try:
        data = curs.fetchall()
    except Exception:
        _log.exception('error fetching data from RW query')
        gmLog2.log_stack_trace()
        __safely_close_cursor_and_rollback_close_conn (
            curs_close,
            tx_rollback,
            conn_close
        )
        raise

    curs_close()
    tx_commit()
    conn_close()
    return data
Convenience function for running read-write queries.
Typically (part of) a transaction.
Args
link_obj
- None, cursor, connection
queries
- a list of dicts [{'sql': <SQL string>, 'args': <dict>}]
- to be executed as a single transaction
- the last query may usefully return rows, such as:
  SELECT currval('some_sequence'); or INSERT/UPDATE ... RETURNING some_value;
end_tx
- controls whether the transaction is finalized (eg. COMMITted/ROLLed BACK) or not; this allows the call to run_rw_queries() to be part of a framing transaction (see the usage sketch below)
- if link_obj is a connection then "end_tx" will default to False unless it is explicitly set to True, which is taken to mean "yes, you do have full control over the transaction", in which case the transaction is properly finalized
- if link_obj is a cursor we CANNOT finalize the transaction because we would need the connection for that
- if link_obj is None "end_tx" will, of course, always be True, because we always have full control over the connection; not ending the transaction would be pointless
return_data
- if True, the returned data will include the rows the last query selected
- if False, None is returned instead
Returns
- None if the last query did not return rows
- the "fetchall() result" if the last query returned rows and "return_data" was True
def run_rw_query(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
sql: str | psycopg2.sql.Composed = None,
args: dict = None,
end_tx: bool = False,
return_data: bool = None,
verbose: bool = False) ‑> list[psycopg2.extras.DictRow] | None
Expand source code
def run_rw_query (
    link_obj:_TLnkObj=None,
    sql:_TSQL=None,
    args:dict=None,
    end_tx:bool=False,
    return_data:bool=None,
    verbose:bool=False
) -> list[_TRow] | None:
    return run_rw_queries (
        link_obj = link_obj,
        queries = [{'sql': sql, 'args': args}],
        end_tx = end_tx,
        return_data = return_data,
        verbose = verbose
    )
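A sketch of the framing-transaction pattern described above, assuming the caller is responsible for committing and closing the read-write connection it obtained; the SQL and primary keys are illustrative only:

# hypothetical sketch of a framing transaction
from Gnumed.pycommon import gmPG2

conn = gmPG2.get_connection(readonly = False)
try:
    # end_tx defaults to False on a passed-in connection, so neither call commits
    gmPG2.run_rw_query(link_obj = conn, sql = 'UPDATE clin.episode SET is_open = %(open)s WHERE pk = %(pk)s', args = {'open': False, 'pk': 5})
    gmPG2.run_rw_query(link_obj = conn, sql = 'DELETE FROM clin.episode WHERE pk = %(pk)s', args = {'pk': 6})
    conn.commit()        # the framing code finalizes the transaction
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()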
def run_sql_script(sql_script, conn=None)
Expand source code
def run_sql_script(sql_script, conn=None):
    if conn is None:
        conn = get_connection(readonly = False)
    from Gnumed.pycommon import gmPsql
    psql = gmPsql.Psql(conn)
    if psql.run(sql_script) == 0:
        query = {
            'sql': 'select gm.log_script_insertion(%(name)s, %(ver)s)',
            'args': {'name': sql_script, 'ver': 'current'}
        }
        run_rw_queries(link_obj = conn, queries = [query])
        conn.commit()
        return True

    _log.error('error running sql script: %s', sql_script)
    return False
def sanitize_pg_regex(expression=None, escape_all=False)
Expand source code
def sanitize_pg_regex(expression=None, escape_all=False):
    """Escape input for use in a PostgreSQL regular expression.

    If a fragment comes from user input and is to be used
    as a regular expression we need to make sure it doesn't
    contain invalid regex patterns such as unbalanced ('s.

    <escape_all>
        True: try to escape *all* metacharacters
        False: only escape those which are known to render the regex invalid
    """
    return expression.replace (
        '(', '\\('
    ).replace (
        ')', '\\)'
    ).replace (
        '[', '\\['
    ).replace (
        '+', '\\+'
    ).replace (
        '.', '\\.'
    ).replace (
        '*', '\\*'
    ).replace (
        '?', '\\?'
    )
    #']', '\\]',        # not needed
Escape input for use in a PostgreSQL regular expression.
If a fragment comes from user input and is to be used as a regular expression we need to make sure it doesn't contain invalid regex patterns such as unbalanced ('s.
<escape_all>
- True: try to escape all metacharacters
- False: only escape those which are known to render the regex invalid
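A usage sketch, escaping user input before handing it to a case-insensitive regex match; the table, column, and pattern are illustrative only:

# hypothetical example -- 'ref.drug' and its columns are illustrative only
from Gnumed.pycommon import gmPG2

user_input = 'metoprolol (100'                               # invalid as a regex because of the unbalanced '('
pattern = gmPG2.sanitize_pg_regex(expression = user_input)   # escapes the '(' so the pattern stays valid
rows = gmPG2.run_ro_query (
    sql = 'SELECT * FROM ref.drug WHERE description ~* %(pat)s',
    args = {'pat': pattern}
)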
def sanity_check_collation_versions(conn=None) ‑> bool
Expand source code
def sanity_check_collation_versions(conn=None) -> bool:
    """Check whether the version of collation has changed.

    Args:
        conn: a psycopg2 connection, in which connection's database the collations are to be checked

    Returns:
        If this returns False you need to run

            REINDEX (VERBOSE) DATABASE the_database;
            VALIDATE CONSTRAINTS;
            ALTER COLLATION collation_name REFRESH VERSION;

        for each of the collations with mismatching versions from pg_collation.
    """
    SQL = """
        SELECT
            *,
            pg_catalog.pg_collation_actual_version(oid),
            pg_catalog.pg_encoding_to_char(collencoding),
            pg_catalog.current_database()
        FROM pg_collation
        WHERE
            collversion IS DISTINCT FROM NULL
                AND
            collprovider <> 'd'
                AND
            collversion <> pg_catalog.pg_collation_actual_version(oid)
                AND
            -- must ignore collations not intended for the database encoding
            collencoding = (SELECT encoding FROM pg_database WHERE datname = pg_catalog.current_database())
    """
    try:
        rows = run_ro_queries(link_obj = conn, queries = [{'sql': SQL}])
    except dbapi.errors.UndefinedFunction as pg_exc:        # type-x: ignore [attr-defined]        # pylint: disable=no-member
        _log.exception('cannot verify collation versions, likely PG < 15')
        gmConnectionPool.log_pg_exception_details(pg_exc)
        return True

    if not rows:
        _log.debug('no version changes in pg_collation entries')
        return True

    _log.error('version mismatches in pg_collation')
    _log.debug('you need to run REINDEX DATABASE/VALIDATE CONSTRAINTS etc and ALTER COLLATION collation_name REFRESH VERSION')
    for coll in rows:
        _log.error(coll)
    return False
Check whether the version of collation has changed.
Args
conn
- a psycopg2 connection, in which connection's database the collations are to be checked
Returns
If this returns False you need to run
REINDEX (VERBOSE) DATABASE the_database;
VALIDATE CONSTRAINTS;
ALTER COLLATION collation_name REFRESH VERSION;
for each of the collations with mismatching versions from pg_collation.
def sanity_check_database_default_collation_version(conn=None) ‑> bool
Expand source code
def sanity_check_database_default_collation_version(conn=None) -> bool:
    """Check whether the database default collation version has changed.

    Args:
        conn: a psycopg2 connection, for which connection's database the collation is to be checked

    Returns:
        If this returns False you need to run

            REINDEX (VERBOSE) DATABASE the_database;
            VALIDATE CONSTRAINTS;
            ALTER DATABASE the_database REFRESH COLLATION VERSION;

        inside the affected database.
    """
    SQL = 'SELECT *, pg_database_collation_actual_version(oid) FROM pg_database WHERE datname = current_database()'
    try:
        rows = run_ro_queries(link_obj = conn, queries = [{'sql': SQL}])
    except dbapi.errors.UndefinedFunction as pg_exc:        # type-x: ignore [attr-defined]        # pylint: disable=no-member
        _log.exception('cannot verify collation version, likely PG < 15')
        gmConnectionPool.log_pg_exception_details(pg_exc)
        return True

    db = rows[0]
    if db['datcollversion'] == db['pg_database_collation_actual_version']:
        _log.debug('no version change in database default collation:')
        _log.debug(db)
        return True

    _log.error('database default collation version mismatch')
    _log.error('collation: %s', db['datcollate'])
    _log.error('provider: %s', db['datlocprovider'])
    if db['daticulocale']:
        _log.error('ICU locale: %s', db['daticulocale'])
    _log.error('version (DB): %s', db['datcollversion'])
    _log.error('version (OS): %s', db['pg_database_collation_actual_version'])
    _log.debug('you need to run REINDEX DATABASE/VALIDATE CONSTRAINTS etc and ALTER DATABASE db_name REFRESH COLLATION VERSION')
    return False
Check whether the database default collation version has changed.
Args
conn
- a psycopg2 connection, for which connection's database the collation is to be checked
Returns
If this returns False you need to run
REINDEX (VERBOSE) DATABASE the_database;
VALIDATE CONSTRAINTS;
ALTER DATABASE the_database REFRESH COLLATION VERSION;
inside the affected database.
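A minimal sketch combining both collation checks from this module, assuming they are run once at application startup; the remediation itself (REINDEX/REFRESH as described above) is left to an administrator:

# minimal startup-time sketch using only functions documented in this module
from Gnumed.pycommon import gmPG2

conn = gmPG2.get_connection()
db_coll_ok = gmPG2.sanity_check_database_default_collation_version(conn = conn)
all_colls_ok = gmPG2.sanity_check_collation_versions(conn = conn)
if not (db_coll_ok and all_colls_ok):
    # the log then lists the affected collations
    print('collation version mismatch detected, see the log for details')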
def sanity_check_database_settings(hipaa: bool = True) ‑> tuple
Expand source code
def sanity_check_database_settings(hipaa:bool=True) -> tuple:
    """Check database settings for sanity.

    Args:
        hipaa: how to check HIPAA relevant settings, as fatal or warning

    Returns:
        (status, message)

        status
            * 0: no problem
            * 1: non-fatal problem
            * 2: fatal problem
    """
    _log.debug('checking database settings')
    conn = get_connection()

    # - version string
    global postgresql_version_string
    if postgresql_version_string is None:
        curs = conn.cursor()
        curs.execute('SELECT version()')
        postgresql_version_string = curs.fetchone()['version']
        curs.close()
        _log.info('PostgreSQL version (string): "%s"' % postgresql_version_string)

    # - postgresql settings
    options2check:dict[str, list] = {
        # setting: [expected value, risk, fatal?]
        'allow_system_table_mods': [['off'], 'system breakage', False],
        'check_function_bodies': [['on'], 'suboptimal error detection', False],
        'datestyle': [['ISO'], 'faulty timestamp parsing', True],
        'default_transaction_isolation': [['read committed'], 'faulty database reads', True],
        'default_transaction_read_only': [['on'], 'accidental database writes', False],
        'fsync': [['on'], 'data loss/corruption', True],
        'full_page_writes': [['on'], 'data loss/corruption', False],
        'lc_messages': [['C'], 'suboptimal error detection', False],
        'password_encryption': [['on', 'md5', 'scram-sha-256'], 'breach of confidentiality', False],
        #'regex_flavor': [[u'advanced'], u'query breakage', False],        # 9.0 doesn't support this anymore, and default now "advanced" anyway
        'synchronous_commit': [['on'], 'data loss/corruption', False],
        'sql_inheritance': [['on'], 'query breakage, data loss/corruption', True],        # IF returned (<PG10): better be ON, if NOT returned (PG10): hardwired
        'ignore_checksum_failure': [['off'], 'data loss/corruption', False],        # starting with PG 9.3
        'track_commit_timestamp': [['on'], 'suboptimal auditing', False],        # starting with PG 9.3
    }
    if hipaa:
        options2check['log_connections'] = [['on'], 'non-compliance with HIPAA', True]
        options2check['log_disconnections'] = [['on'], 'non-compliance with HIPAA', True]
    else:
        options2check['log_connections'] = [['on'], 'non-compliance with HIPAA', None]
        options2check['log_disconnections'] = [['on'], 'non-compliance with HIPAA', None]
    cmd = 'SELECT name, setting FROM pg_settings WHERE name = ANY(%(settings)s)'
    rows = run_ro_queries (
        link_obj = conn,
        queries = [{'sql': cmd, 'args': {'settings': list(options2check)}}]
    )
    found_error = False
    found_problem = False
    msg = []
    for row in rows:
        option = row['name']
        value_found = row['setting']
        values_expected = options2check[option][0]
        risk = options2check[option][1]
        fatal_setting = options2check[option][2]
        if not value_found in values_expected:
            if fatal_setting is True:
                found_error = True
            elif fatal_setting is False:
                found_problem = True
            elif fatal_setting is None:
                pass
            else:
                _log.error(options2check[option])
                raise ValueError('invalid database configuration sanity check')
            msg.append(_(' option [%s]: %s') % (option, value_found))
            msg.append(_(' risk: %s') % risk)
            _log.warning('PG option [%s] set to [%s], expected %s, risk: <%s>' % (option, value_found, values_expected, risk))

    # - collations
    if not sanity_check_database_default_collation_version(conn = conn):
        found_problem = True
        msg.append(_(' collation version mismatch between database and operating system'))
        msg.append(_(' risk: data corruption (duplicate entries, faulty sorting)'))
    if not sanity_check_collation_versions(conn = conn):
        found_problem = True
        msg.append(_(' collations with version mismatch'))
        msg.append(_(' risk: data corruption (duplicate entries, faulty sorting)'))

    # - database encoding
    curs = conn.cursor()
    try:
        curs.execute('SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()')
        encoding = curs.fetchone()['pg_encoding_to_char']
        if encoding != 'UTF8':
            found_problem = True
            msg.append(_(' database encoding not UTF8 but rather: %s') % encoding)
            msg.append(_(' risk: multilingual data storage problems'))
            _log.warning('PG database encoding not UTF8 but [%s]', encoding)
    except dbapi.Error:
        _log.exception('cannot verify database encoding (probably PG < 15)')
    finally:
        curs.close()

    # preloaded libraries
    #    SQL = "SELECT name, setting from pg_settings where name = 'shared_preload_libraries';"
    #    rows = run_ro_queries (link_obj = conn, queries = [{'sql': SQL, 'args': None}])
    #    if rows:
    #        value_found = rows[0]['setting']
    #    else:
    #        value_found = []
    #    if 'auto_explain' not in value_found:
    #        msg.append(_(' option [shared_preload_libraries]: %s') % value_found)
    #        msg.append(_(' risk: suboptimal debugging'))
    #        _log.warning('PG option [shared_preload_libraries] set to: %s, expected to include "auto_explain", risk: <suboptimal debugging>', value_found)
    #        found_problem = True

    if found_error:
        return 2, '\n'.join(msg)

    if found_problem:
        return 1, '\n'.join(msg)

    return 0, ''
Check database settings for sanity.
Args
hipaa
- how to check HIPAA relevant settings, as fatal or warning
Returns
(status, message)
status
- 0: no problem
- 1: non-fatal problem
- 2: fatal problem
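A minimal sketch interpreting the returned (status, message) tuple, assuming the check is run during application startup:

from Gnumed.pycommon import gmPG2

status, message = gmPG2.sanity_check_database_settings(hipaa = True)
if status == 2:
    raise SystemExit('fatal database misconfiguration:\n%s' % message)
if status == 1:
    print('non-fatal database configuration problems:\n%s' % message)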
def sanity_check_time_skew(tolerance: int = 60) ‑> bool
Expand source code
def sanity_check_time_skew(tolerance:int=60) -> bool:
    """Check server time and local time to be within the given tolerance of each other.

    Args:
        tolerance: seconds
    """
    _log.debug('maximum skew tolerance (seconds): %s', tolerance)
    cmd = "SELECT now() at time zone 'UTC'"
    conn = get_raw_connection(readonly = True)
    curs = conn.cursor()
    start = time.time()
    rows = run_ro_queries(link_obj = curs, queries = [{'sql': cmd}])
    end = time.time()
    client_now_as_utc = pydt.datetime.utcnow()
    curs.close()
    conn.commit()
    server_now_as_utc = rows[0][0]
    query_duration = end - start
    _log.info('server "now" (UTC): %s', server_now_as_utc)
    _log.info('client "now" (UTC): %s', client_now_as_utc)
    _log.debug('wire roundtrip (seconds): %s', query_duration)
    if query_duration > tolerance:
        _log.error('useless to check client/server time skew, wire roundtrip > tolerance')
        return False

    if server_now_as_utc > client_now_as_utc:
        current_skew = server_now_as_utc - client_now_as_utc
    else:
        current_skew = client_now_as_utc - server_now_as_utc
    _log.debug('client/server time skew: %s', current_skew)
    if current_skew > pydt.timedelta(seconds = tolerance):
        _log.error('client/server time skew > tolerance')
        return False

    return True
Check server time and local time to be within the given tolerance of each other.
Args
tolerance
- seconds
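A short usage sketch, assuming the caller simply wants to warn when the clocks (or the network roundtrip) exceed a three-minute tolerance:

from Gnumed.pycommon import gmPG2

if not gmPG2.sanity_check_time_skew(tolerance = 180):
    print('client/server clocks differ by more than the tolerance (or the wire roundtrip exceeded it)')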
def schema_exists(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
schema='gm') ‑> bool
Expand source code
def schema_exists(link_obj:_TLnkObj=None, schema='gm') -> bool:
    cmd = "SELECT EXISTS (SELECT 1 FROM pg_namespace WHERE nspname = %(schema)s)"
    args = {'schema': schema}
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}])
    return rows[0][0]
def send_maintenance_notification()
Expand source code
def send_maintenance_notification():
    cmd = 'NOTIFY "db_maintenance_warning"'
    run_rw_queries(queries = [{'sql': cmd}], return_data = False)
def send_maintenance_shutdown()
Expand source code
def send_maintenance_shutdown():
    cmd = 'NOTIFY "db_maintenance_disconnect"'
    run_rw_queries(queries = [{'sql': cmd}], return_data = False)
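These two helpers merely NOTIFY on fixed channels; clients are expected to LISTEN for them. A minimal listener sketch using plain psycopg2 (the polling loop and DSN are illustrative, not how GNUmed clients actually wire this up):

# hypothetical listener sketch for the maintenance channels used above
import select
import psycopg2

conn = psycopg2.connect('dbname=gnumed_v22')        # illustrative DSN
conn.autocommit = True
curs = conn.cursor()
curs.execute('LISTEN "db_maintenance_warning"; LISTEN "db_maintenance_disconnect";')
while True:
    if select.select([conn], [], [], 60) == ([], [], []):
        continue                                    # timeout, keep waiting
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        if notify.channel == 'db_maintenance_disconnect':
            print('server asks clients to disconnect')
        elif notify.channel == 'db_maintenance_warning':
            print('server warns of upcoming maintenance')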
def set_user_language(user=None, language=None)
Expand source code
def set_user_language(user=None, language=None):
    """Set the user language in the database.

    user = None: current db user
    language = None: unset
    """
    _log.info('setting database language for user [%s] to [%s]', user, language)
    args = {'usr': user, 'lang': language}
    if language is None:
        if user is None:
            queries = [{'sql': 'select i18n.unset_curr_lang()'}]
        else:
            queries = [{'sql': 'select i18n.unset_curr_lang(%(usr)s)', 'args': args}]
        queries.append({'sql': 'select True'})
    else:
        if user is None:
            queries = [{'sql': 'select i18n.set_curr_lang(%(lang)s)', 'args': args}]
        else:
            queries = [{'sql': 'select i18n.set_curr_lang(%(lang)s, %(usr)s)', 'args': args}]
    rows = run_rw_queries(queries = queries, return_data = True)
    if not rows[0][0]:
        _log.error('cannot set database language to [%s] for user [%s]', language, user)
    return rows[0][0]
Set the user language in the database.
user = None: current db user
language = None: unset
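A usage sketch, assuming the caller wants to switch the current database user to German and unset the language again if the database-side i18n.set_curr_lang() refuses it:

from Gnumed.pycommon import gmPG2

if not gmPG2.set_user_language(language = 'de_DE'):
    # the database-side function returned False, eg. no such translation catalog
    gmPG2.set_user_language(language = None)        # unset again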
def shutdown()
Expand source code
def shutdown():
    gmConnectionPool.gmConnectionPool().shutdown()
def table_exists(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
schema: str = None,
table: str = None) ‑> bool
Expand source code
def table_exists(link_obj:_TLnkObj=None, schema:str=None, table:str=None) -> bool:
    """Return True if the table exists in the given schema, False otherwise."""
    cmd = """
        select exists (
            select 1 from information_schema.tables
            where
                table_schema = %(ns)s
                    and
                table_name = %(tbl)s
                    and
                table_type = 'BASE TABLE'
        )"""
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': {'ns': schema, 'tbl': table}}])
    return rows[0][0]
Returns True if the table exists in the given schema, False otherwise.
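A short sketch guarding an optional query on the existence of schema and table; the names are illustrative only:

from Gnumed.pycommon import gmPG2

if gmPG2.schema_exists(schema = 'clin') and gmPG2.table_exists(schema = 'clin', table = 'vaccination'):
    rows = gmPG2.run_ro_query(sql = 'SELECT count(1) FROM clin.vaccination')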
def unlock_row(link_obj: psycopg2.extras.DictConnection | psycopg2.extras.DictCursor | None = None,
table: str = None,
pk: int = None,
exclusive: bool = False) ‑> bool
Expand source code
def unlock_row(link_obj:_TLnkObj=None, table:str=None, pk:int=None, exclusive:bool=False) -> bool:
    """Uses pg_advisory_unlock(_shared).

    - each lock needs one unlock
    """
    _log.debug('trying to unlock row: [%s] [%s] (exclusive: %s)', table, pk, exclusive)
    if exclusive:
        cmd = "SELECT pg_advisory_unlock('%s'::regclass::oid::int, %s)" % (table, pk)
    else:
        cmd = "SELECT pg_advisory_unlock_shared('%s'::regclass::oid::int, %s)" % (table, pk)
    rows = run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd}])
    if rows[0][0]:
        return True

    _log.warning('cannot unlock row: [%s] [%s] (exclusive: %s)', table, pk, exclusive)
    return False
Uses pg_advisory_unlock(_shared).
- each lock needs one unlock
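Since each unlock must pair with one previously acquired advisory lock on the same (table, pk), a typical pattern might look like the sketch below; lock_row() is assumed to be the module's counterpart helper (it is not shown in this section), and the table/pk are illustrative only:

# hypothetical sketch -- lock_row() is assumed, table/pk are illustrative
from Gnumed.pycommon import gmPG2

if gmPG2.lock_row(table = 'clin.episode', pk = 12, exclusive = True):
    try:
        pass        # ... modify the row ...
    finally:
        gmPG2.unlock_row(table = 'clin.episode', pk = 12, exclusive = True)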
def update_translation_in_database(language=None, original=None, translation=None, link_obj=None)
Expand source code
def update_translation_in_database(language=None, original=None, translation=None, link_obj=None):
    if language is None:
        cmd = 'SELECT i18n.upd_tx(%(orig)s, %(trans)s)'
    else:
        cmd = 'SELECT i18n.upd_tx(%(lang)s, %(orig)s, %(trans)s)'
    args = {'lang': language, 'orig': original, 'trans': translation}
    run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = False, link_obj = link_obj)
    return args
def user_role_exists(user_role: str = None, link_obj=None) ‑> bool
Expand source code
def user_role_exists(user_role:str=None, link_obj=None) -> bool:
    return role_exists(role = user_role, link_obj = link_obj)