Module Gnumed.business.gmIncomingData

Handling of the <INCOMING> area.
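
A minimal, hedged usage sketch of the module-level helpers (the file path and data type are illustrative; database login is assumed to have been set up elsewhere via gmPG2):

# Illustrative sketch only: import a file into the <INCOMING> area.
# Assumes gmPG2 database login has already been established by the caller.
from Gnumed.business import gmIncomingData

hl7_file = '/tmp/example.hl7'                  # hypothetical input file

if not gmIncomingData.data_exists(filename = hl7_file):
        incoming = gmIncomingData.create_incoming_data (
                data_type = 'HL7',
                filename = hl7_file,
                verify_import = True           # compare md5 of file and stored blob
        )
        if incoming is not None:
                print(incoming.patient_identification)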

Module source
# -*- coding: utf-8 -*-
"""Handling of <INCOMING> area."""
#============================================================
__author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = "GPL v2 or later"


import sys
import os
import logging


if __name__ == '__main__':
        sys.path.insert(0, '../../')

from Gnumed.pycommon import gmI18N
if __name__ == '__main__':
        gmI18N.activate_locale()
        gmI18N.install_domain()
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmDateTime


_log = logging.getLogger('gm.import')

#============================================================
# class to handle unmatched incoming clinical data
#------------------------------------------------------------
_SQL_get_incoming_data = """SELECT * FROM clin.v_incoming_data WHERE %s"""

class cIncomingData(gmBusinessDBObject.cBusinessDBObject):
        """Represents items of incoming data, say, HL7 snippets."""

        _cmd_fetch_payload = _SQL_get_incoming_data % "pk_incoming_data = %s"
        _cmds_store_payload = [
                """UPDATE clin.incoming_data SET
                                fk_patient_candidates = %(pk_patient_candidates)s,
                                fk_identity = %(pk_identity)s,
                                fk_provider_disambiguated = %(pk_provider_disambiguated)s,
                                request_id = gm.nullify_empty_string(%(request_id)s),
                                firstnames = gm.nullify_empty_string(%(firstnames)s),
                                lastnames = gm.nullify_empty_string(%(lastnames)s),
                                dob = %(dob)s,
                                postcode = gm.nullify_empty_string(%(postcode)s),
                                other_info = gm.nullify_empty_string(%(other_info)s),
                                type = gm.nullify_empty_string(%(data_type)s),
                                gender = gm.nullify_empty_string(%(gender)s),
                                requestor = gm.nullify_empty_string(%(requestor)s),
                                external_data_id = gm.nullify_empty_string(%(external_data_id)s),
                                comment = gm.nullify_empty_string(%(comment)s)
                        WHERE
                                pk = %(pk_incoming_data)s
                                        AND
                                xmin = %(xmin_incoming_data)s
                        RETURNING
                                xmin as xmin_incoming_data,
                                octet_length(data) as data_size
                """
        ]
        # view columns that can be updated:
        _updatable_fields = [
                'pk_patient_candidates',
                'request_id',                                           # request ID as found in <data>
                'firstnames',
                'lastnames',
                'dob',
                'postcode',
                'other_info',                                           # other identifying info in .data
                'data_type',
                'gender',
                'requestor',                                            # Requestor of data (e.g. who ordered test results) if available in source data.
                'external_data_id',                             # ID of content of .data in external system (e.g. importer) where appropriate
                'comment',                                                      # a free text comment on this row, eg. why is it here, error logs etc
                'pk_identity',
                'pk_provider_disambiguated'             # The provider the data is relevant to.
        ]
        #--------------------------------------------------------
        def format(self):
                return '%s' % self
        #--------------------------------------------------------
        def _format_patient_identification(self):
                tmp = '%s %s %s' % (
                        gmTools.coalesce(self._payload[self._idx['lastnames']], '', 'last=%s'),
                        gmTools.coalesce(self._payload[self._idx['firstnames']], '', 'first=%s'),
                        gmTools.coalesce(self._payload[self._idx['gender']], '', 'gender=%s')
                )
                if self._payload[self._idx['dob']] is not None:
                        tmp += ' dob=%s' % gmDateTime.pydt_strftime(self._payload[self._idx['dob']], '%Y %b %d')
                return tmp

        patient_identification = property(_format_patient_identification)

        #--------------------------------------------------------
        def update_data_from_file(self, fname=None, link_obj=None, verify_import:bool=False):
                # sanity check
                if not (os.access(fname, os.R_OK) and os.path.isfile(fname)):
                        _log.error('[%s] is not a readable file' % fname)
                        return False

                _log.debug('updating [pk=%s] from [%s]', self.pk_obj, fname)
                gmPG2.file2bytea (
                        query = "UPDATE clin.incoming_data SET data = %(data)s::bytea WHERE pk = %(pk)s",
                        filename = fname,
                        args = {'pk': self.pk_obj},
                        conn = link_obj
                )
                # must update XMIN now ...
                self.refetch_payload(link_obj = link_obj)
                if not verify_import:
                        return True

                SQL = 'SELECT (md5(data) = %(local_md5)s) AS verified FROM clin.incoming_data WHERE pk = %(pk)s'
                args = {'pk': self.pk_obj, 'local_md5': gmTools.file2md5(filename = fname)}
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
                return rows[0]['verified']

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize=0, filename=None):

                if self._payload[self._idx['data_size']] == 0:
                        return None

                if self._payload[self._idx['data_size']] is None:
                        return None

                if filename is None:
                        filename = gmTools.get_unique_filename(prefix = 'gm-incoming_data-')

                success = gmPG2.bytea2file (
                        data_query = {
                                'cmd': 'SELECT substring(data from %(start)s for %(size)s) FROM clin.incoming_data WHERE pk = %(pk)s',
                                'args': {'pk': self.pk_obj}
                        },
                        filename = filename,
                        chunk_size = aChunkSize,
                        data_size = self._payload[self._idx['data_size']]
                )

                if not success:
                        return None

                return filename

        #--------------------------------------------------------
        def lock(self, exclusive=False):
                return gmPG2.lock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)

        #--------------------------------------------------------
        def unlock(self, exclusive=False):
                return gmPG2.unlock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)

        #--------------------------------------------------------
        def set_patient(self, patient):
                if patient is None:
                        pk_pat = None
                elif isinstance(patient, int):
                        pk_pat = patient
                else:
                        pk_pat = patient['pk_identity']
                if self['pk_identity'] == pk_pat:
                        return

                self['pk_identity'] = pk_pat

        patient = property(fset = set_patient)

#------------------------------------------------------------
def get_incoming_data(order_by=None, return_pks=False):
        if order_by is None:
                order_by = 'true'
        else:
                order_by = 'true ORDER BY %s' % order_by
        cmd = _SQL_get_incoming_data % order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
        if return_pks:
                return [ r['pk_incoming_data'] for r in rows ]

        return [ cIncomingData(row = {'data': r, 'idx': idx, 'pk_field': 'pk_incoming_data'}) for r in rows ]

#------------------------------------------------------------
def create_incoming_data(data_type:str=None, filename:str=None, verify_import:bool=False) -> cIncomingData:
        conn = gmPG2.get_connection(readonly = False)
        args = {'typ': data_type}
        cmd = """
                INSERT INTO clin.incoming_data (type, data)
                VALUES (%(typ)s, 'new data'::bytea)
                RETURNING pk"""
        rows, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                end_tx = False,
                queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False
        )
        pk = rows[0]['pk']
        incoming = cIncomingData(aPK_obj = pk, link_obj = conn)
        if incoming.update_data_from_file(fname = filename, link_obj = conn, verify_import = verify_import):
                conn.commit()
                return incoming

        conn.rollback()
        _log.debug('cannot update incoming_data stub from file, rolled back')
        return None

#------------------------------------------------------------
def delete_incoming_data(pk_incoming_data=None):
        args = {'pk': pk_incoming_data}
        cmd = "DELETE FROM clin.incoming_data WHERE pk = %(pk)s"
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True

#------------------------------------------------------------
def data_exists(filename:str) -> bool:
        """Check by md5 hash whether data in filename already in database."""
        local_md5 = gmTools.file2md5(filename = filename)
        SQL = 'SELECT EXISTS(SELECT 1 FROM clin.incoming_data WHERE md5(data) = %(local_md5)s) AS data_exists'
        args = {'local_md5': local_md5}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
        return rows[0]['data_exists']

#============================================================
# main
#------------------------------------------------------------
if __name__ == "__main__":

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        gmDateTime.init()
        gmTools.gmPaths()

        #-------------------------------------------------------
        def test_incoming_data():
                for d in get_incoming_data():
                        print(d)

        #-------------------------------------------------------
        gmPG2.request_login_params(setup_pool = True)
        test_incoming_data()

Functions

def create_incoming_data(data_type: str = None, filename: str = None, verify_import: bool = False) -> cIncomingData
def create_incoming_data(data_type:str=None, filename:str=None, verify_import:bool=False) -> cIncomingData:
        conn = gmPG2.get_connection(readonly = False)
        args = {'typ': data_type}
        cmd = """
                INSERT INTO clin.incoming_data (type, data)
                VALUES (%(typ)s, 'new data'::bytea)
                RETURNING pk"""
        rows, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                end_tx = False,
                queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False
        )
        pk = rows[0]['pk']
        incoming = cIncomingData(aPK_obj = pk, link_obj = conn)
        if incoming.update_data_from_file(fname = filename, link_obj = conn, verify_import = verify_import):
                conn.commit()
                return incoming

        conn.rollback()
        _log.debug('cannot update incoming_data stub from file, rolled back')
        return None
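
A hedged usage sketch (the type tag and file path are illustrative):

# Sketch: create a new incoming-data row from a file; None means the
# insert was rolled back because the file could not be read or imported.
incoming = create_incoming_data (
        data_type = 'HL7',                     # illustrative type tag
        filename = '/tmp/example.hl7',         # hypothetical file
        verify_import = True
)
if incoming is None:
        print('import failed, transaction rolled back')
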
def data_exists(filename: str) -> bool

Check by md5 hash whether the data in filename is already in the database.

def data_exists(filename:str) -> bool:
        """Check by md5 hash whether data in filename already in database."""
        local_md5 = gmTools.file2md5(filename = filename)
        SQL = 'SELECT EXISTS(SELECT 1 FROM clin.incoming_data WHERE md5(data) = %(local_md5)s) AS data_exists'
        args = {'local_md5': local_md5}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
        return rows[0]['data_exists']
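
A short, hedged usage sketch (the file path is illustrative):

# Sketch: skip an import when an identical blob is already stored.
if data_exists(filename = '/tmp/example.hl7'):         # hypothetical file
        print('identical data already present in clin.incoming_data')
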
def delete_incoming_data(pk_incoming_data=None)
def delete_incoming_data(pk_incoming_data=None):
        args = {'pk': pk_incoming_data}
        cmd = "DELETE FROM clin.incoming_data WHERE pk = %(pk)s"
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True
def get_incoming_data(order_by=None, return_pks=False)
def get_incoming_data(order_by=None, return_pks=False):
        if order_by is None:
                order_by = 'true'
        else:
                order_by = 'true ORDER BY %s' % order_by
        cmd = _SQL_get_incoming_data % order_by
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
        if return_pks:
                return [ r['pk_incoming_data'] for r in rows ]

        return [ cIncomingData(row = {'data': r, 'idx': idx, 'pk_field': 'pk_incoming_data'}) for r in rows ]
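
A hedged sketch of both call modes (the ORDER BY clause is illustrative):

# Sketch: list incoming items, newest first.
for item in get_incoming_data(order_by = 'pk_incoming_data DESC'):
        print(item['pk_incoming_data'], item.patient_identification)

# Alternatively, fetch only the primary keys:
pks = get_incoming_data(return_pks = True)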

Classes

class cIncomingData (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents items of incoming data, say, HL7 snippets.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • a simple value: the primary key WHERE condition must be a simple column
  • a dictionary of values: the primary key WHERE condition must be a subselect consuming the dict and producing the single-value primary key
row
must hold the fields
  • idx: a dict mapping field names to position
  • data: the field values in a list (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key field OR
  • pk_obj: a dictionary suitable for passing to cursor.execute, holding the primary key values (used for composite primary keys)
  • for example:
    row = {
            'data': rows[0],
            'idx': idx,
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
    objects = [ cChildClass(row = {'data': r, 'idx': idx, 'pk_field': 'the PK column name'}) for r in rows ]
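
A hedged instantiation sketch specific to cIncomingData (the primary key value is purely illustrative):

# Sketch: load one incoming item by primary key and inspect a few view columns.
from Gnumed.business.gmIncomingData import cIncomingData

item = cIncomingData(aPK_obj = 42)             # illustrative pk
print(item['data_type'], item['comment'])
print(item.patient_identification)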
    
class cIncomingData(gmBusinessDBObject.cBusinessDBObject):
        """Represents items of incoming data, say, HL7 snippets."""

        _cmd_fetch_payload = _SQL_get_incoming_data % "pk_incoming_data = %s"
        _cmds_store_payload = [
                """UPDATE clin.incoming_data SET
                                fk_patient_candidates = %(pk_patient_candidates)s,
                                fk_identity = %(pk_identity)s,
                                fk_provider_disambiguated = %(pk_provider_disambiguated)s,
                                request_id = gm.nullify_empty_string(%(request_id)s),
                                firstnames = gm.nullify_empty_string(%(firstnames)s),
                                lastnames = gm.nullify_empty_string(%(lastnames)s),
                                dob = %(dob)s,
                                postcode = gm.nullify_empty_string(%(postcode)s),
                                other_info = gm.nullify_empty_string(%(other_info)s),
                                type = gm.nullify_empty_string(%(data_type)s),
                                gender = gm.nullify_empty_string(%(gender)s),
                                requestor = gm.nullify_empty_string(%(requestor)s),
                                external_data_id = gm.nullify_empty_string(%(external_data_id)s),
                                comment = gm.nullify_empty_string(%(comment)s)
                        WHERE
                                pk = %(pk_incoming_data)s
                                        AND
                                xmin = %(xmin_incoming_data)s
                        RETURNING
                                xmin as xmin_incoming_data,
                                octet_length(data) as data_size
                """
        ]
        # view columns that can be updated:
        _updatable_fields = [
                'pk_patient_candidates',
                'request_id',                                           # request ID as found in <data>
                'firstnames',
                'lastnames',
                'dob',
                'postcode',
                'other_info',                                           # other identifying info in .data
                'data_type',
                'gender',
                'requestor',                                            # Requestor of data (e.g. who ordered test results) if available in source data.
                'external_data_id',                             # ID of content of .data in external system (e.g. importer) where appropriate
                'comment',                                                      # a free text comment on this row, eg. why is it here, error logs etc
                'pk_identity',
                'pk_provider_disambiguated'             # The provider the data is relevant to.
        ]
        #--------------------------------------------------------
        def format(self):
                return '%s' % self
        #--------------------------------------------------------
        def _format_patient_identification(self):
                tmp = '%s %s %s' % (
                        gmTools.coalesce(self._payload[self._idx['lastnames']], '', 'last=%s'),
                        gmTools.coalesce(self._payload[self._idx['firstnames']], '', 'first=%s'),
                        gmTools.coalesce(self._payload[self._idx['gender']], '', 'gender=%s')
                )
                if self._payload[self._idx['dob']] is not None:
                        tmp += ' dob=%s' % gmDateTime.pydt_strftime(self._payload[self._idx['dob']], '%Y %b %d')
                return tmp

        patient_identification = property(_format_patient_identification)

        #--------------------------------------------------------
        def update_data_from_file(self, fname=None, link_obj=None, verify_import:bool=False):
                # sanity check
                if not (os.access(fname, os.R_OK) and os.path.isfile(fname)):
                        _log.error('[%s] is not a readable file' % fname)
                        return False

                _log.debug('updating [pk=%s] from [%s]', self.pk_obj, fname)
                gmPG2.file2bytea (
                        query = "UPDATE clin.incoming_data SET data = %(data)s::bytea WHERE pk = %(pk)s",
                        filename = fname,
                        args = {'pk': self.pk_obj},
                        conn = link_obj
                )
                # must update XMIN now ...
                self.refetch_payload(link_obj = link_obj)
                if not verify_import:
                        return True

                SQL = 'SELECT (md5(data) = %(local_md5)s) AS verified FROM clin.incoming_data WHERE pk = %(pk)s'
                args = {'pk': self.pk_obj, 'local_md5': gmTools.file2md5(filename = fname)}
                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
                return rows[0]['verified']

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize=0, filename=None):

                if self._payload[self._idx['data_size']] == 0:
                        return None

                if self._payload[self._idx['data_size']] is None:
                        return None

                if filename is None:
                        filename = gmTools.get_unique_filename(prefix = 'gm-incoming_data-')

                success = gmPG2.bytea2file (
                        data_query = {
                                'cmd': 'SELECT substring(data from %(start)s for %(size)s) FROM clin.incoming_data WHERE pk = %(pk)s',
                                'args': {'pk': self.pk_obj}
                        },
                        filename = filename,
                        chunk_size = aChunkSize,
                        data_size = self._payload[self._idx['data_size']]
                )

                if not success:
                        return None

                return filename

        #--------------------------------------------------------
        def lock(self, exclusive=False):
                return gmPG2.lock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)

        #--------------------------------------------------------
        def unlock(self, exclusive=False):
                return gmPG2.unlock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)

        #--------------------------------------------------------
        def set_patient(self, patient):
                if patient is None:
                        pk_pat = None
                elif isinstance(patient, int):
                        pk_pat = patient
                else:
                        pk_pat = patient['pk_identity']
                if self['pk_identity'] == pk_pat:
                        return

                self['pk_identity'] = pk_pat

        patient = property(fset = set_patient)

Ancestors

Gnumed.pycommon.gmBusinessDBObject.cBusinessDBObject

Instance variables

var patient
var patient_identification
def _format_patient_identification(self):
        tmp = '%s %s %s' % (
                gmTools.coalesce(self._payload[self._idx['lastnames']], '', 'last=%s'),
                gmTools.coalesce(self._payload[self._idx['firstnames']], '', 'first=%s'),
                gmTools.coalesce(self._payload[self._idx['gender']], '', 'gender=%s')
        )
        if self._payload[self._idx['dob']] is not None:
                tmp += ' dob=%s' % gmDateTime.pydt_strftime(self._payload[self._idx['dob']], '%Y %b %d')
        return tmp
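
A hedged sketch of the two properties (the pk is illustrative): patient_identification is read-only, patient is write-only.

# Sketch: continuing from the instantiation example further above.
item = cIncomingData(aPK_obj = 42)             # illustrative pk
print(item.patient_identification)             # read-only, e.g. "last=Doe first=Jane gender=f dob=1970 Jan 01"
item.patient = 1234                            # write-only, see set_patient() below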

Methods

def lock(self, exclusive=False)
def lock(self, exclusive=False):
        return gmPG2.lock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)
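
A hedged sketch of pairing lock() with unlock() around an update; save() is an assumption here, taken to be inherited from the cBusinessDBObject base class:

# Sketch: guard an update with an exclusive row lock, always releasing it afterwards.
item = cIncomingData(aPK_obj = 42)             # illustrative pk
if item.lock(exclusive = True):
        try:
                item['comment'] = 'being processed'
                item.save()                    # assumption: save() inherited from cBusinessDBObject
        finally:
                item.unlock(exclusive = True)
else:
        print('row is currently locked elsewhere')
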
def save_to_file(self, aChunkSize=0, filename=None)
def save_to_file(self, aChunkSize=0, filename=None):

        if self._payload[self._idx['data_size']] == 0:
                return None

        if self._payload[self._idx['data_size']] is None:
                return None

        if filename is None:
                filename = gmTools.get_unique_filename(prefix = 'gm-incoming_data-')

        success = gmPG2.bytea2file (
                data_query = {
                        'cmd': 'SELECT substring(data from %(start)s for %(size)s) FROM clin.incoming_data WHERE pk = %(pk)s',
                        'args': {'pk': self.pk_obj}
                },
                filename = filename,
                chunk_size = aChunkSize,
                data_size = self._payload[self._idx['data_size']]
        )

        if not success:
                return None

        return filename
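
A hedged export sketch (the target path is illustrative):

# Sketch: write the stored blob to disk; None signals missing data or a write failure.
item = cIncomingData(aPK_obj = 42)             # illustrative pk
exported = item.save_to_file(filename = '/tmp/incoming-export.dat')
if exported is not None:
        print('data written to', exported)
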
def set_patient(self, patient)
def set_patient(self, patient):
        if patient is None:
                pk_pat = None
        elif isinstance(patient, int):
                pk_pat = patient
        else:
                pk_pat = patient['pk_identity']
        if self['pk_identity'] == pk_pat:
                return

        self['pk_identity'] = pk_pat
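
A hedged sketch of the accepted argument types (pk values are illustrative; save() is again assumed to come from the base class):

# Sketch: link or unlink the incoming item and a patient identity.
item = cIncomingData(aPK_obj = 42)             # illustrative pk
item.patient = 1234                            # a pk_identity as int ...
#item.patient = identity_object                # ... or any mapping exposing ['pk_identity']
#item.patient = None                           # ... or None to unlink
item.save()                                    # assumption: save() inherited from cBusinessDBObject
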
def unlock(self, exclusive=False)
def unlock(self, exclusive=False):
        return gmPG2.unlock_row(table = 'clin.incoming_data', pk = self.pk_obj, exclusive = exclusive)
def update_data_from_file(self, fname=None, link_obj=None, verify_import: bool = False)
def update_data_from_file(self, fname=None, link_obj=None, verify_import:bool=False):
        # sanity check
        if not (os.access(fname, os.R_OK) and os.path.isfile(fname)):
                _log.error('[%s] is not a readable file' % fname)
                return False

        _log.debug('updating [pk=%s] from [%s]', self.pk_obj, fname)
        gmPG2.file2bytea (
                query = "UPDATE clin.incoming_data SET data = %(data)s::bytea WHERE pk = %(pk)s",
                filename = fname,
                args = {'pk': self.pk_obj},
                conn = link_obj
        )
        # must update XMIN now ...
        self.refetch_payload(link_obj = link_obj)
        if not verify_import:
                return True

        SQL = 'SELECT (md5(data) = %(local_md5)s) AS verified FROM clin.incoming_data WHERE pk = %(pk)s'
        args = {'pk': self.pk_obj, 'local_md5': gmTools.file2md5(filename = fname)}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
        return rows[0]['verified']
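
A hedged sketch of replacing the stored data and verifying the import (the file path is illustrative):

# Sketch: overwrite the stored blob from a file and verify via md5 comparison.
item = cIncomingData(aPK_obj = 42)             # illustrative pk
ok = item.update_data_from_file (
        fname = '/tmp/updated.hl7',            # hypothetical file
        verify_import = True                   # compares md5(file) with md5 of the stored bytea
)
print('verified' if ok else 'import failed or content mismatch')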

Inherited members

From Gnumed.pycommon.gmBusinessDBObject.cBusinessDBObject (e.g. refetch_payload(), used by update_data_from_file() above).