Module Gnumed.business.gmGenericEMRItem

GNUmed clinical business object in generic form.

license: GPL v2 or later

Functions

def generic_item_type_str(table)
Expand source code
def generic_item_type_str(table):
        """Return the item type string mapped to a source table name.

        <table> is looked up in _MAP_generic_emr_item_table2type_str.
        If the table has no entry there, a translated placeholder
        naming the table is returned instead.
        """
        if table in _MAP_generic_emr_item_table2type_str:
                return _MAP_generic_emr_item_table2type_str[table]

        return _('unmapped entry type from table [%s]') % table
def get_generic_emr_items(encounters=None,
episodes=None,
issues=None,
patient=None,
soap_cats=None,
time_range=None,
order_by=None,
active_encounter=None,
return_pks=False)
Expand source code
def get_generic_emr_items(encounters=None, episodes=None, issues=None, patient=None, soap_cats=None, time_range=None, order_by=None, active_encounter=None, return_pks=False):
        """Retrieve chart entries in generic form, optionally filtered.

        At least one of <patient>, <encounters>, <episodes>, <issues>,
        <active_encounter> must be given. All given filters are ANDed.

        Args:
                encounters: encounter PKs to restrict to (matched with = ANY())
                episodes: episode PKs to restrict to (matched with = ANY())
                issues: health issue PKs to restrict to (matched with = ANY())
                patient: patient PK to restrict to, if None it is taken
                        from <active_encounter> when that is given
                soap_cats: SOAP categories to restrict to, may contain
                        None to also match rows without a category,
                        the passed-in sequence is not modified
                time_range: if given, only entries whose clin_when lies
                        within the last <time_range> days are returned
                order_by: SQL expression to sort by (without the
                        "ORDER BY" keywords), a default is used if None
                active_encounter: mapping providing 'pk_patient',
                        'pk_encounter', 'started', 'last_affirmed' and
                        'l10n_type', if given the query in
                        _SQL_get_hints_as_generic_emr_items is combined
                        with the journal query by way of __SQL_union
                return_pks: if True return a list of
                        {'src_table': ..., 'src_pk': ...} dicts rather
                        than cGenericEMRItem instances

        Returns:
                A list of cGenericEMRItem instances, or a list of
                primary key dicts if <return_pks> is True.

        Raises:
                AssertionError: if no filter was given at all or if
                        <patient> does not match the patient of
                        <active_encounter>.
        """
        # raise explicitly rather than assert so validation survives "python -O"
        no_filter_given = all(arg is None for arg in (patient, encounters, episodes, issues, active_encounter))
        if no_filter_given:
                raise AssertionError('one of <patient>, <encounters>, <episodes>, <issues>, <active_encounter> must not be None')

        if (patient is not None) and (active_encounter is not None):
                if patient != active_encounter['pk_patient']:
                        # interpolate here: exceptions do not %-format their arguments
                        raise AssertionError('<patient> (%s) and <active_encounter>["pk_patient"] (%s) must match, if both given' % (
                                patient,
                                active_encounter['pk_patient']
                        ))

        # NOTE(review): <order_by> is interpolated into the SQL text, callers must not pass untrusted input
        if order_by is None:
                order_by = 'ORDER BY clin_when, pk_episode, scr, modified_when, src_table'
        else:
                order_by = 'ORDER BY %s' % order_by

        if (patient is None) and (active_encounter is not None):
                patient = active_encounter['pk_patient']

        where_parts = []
        args = {}

        if patient is not None:
                where_parts.append('c_vej.pk_patient = %(pat)s')
                args['pat'] = patient
        if soap_cats is not None:
                # work around bug in psycopg2 not being able to properly
                # adapt None to NULL inside tuples
                if None in soap_cats:
                        where_parts.append('((c_vej.soap_cat = ANY(%(soap_cat)s)) OR (c_vej.soap_cat IS NULL))')
                        # filter into a new list so that the caller's sequence is left
                        # untouched and *every* None is dropped, not just the first one
                        soap_cats = [ cat for cat in soap_cats if cat is not None ]
                else:
                        where_parts.append('c_vej.soap_cat = ANY(%(soap_cat)s)')
                args['soap_cat'] = soap_cats
        if time_range is not None:
                # NOTE(review): <time_range> is interpolated into the SQL text, callers must not pass untrusted input
                where_parts.append("c_vej.clin_when > (now() - '%s days'::interval)" % time_range)
        if encounters is not None:
                where_parts.append("c_vej.pk_encounter = ANY(%(encs)s)")
                args['encs'] = encounters
        if episodes is not None:
                where_parts.append("c_vej.pk_episode = ANY(%(epis)s)")
                args['epis'] = episodes
        if issues is not None:
                where_parts.append("c_vej.pk_health_issue = ANY(%(issues)s)")
                args['issues'] = issues

        cmd_journal = _SQL_get_generic_emr_items
        if where_parts:
                cmd_journal += '\nWHERE\n\t'
                cmd_journal += '\t\tAND\t'.join(where_parts)

        if active_encounter is None:
                cmd = cmd_journal + '\n' + order_by
        else:
                # arguments consumed by _SQL_get_hints_as_generic_emr_items
                args['pk_enc'] = active_encounter['pk_encounter']
                args['enc_start'] = active_encounter['started']
                args['enc_last_affirmed'] = active_encounter['last_affirmed']
                args['enc_type'] = active_encounter['l10n_type']
                args['enc_pat'] = active_encounter['pk_patient']
                cmd = __SQL_union % (
                        cmd_journal,
                        _SQL_get_hints_as_generic_emr_items
                ) + '\n' + order_by

        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        if return_pks:
                return [ {
                        'src_table': r['src_table'],
                        'src_pk': r['src_pk']
                } for r in rows ]

        return [ cGenericEMRItem(row = {
                'data': r,
                'pk_obj': {'src_table': r['src_table'], 'src_pk': r['src_pk']}
        } ) for r in rows ]

