Module Gnumed.business.gmGenericEMRItem
GNUmed clinical business object in generic form.
license: GPL v2 or later
Functions
def generic_item_type_str(table)
-
Expand source code
def generic_item_type_str(table): try: return _MAP_generic_emr_item_table2type_str[table] except KeyError: return _('unmapped entry type from table [%s]') % table
def get_generic_emr_items(encounters=None,
episodes=None,
issues=None,
patient=None,
soap_cats=None,
time_range=None,
order_by=None,
active_encounter=None,
return_pks=False)-
Expand source code
def get_generic_emr_items(encounters=None, episodes=None, issues=None, patient=None, soap_cats=None, time_range=None, order_by=None, active_encounter=None, return_pks=False): faulty_args = ( (patient is None) and (encounters is None) and (episodes is None) and (issues is None) and (active_encounter is None) ) assert not faulty_args, 'one of <patient>, <episodes>, <issues>, <active_encounter> must not be None' if (patient is not None) and (active_encounter is not None): if patient != active_encounter['pk_patient']: raise AssertionError('<patient> (%s) and <active_encounter>["pk_patient"] (%s) must match, if both given', patient, active_encounter['pk_patient']) if order_by is None: order_by = 'ORDER BY clin_when, pk_episode, scr, modified_when, src_table' else: order_by = 'ORDER BY %s' % order_by if (patient is None) and (active_encounter is not None): patient = active_encounter['pk_patient'] where_parts = [] args = {} if patient is not None: where_parts.append('c_vej.pk_patient = %(pat)s') args['pat'] = patient if soap_cats is not None: # work around bug in psycopg2 not being able to properly # adapt None to NULL inside tuples if None in soap_cats: where_parts.append('((c_vej.soap_cat = ANY(%(soap_cat)s)) OR (c_vej.soap_cat IS NULL))') soap_cats.remove(None) else: where_parts.append('c_vej.soap_cat = ANY(%(soap_cat)s)') args['soap_cat'] = soap_cats if time_range is not None: where_parts.append("c_vej.clin_when > (now() - '%s days'::interval)" % time_range) if encounters is not None: where_parts.append("c_vej.pk_encounter = ANY(%(encs)s)") args['encs'] = encounters if episodes is not None: where_parts.append("c_vej.pk_episode = ANY(%(epis)s)") args['epis'] = episodes if issues is not None: where_parts.append("c_vej.pk_health_issue = ANY(%(issues)s)") args['issues'] = issues cmd_journal = _SQL_get_generic_emr_items if len(where_parts) > 0: cmd_journal += '\nWHERE\n\t' cmd_journal += '\t\tAND\t'.join(where_parts) if active_encounter is None: cmd = cmd_journal + '\n' + order_by else: args['pk_enc'] = active_encounter['pk_encounter'] args['enc_start'] = active_encounter['started'] args['enc_last_affirmed'] = active_encounter['last_affirmed'] args['enc_type'] = active_encounter['l10n_type'] args['enc_pat'] = active_encounter['pk_patient'] cmd = __SQL_union % ( cmd_journal, _SQL_get_hints_as_generic_emr_items ) + '\n' + order_by rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if return_pks: return [ { 'src_table': r['src_table'], 'src_pk': r['src_pk'] } for r in rows ] return [ cGenericEMRItem(row = { 'data': r, 'pk_obj': {'src_table': r['src_table'], 'src_pk': r['src_pk']} } ) for r in rows ]
Classes
class cGenericEMRItem (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cGenericEMRItem(gmBusinessDBObject.cBusinessDBObject): """Represents an entry in clin.v_emr_journal.""" _cmd_fetch_payload:str = _SQL_get_generic_emr_items + "WHERE src_table = %(src_table)s AND src_pk = %(src_pk)s" _cmds_store_payload:list = [] _updatable_fields:list = [''] #-------------------------------------------------------- def format(self, eol=None): lines = self.formatted_header lines.append(gmTools.u_box_horiz_4dashes * 40) lines.extend(self._payload['narrative'].strip().split('\n')) lines.append('') lines.append(_(' rev %s (%s) by %s in <%s>') % ( self._payload['row_version'], self._payload['date_modified'], self._payload['modified_by'], self._payload['src_table'] )) if eol is None: return lines return eol.join(lines) #-------------------------------------------------------- def format_header(self, eol=None): lines = [] lines.append(_('Chart entry (%s): %s [#%s in %s]') % ( self.i18n_soap_cat, self.item_type_str, self._payload['src_pk'], self._payload['src_table'] )) lines.append(_(' Modified: %s by %s (%s rev %s)') % ( self._payload['date_modified'], self._payload['modified_by'], gmTools.u_arrow2right, self._payload['row_version'] )) lines.append('') if self._payload['health_issue'] is None: issue_info = gmTools.u_diameter else: issue_info = '%s%s' % ( self._payload['health_issue'], gmTools.bool2subst(self._payload['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '') ) lines.append(_('Health issue: %s') % issue_info) if self._payload['episode'] is None: episode_info = gmTools.u_diameter else: episode_info = '%s%s' % ( self._payload['episode'], gmTools.bool2subst(self._payload['episode_open'], ' (' + _('open') + ')', ' (' + _('closed') + ')', '') ) lines.append(_('Episode: %s') % episode_info) if self._payload['encounter_started'] is None: enc_info = gmTools.u_diameter else: enc_info = '%s - %s (%s)' % ( self._payload['encounter_started'].strftime('%Y %b %d %H:%M'), self._payload['encounter_last_affirmed'].strftime('%H:%M'), self._payload['encounter_l10n_type'] ) lines.append(_('Encounter: %s') % enc_info) lines.append(_('Event: %s') % self._payload['clin_when'].strftime('%Y %b %d %H:%M')) if eol is None: return lines return eol.join(lines) formatted_header = property(format_header) #-------------------------------------------------------- def __get_item_type_str(self): try: return _MAP_generic_emr_item_table2type_str[self._payload['src_table']] except KeyError: return '[%s:%s]' % ( self._payload['src_table'], self._payload['src_pk'] ) item_type_str = property(__get_item_type_str) #-------------------------------------------------------- def __get_i18n_soap_cat(self): return gmSoapDefs.soap_cat2l10n[self._payload['soap_cat']] i18n_soap_cat = property(__get_i18n_soap_cat) #-------------------------------------------------------- def __get_specialized_item(self): item_class = _MAP_generic_emr_item_table2class[self._payload['src_table']] return item_class(aPK_obj = self._payload['src_pk']) specialized_item = property(__get_specialized_item)
Represents an entry in clin.v_emr_journal.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop formatted_header
-
Expand source code
def format_header(self, eol=None): lines = [] lines.append(_('Chart entry (%s): %s [#%s in %s]') % ( self.i18n_soap_cat, self.item_type_str, self._payload['src_pk'], self._payload['src_table'] )) lines.append(_(' Modified: %s by %s (%s rev %s)') % ( self._payload['date_modified'], self._payload['modified_by'], gmTools.u_arrow2right, self._payload['row_version'] )) lines.append('') if self._payload['health_issue'] is None: issue_info = gmTools.u_diameter else: issue_info = '%s%s' % ( self._payload['health_issue'], gmTools.bool2subst(self._payload['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '') ) lines.append(_('Health issue: %s') % issue_info) if self._payload['episode'] is None: episode_info = gmTools.u_diameter else: episode_info = '%s%s' % ( self._payload['episode'], gmTools.bool2subst(self._payload['episode_open'], ' (' + _('open') + ')', ' (' + _('closed') + ')', '') ) lines.append(_('Episode: %s') % episode_info) if self._payload['encounter_started'] is None: enc_info = gmTools.u_diameter else: enc_info = '%s - %s (%s)' % ( self._payload['encounter_started'].strftime('%Y %b %d %H:%M'), self._payload['encounter_last_affirmed'].strftime('%H:%M'), self._payload['encounter_l10n_type'] ) lines.append(_('Encounter: %s') % enc_info) lines.append(_('Event: %s') % self._payload['clin_when'].strftime('%Y %b %d %H:%M')) if eol is None: return lines return eol.join(lines)
prop i18n_soap_cat
-
Expand source code
def __get_i18n_soap_cat(self): return gmSoapDefs.soap_cat2l10n[self._payload['soap_cat']]
prop item_type_str
-
Expand source code
def __get_item_type_str(self): try: return _MAP_generic_emr_item_table2type_str[self._payload['src_table']] except KeyError: return '[%s:%s]' % ( self._payload['src_table'], self._payload['src_pk'] )
prop specialized_item
-
Expand source code
def __get_specialized_item(self): item_class = _MAP_generic_emr_item_table2class[self._payload['src_table']] return item_class(aPK_obj = self._payload['src_pk'])
Methods
def format_header(self, eol=None)
-
Expand source code
def format_header(self, eol=None): lines = [] lines.append(_('Chart entry (%s): %s [#%s in %s]') % ( self.i18n_soap_cat, self.item_type_str, self._payload['src_pk'], self._payload['src_table'] )) lines.append(_(' Modified: %s by %s (%s rev %s)') % ( self._payload['date_modified'], self._payload['modified_by'], gmTools.u_arrow2right, self._payload['row_version'] )) lines.append('') if self._payload['health_issue'] is None: issue_info = gmTools.u_diameter else: issue_info = '%s%s' % ( self._payload['health_issue'], gmTools.bool2subst(self._payload['issue_active'], ' (' + _('active') + ')', ' (' + _('inactive') + ')', '') ) lines.append(_('Health issue: %s') % issue_info) if self._payload['episode'] is None: episode_info = gmTools.u_diameter else: episode_info = '%s%s' % ( self._payload['episode'], gmTools.bool2subst(self._payload['episode_open'], ' (' + _('open') + ')', ' (' + _('closed') + ')', '') ) lines.append(_('Episode: %s') % episode_info) if self._payload['encounter_started'] is None: enc_info = gmTools.u_diameter else: enc_info = '%s - %s (%s)' % ( self._payload['encounter_started'].strftime('%Y %b %d %H:%M'), self._payload['encounter_last_affirmed'].strftime('%H:%M'), self._payload['encounter_l10n_type'] ) lines.append(_('Encounter: %s') % enc_info) lines.append(_('Event: %s') % self._payload['clin_when'].strftime('%Y %b %d %H:%M')) if eol is None: return lines return eol.join(lines)
Inherited members