Module Gnumed.business.gmIncomingData
Handling of
Functions
def create_incoming_data(data_type: str = None, filename: str = None, verify_import: bool = False) ‑> cIncomingData
-
Expand source code
def create_incoming_data(data_type:str=None, filename:str=None, verify_import:bool=False) -> cIncomingData: conn = gmPG2.get_connection(readonly = False) args = {'typ': data_type} SQL = """ INSERT INTO clin.incoming_data (type, data) VALUES (%(typ)s, 'new data'::bytea) RETURNING pk""" rows = gmPG2.run_rw_query ( link_obj = conn, end_tx = False, sql = SQL, args = args, return_data = True ) pk = rows[0]['pk'] incoming = cIncomingData(aPK_obj = pk, link_obj = conn) if incoming.update_data_from_file(fname = filename, link_obj = conn, verify_import = verify_import): conn.commit() return incoming conn.rollback() _log.debug('cannot update incoming_data stub from file, rolled back') return None
def data_exists(filename: str) ‑> bool
-
Expand source code
def data_exists(filename:str) -> bool: """Check by md5 hash whether data in filename already in database.""" local_md5 = gmTools.file2md5(filename = filename) SQL = 'SELECT EXISTS(SELECT 1 FROM clin.incoming_data WHERE md5(data) = %(local_md5)s) AS data_exists' args = {'local_md5': local_md5} rows = gmPG2.run_ro_query(sql = SQL, args = args) return rows[0]['data_exists']
Check by md5 hash whether data in filename already in database.
def delete_incoming_data(pk_incoming_data=None)
-
Expand source code
def delete_incoming_data(pk_incoming_data=None): SQL = "DELETE FROM clin.incoming_data WHERE pk = %(pk)s" args = {'pk': pk_incoming_data} gmPG2.run_rw_query(sql = SQL, args = args) return True
def get_incoming_data(order_by=None, return_pks=False)
-
Expand source code
def get_incoming_data(order_by=None, return_pks=False): if order_by is None: order_by = 'true' else: order_by = 'true ORDER BY %s' % order_by SQL = _SQL_get_incoming_data % order_by rows = gmPG2.run_ro_query(sql = SQL) if return_pks: return [ r['pk_incoming_data'] for r in rows ] return [ cIncomingData(row = {'data': r, 'pk_field': 'pk_incoming_data'}) for r in rows ]
Classes
class cIncomingData (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cIncomingData(gmBusinessDBObject.cBusinessDBObject): """Represents items of incoming data, say, HL7 snippets.""" _cmd_fetch_payload = _SQL_get_incoming_data % "pk_incoming_data = %s" _cmds_store_payload = [ """UPDATE clin.incoming_data SET fk_patient_candidates = %(pk_patient_candidates)s, fk_identity = %(pk_identity)s, fk_provider_disambiguated = %(pk_provider_disambiguated)s, request_id = gm.nullify_empty_string(%(request_id)s), firstnames = gm.nullify_empty_string(%(firstnames)s), lastnames = gm.nullify_empty_string(%(lastnames)s), dob = %(dob)s, postcode = gm.nullify_empty_string(%(postcode)s), other_info = gm.nullify_empty_string(%(other_info)s), type = gm.nullify_empty_string(%(data_type)s), gender = gm.nullify_empty_string(%(gender)s), requestor = gm.nullify_empty_string(%(requestor)s), external_data_id = gm.nullify_empty_string(%(external_data_id)s), comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk_incoming_data)s AND xmin = %(xmin_incoming_data)s RETURNING xmin as xmin_incoming_data, octet_length(data) as data_size """ ] # view columns that can be updated: _updatable_fields = [ 'pk_patient_candidates', 'request_id', # request ID as found in <data> 'firstnames', 'lastnames', 'dob', 'postcode', 'other_info', # other identifying info in .data 'data_type', 'gender', 'requestor', # Requestor of data (e.g. who ordered test results) if available in source data. 'external_data_id', # ID of content of .data in external system (e.g. importer) where appropriate 'comment', # a free text comment on this row, eg. why is it here, error logs etc 'pk_identity', 'pk_provider_disambiguated' # The provider the data is relevant to. ] #-------------------------------------------------------- def format(self): return '%s' % self #-------------------------------------------------------- def _format_patient_identification(self): tmp = '%s %s %s' % ( gmTools.coalesce(self._payload['lastnames'], '', 'last=%s'), gmTools.coalesce(self._payload['firstnames'], '', 'first=%s'), gmTools.coalesce(self._payload['gender'], '', 'gender=%s') ) if self._payload['dob']: tmp += ' dob=%s' % self._payload['dob'].strftime('%Y %b %d') return tmp patient_identification = property(_format_patient_identification) #-------------------------------------------------------- def update_data_from_file(self, fname=None, link_obj=None, verify_import:bool=False): # sanity check if not (os.access(fname, os.R_OK) and os.path.isfile(fname)): _log.error('[%s] is not a readable file' % fname) return False _log.debug('updating [pk=%s] from [%s]', self.pk_obj, fname) gmPG2.file2bytea ( query = "UPDATE clin.incoming_data SET data = %(data)s::bytea WHERE pk = %(pk)s", filename = fname, args = {'pk': self.pk_obj}, conn = link_obj ) # must update XMIN now ... self.refetch_payload(link_obj = link_obj) if not verify_import: return True SQL = 'SELECT (md5(data) = %(local_md5)s) AS verified FROM clin.incoming_data WHERE pk = %(pk)s' args = {'pk': self.pk_obj, 'local_md5': gmTools.file2md5(filename = fname)} rows = gmPG2.run_ro_query(sql = SQL, args = args) return rows[0]['verified'] #-------------------------------------------------------- def save_to_file(self, aChunkSize=0, filename=None): if self._payload['data_size'] == 0: return None if self._payload['data_size'] is None: return None if filename is None: filename = gmTools.get_unique_filename(prefix = 'gm-incoming_data-') success = gmPG2.bytea2file ( data_query = { 'sql': 'SELECT substring(data from %(start)s for %(size)s) FROM clin.incoming_data WHERE pk = %(pk)s', 'args': {'pk': self.pk_obj} }, filename = filename, chunk_size = aChunkSize, data_size = self._payload['data_size'] ) if not success: return None return filename #-------------------------------------------------------- def lock(self, exclusive=False): return gmPG2.lock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive) #-------------------------------------------------------- def unlock(self, exclusive=False): return gmPG2.unlock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive) #-------------------------------------------------------- def set_patient(self, patient): if patient is None: pk_pat = None elif isinstance(patient, int): pk_pat = patient else: pk_pat = patient['pk_identity'] if self['pk_identity'] == pk_pat: return self['pk_identity'] = pk_pat patient = property(fset = set_patient)
Represents items of incoming data, say, HL7 snippets.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop patient
-
Expand source code
def set_patient(self, patient): if patient is None: pk_pat = None elif isinstance(patient, int): pk_pat = patient else: pk_pat = patient['pk_identity'] if self['pk_identity'] == pk_pat: return self['pk_identity'] = pk_pat
prop patient_identification
-
Expand source code
def _format_patient_identification(self): tmp = '%s %s %s' % ( gmTools.coalesce(self._payload['lastnames'], '', 'last=%s'), gmTools.coalesce(self._payload['firstnames'], '', 'first=%s'), gmTools.coalesce(self._payload['gender'], '', 'gender=%s') ) if self._payload['dob']: tmp += ' dob=%s' % self._payload['dob'].strftime('%Y %b %d') return tmp
Methods
def lock(self, exclusive=False)
-
Expand source code
def lock(self, exclusive=False): return gmPG2.lock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)
def save_to_file(self, aChunkSize=0, filename=None)
-
Expand source code
def save_to_file(self, aChunkSize=0, filename=None): if self._payload['data_size'] == 0: return None if self._payload['data_size'] is None: return None if filename is None: filename = gmTools.get_unique_filename(prefix = 'gm-incoming_data-') success = gmPG2.bytea2file ( data_query = { 'sql': 'SELECT substring(data from %(start)s for %(size)s) FROM clin.incoming_data WHERE pk = %(pk)s', 'args': {'pk': self.pk_obj} }, filename = filename, chunk_size = aChunkSize, data_size = self._payload['data_size'] ) if not success: return None return filename
def set_patient(self, patient)
-
Expand source code
def set_patient(self, patient): if patient is None: pk_pat = None elif isinstance(patient, int): pk_pat = patient else: pk_pat = patient['pk_identity'] if self['pk_identity'] == pk_pat: return self['pk_identity'] = pk_pat
def unlock(self, exclusive=False)
-
Expand source code
def unlock(self, exclusive=False): return gmPG2.unlock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)
def update_data_from_file(self, fname=None, link_obj=None, verify_import: bool = False)
-
Expand source code
def update_data_from_file(self, fname=None, link_obj=None, verify_import:bool=False): # sanity check if not (os.access(fname, os.R_OK) and os.path.isfile(fname)): _log.error('[%s] is not a readable file' % fname) return False _log.debug('updating [pk=%s] from [%s]', self.pk_obj, fname) gmPG2.file2bytea ( query = "UPDATE clin.incoming_data SET data = %(data)s::bytea WHERE pk = %(pk)s", filename = fname, args = {'pk': self.pk_obj}, conn = link_obj ) # must update XMIN now ... self.refetch_payload(link_obj = link_obj) if not verify_import: return True SQL = 'SELECT (md5(data) = %(local_md5)s) AS verified FROM clin.incoming_data WHERE pk = %(pk)s' args = {'pk': self.pk_obj, 'local_md5': gmTools.file2md5(filename = fname)} rows = gmPG2.run_ro_query(sql = SQL, args = args) return rows[0]['verified']
Inherited members