Classes

class cGenericEMRItem (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
Expand source code
class cGenericEMRItem(gmBusinessDBObject.cBusinessDBObject):
        """Represents an entry in clin.v_emr_journal.

        A row is identified by the pair (src_table, src_pk), which is
        what the fetch command filters on. No store commands are
        configured on this class.
        """

        _cmd_fetch_payload:str = _SQL_get_generic_emr_items + "WHERE src_table = %(src_table)s AND src_pk = %(src_pk)s"
        _cmds_store_payload:list = []
        _updatable_fields:list = ['']

        #--------------------------------------------------------
        def format(self, eol=None):
                """Format this entry: header, separator, narrative, revision footer.

                Args:
                        eol: if None, a list of lines is returned,
                                otherwise the lines are joined with this string

                Returns:
                        A list of strings, or one string if <eol> was given.
                """
                text = self.formatted_header
                row = self._payload
                text.append(gmTools.u_box_horiz_4dashes * 40)
                narrative_lines = row['narrative'].strip().split('\n')
                text.extend(narrative_lines)
                text.append('')
                footer = _('                        rev %s (%s) by %s in <%s>') % (
                        row['row_version'],
                        row['date_modified'],
                        row['modified_by'],
                        row['src_table']
                )
                text.append(footer)
                return text if eol is None else eol.join(text)

        #--------------------------------------------------------
        def format_header(self, eol=None):
                """Build the header lines describing this chart entry.

                The header names the SOAP category, item type and source
                row, the modification details, and the health issue,
                episode, encounter and clinical timestamp of the entry.

                Args:
                        eol: if None, a list of lines is returned,
                                otherwise the lines are joined with this string

                Returns:
                        A list of strings, or one string if <eol> was given.
                """
                row = self._payload
                timestamp_fmt = '%Y %b %d  %H:%M'
                header = [
                        _('Chart entry (%s): %s       [#%s in %s]') % (
                                self.i18n_soap_cat,
                                self.item_type_str,
                                row['src_pk'],
                                row['src_table']
                        ),
                        _(' Modified: %s by %s (%s rev %s)') % (
                                row['date_modified'],
                                row['modified_by'],
                                gmTools.u_arrow2right,
                                row['row_version']
                        ),
                        ''
                ]

                # health issue, with a suffix derived from issue_active
                if row['health_issue'] is not None:
                        issue_state = gmTools.bool2subst(row['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '')
                        issue_info = '%s%s' % (row['health_issue'], issue_state)
                else:
                        issue_info = gmTools.u_diameter
                header.append(_('Health issue: %s') % issue_info)

                # episode, with a suffix derived from episode_open
                if row['episode'] is not None:
                        episode_state = gmTools.bool2subst(row['episode_open'], ' (' +  _('open') + ')', ' (' +  _('closed') + ')', '')
                        episode_info = '%s%s' % (row['episode'], episode_state)
                else:
                        episode_info = gmTools.u_diameter
                header.append(_('Episode: %s') % episode_info)

                # encounter: start timestamp, time last affirmed, type
                if row['encounter_started'] is not None:
                        enc_info = '%s - %s (%s)' % (
                                row['encounter_started'].strftime(timestamp_fmt),
                                row['encounter_last_affirmed'].strftime('%H:%M'),
                                row['encounter_l10n_type']
                        )
                else:
                        enc_info = gmTools.u_diameter
                header.append(_('Encounter: %s') % enc_info)

                header.append(_('Event: %s') % row['clin_when'].strftime(timestamp_fmt))
                return header if eol is None else eol.join(header)

        formatted_header = property(format_header)

        #--------------------------------------------------------
        @property
        def item_type_str(self):
                """Item type string for the source table, or '[table:pk]' if the table is unmapped."""
                src_table = self._payload['src_table']
                if src_table in _MAP_generic_emr_item_table2type_str:
                        return _MAP_generic_emr_item_table2type_str[src_table]

                return '[%s:%s]' % (src_table, self._payload['src_pk'])

        #--------------------------------------------------------
        @property
        def i18n_soap_cat(self):
                """The entry's soap_cat looked up in gmSoapDefs.soap_cat2l10n."""
                soap_cat = self._payload['soap_cat']
                return gmSoapDefs.soap_cat2l10n[soap_cat]

        #--------------------------------------------------------
        @property
        def specialized_item(self):
                """Instance of the class mapped to the source table, loaded by the source PK."""
                row = self._payload
                specialized_class = _MAP_generic_emr_item_table2class[row['src_table']]
                return specialized_class(aPK_obj = row['src_pk'])

Represents an entry in clin.v_emr_journal.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • a scalar value: the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values: the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    

Ancestors

Instance variables

prop formatted_header
Expand source code
def format_header(self, eol=None):
        """Build the header lines describing this chart entry.

        The header names the SOAP category, item type and source row,
        the modification details, and the health issue, episode,
        encounter and clinical timestamp of the entry.

        Args:
                eol: if None, a list of lines is returned,
                        otherwise the lines are joined with this string

        Returns:
                A list of strings, or one string if <eol> was given.
        """
        row = self._payload
        timestamp_fmt = '%Y %b %d  %H:%M'
        header = [
                _('Chart entry (%s): %s       [#%s in %s]') % (
                        self.i18n_soap_cat,
                        self.item_type_str,
                        row['src_pk'],
                        row['src_table']
                ),
                _(' Modified: %s by %s (%s rev %s)') % (
                        row['date_modified'],
                        row['modified_by'],
                        gmTools.u_arrow2right,
                        row['row_version']
                ),
                ''
        ]

        # health issue, with a suffix derived from issue_active
        if row['health_issue'] is not None:
                issue_state = gmTools.bool2subst(row['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '')
                issue_info = '%s%s' % (row['health_issue'], issue_state)
        else:
                issue_info = gmTools.u_diameter
        header.append(_('Health issue: %s') % issue_info)

        # episode, with a suffix derived from episode_open
        if row['episode'] is not None:
                episode_state = gmTools.bool2subst(row['episode_open'], ' (' +  _('open') + ')', ' (' +  _('closed') + ')', '')
                episode_info = '%s%s' % (row['episode'], episode_state)
        else:
                episode_info = gmTools.u_diameter
        header.append(_('Episode: %s') % episode_info)

        # encounter: start timestamp, time last affirmed, type
        if row['encounter_started'] is not None:
                enc_info = '%s - %s (%s)' % (
                        row['encounter_started'].strftime(timestamp_fmt),
                        row['encounter_last_affirmed'].strftime('%H:%M'),
                        row['encounter_l10n_type']
                )
        else:
                enc_info = gmTools.u_diameter
        header.append(_('Encounter: %s') % enc_info)

        header.append(_('Event: %s') % row['clin_when'].strftime(timestamp_fmt))
        return header if eol is None else eol.join(header)
prop i18n_soap_cat
Expand source code
def __get_i18n_soap_cat(self):
        """Return this entry's soap_cat looked up in gmSoapDefs.soap_cat2l10n."""
        soap_cat = self._payload['soap_cat']
        return gmSoapDefs.soap_cat2l10n[soap_cat]
prop item_type_str
Expand source code
def __get_item_type_str(self):
        """Return the item type string for the source table.

        If the table is not in _MAP_generic_emr_item_table2type_str,
        '[<src_table>:<src_pk>]' is returned instead.
        """
        src_table = self._payload['src_table']
        if src_table in _MAP_generic_emr_item_table2type_str:
                return _MAP_generic_emr_item_table2type_str[src_table]

        return '[%s:%s]' % (src_table, self._payload['src_pk'])
prop specialized_item
Expand source code
def __get_specialized_item(self):
        """Return an instance of the class mapped to the source table.

        The class is taken from _MAP_generic_emr_item_table2class and
        instantiated with the source row's primary key.
        """
        row = self._payload
        specialized_class = _MAP_generic_emr_item_table2class[row['src_table']]
        return specialized_class(aPK_obj = row['src_pk'])

Methods

def format_header(self, eol=None)
Expand source code
def format_header(self, eol=None):
        """Build the header lines describing this chart entry.

        The header names the SOAP category, item type and source row,
        the modification details, and the health issue, episode,
        encounter and clinical timestamp of the entry.

        Args:
                eol: if None, a list of lines is returned,
                        otherwise the lines are joined with this string

        Returns:
                A list of strings, or one string if <eol> was given.
        """
        row = self._payload
        timestamp_fmt = '%Y %b %d  %H:%M'
        header = [
                _('Chart entry (%s): %s       [#%s in %s]') % (
                        self.i18n_soap_cat,
                        self.item_type_str,
                        row['src_pk'],
                        row['src_table']
                ),
                _(' Modified: %s by %s (%s rev %s)') % (
                        row['date_modified'],
                        row['modified_by'],
                        gmTools.u_arrow2right,
                        row['row_version']
                ),
                ''
        ]

        # health issue, with a suffix derived from issue_active
        if row['health_issue'] is not None:
                issue_state = gmTools.bool2subst(row['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '')
                issue_info = '%s%s' % (row['health_issue'], issue_state)
        else:
                issue_info = gmTools.u_diameter
        header.append(_('Health issue: %s') % issue_info)

        # episode, with a suffix derived from episode_open
        if row['episode'] is not None:
                episode_state = gmTools.bool2subst(row['episode_open'], ' (' +  _('open') + ')', ' (' +  _('closed') + ')', '')
                episode_info = '%s%s' % (row['episode'], episode_state)
        else:
                episode_info = gmTools.u_diameter
        header.append(_('Episode: %s') % episode_info)

        # encounter: start timestamp, time last affirmed, type
        if row['encounter_started'] is not None:
                enc_info = '%s - %s (%s)' % (
                        row['encounter_started'].strftime(timestamp_fmt),
                        row['encounter_last_affirmed'].strftime('%H:%M'),
                        row['encounter_l10n_type']
                )
        else:
                enc_info = gmTools.u_diameter
        header.append(_('Encounter: %s') % enc_info)

        header.append(_('Event: %s') % row['clin_when'].strftime(timestamp_fmt))
        return header if eol is None else eol.join(header)

Inherited members