Module Gnumed.business.gmEpisode
GNUmed episode related business object.
license: GPL v2 or later
Functions
def create_episode(pk_health_issue: int = None,
episode_name: str = None,
is_open: bool = False,
allow_dupes: bool = False,
encounter: int = None,
link_obj=None)-
Expand source code
def create_episode(pk_health_issue:int=None, episode_name:str=None, is_open:bool=False, allow_dupes:bool=False, encounter:int=None, link_obj=None):
    """Create a new episode, or return a matching existing one.

    Unless <allow_dupes> is True an existing episode matching name,
    health issue and encounter is looked up first and returned if found.

    Args:
        pk_health_issue: PK of the health issue the episode belongs to
        episode_name: name (description) of the episode
        is_open: whether a *new* episode is to be open (existing ones are not tinkered with)
        allow_dupes: if True, skip the lookup and always insert a new row
        encounter: PK of the encounter the episode is created in
        link_obj: passed through to the cEpisode constructor and gmPG2.run_rw_queries()

    Returns:
        A cEpisode instance.
    """
    if not allow_dupes:
        try:
            return cEpisode(
                name=episode_name,
                health_issue=pk_health_issue,
                encounter=encounter,
                link_obj=link_obj
            )
        except gmExceptions.ConstructorError:
            # no such episode yet, fall through and create it
            pass

    insert_sql = "INSERT INTO clin.episode (fk_health_issue, description, is_open, fk_encounter) VALUES (%(pk_hi)s, %(epi_name)s, %(open)s::boolean, %(pk_enc)s)"
    insert_args = {
        'pk_hi': pk_health_issue,
        'epi_name': episode_name,
        'open': is_open,
        'pk_enc': encounter
    }
    # second query reads back the row just inserted
    fetch_sql = cEpisode._cmd_fetch_payload % "currval('clin.episode_pk_seq')"
    rows = gmPG2.run_rw_queries(
        link_obj=link_obj,
        queries=[
            {'sql': insert_sql, 'args': insert_args},
            {'sql': fetch_sql}
        ],
        return_data=True
    )
    return cEpisode(row={'data': rows[0], 'pk_field': 'pk_episode'})
Creates a new episode for a given patient's health issue.
pk_health_issue - given health issue PK
episode_name - name of episode
is_open - whether a new episode is to be open (don't tinker with existing ones)
def delete_episode(episode=None)
-
Expand source code
def delete_episode(episode=None):
    """Delete an episode row from clin.episode.

    Args:
        episode: either a cEpisode instance or something int() can turn into the episode PK

    Returns:
        True if the DELETE ran, False if the backend raised an IntegrityError.
    """
    pk_episode = episode['pk_episode'] if isinstance(episode, cEpisode) else int(episode)
    query = {
        'sql': 'DELETE FROM clin.episode WHERE pk = %(pk)s',
        'args': {'pk': pk_episode}
    }
    try:
        gmPG2.run_rw_queries(queries=[query])
    except gmPG2.dbapi.IntegrityError:
        # should be parsing pgcode/and or error message
        _log.exception('cannot delete episode, it is in use')
        return False

    return True
def format_clinical_duration_of_episode(start=None, end=None)
-
Expand source code
def format_clinical_duration_of_episode(start=None, end=None):
    """Format the clinical time span of an episode.

    Args:
        start: start of the episode, must not be None (needs .strftime() and subtraction support)
        end: end of the episode, None meaning "still ongoing"

    Returns:
        A tuple (short range string, long range string, duration string).
    """
    assert (start is not None), '<start> must not be None'

    short_fmt = "%b'%y"
    long_fmt = '%b %d %Y'

    # ongoing episode: open-ended range, duration measured up to now
    if end is None:
        short_range = '%s-%s' % (start.strftime(short_fmt), gmTools.u_ellipsis)
        long_range = '%s - %s' % (start.strftime(long_fmt), gmTools.u_ellipsis)
        elapsed = gmDateTime.pydt_now_here() - start
        duration = _('%s so far') % gmDateTime.format_interval_medically(elapsed)
        return (short_range, long_range, duration)

    duration = gmDateTime.format_interval_medically(end - start)

    if end.year != start.year:
        # different years: spell out both ends in full
        short_range = '%s-%s' % (start.strftime(short_fmt), end.strftime(short_fmt))
        long_range = '%s - %s' % (start.strftime(long_fmt), end.strftime(long_fmt))
    elif end.month != start.month:
        # same year, different month: year only shown on the end
        short_range = '%s-%s' % (start.strftime('%b'), end.strftime(short_fmt))
        long_range = '%s - %s' % (start.strftime('%b %d'), end.strftime(long_fmt))
    else:
        # same year and same month: only the start is shown
        short_range = start.strftime(short_fmt)
        long_range = start.strftime(long_fmt)

    return (short_range, long_range, duration)
def get_best_guess_clinical_end_date_for_episode(pk_episode=None)
-
Expand source code
def get_best_guess_clinical_end_date_for_episode(pk_episode=None):
    """Run _SQL_best_guess_clinical_end_date_for_episode for one episode.

    Args:
        pk_episode: PK of the episode, must not be None

    Returns:
        The first column of the first row returned by the query.
    """
    assert (pk_episode is not None), '<pk_episode> must not be None'

    result = gmPG2.run_ro_queries(queries=[{
        'sql': _SQL_best_guess_clinical_end_date_for_episode,
        'args': {'pk': pk_episode}
    }])
    first_row = result[0]
    return first_row[0]
def get_best_guess_clinical_start_date_for_episode(pk_episode=None)
-
Expand source code
def get_best_guess_clinical_start_date_for_episode(pk_episode=None):
    """Run _SQL_best_guess_clinical_start_date_for_episode for one episode.

    Args:
        pk_episode: PK of the episode, must not be None

    Returns:
        The first column of the first row returned by the query.
    """
    assert (pk_episode is not None), '<pk_episode> must not be None'

    result = gmPG2.run_ro_queries(queries=[{
        'sql': _SQL_best_guess_clinical_start_date_for_episode,
        'args': {'pk': pk_episode}
    }])
    first_row = result[0]
    return first_row[0]
Classes
class cEpisode (aPK_obj=None,
id_patient=None,
name='xxxDEFAULTxxx',
health_issue=None,
row=None,
encounter=None,
link_obj=None)-
Expand source code
class cEpisode(gmBusinessDBObject.cBusinessDBObject):
    """Represents one clinical episode.

    Payload is read from the view clin.v_pat_episodes and written
    back to the table clin.episode.
    """
    _cmd_fetch_payload = "select * from clin.v_pat_episodes where pk_episode=%s"
    _cmds_store_payload = [
        """update clin.episode set
                fk_health_issue = %(pk_health_issue)s,
                is_open = %(episode_open)s::boolean,
                description = %(description)s,
                summary = gm.nullify_empty_string(%(summary)s),
                diagnostic_certainty_classification = gm.nullify_empty_string(%(diagnostic_certainty_classification)s)
            where
                pk = %(pk_episode)s and
                xmin = %(xmin_episode)s""",
        """select xmin_episode from clin.v_pat_episodes where pk_episode = %(pk_episode)s"""
    ]
    _updatable_fields = [
        'pk_health_issue',
        'episode_open',
        'description',
        'summary',
        'diagnostic_certainty_classification'
    ]
    #--------------------------------------------------------
    def __init__(self, aPK_obj=None, id_patient=None, name='xxxDEFAULTxxx', health_issue=None, row=None, encounter=None, link_obj=None):
        """Instantiate from PK, from a prefetched row, or by searching.

        If neither <aPK_obj> nor <row> is given the episode is searched
        for by <name>, optionally narrowed down by <id_patient>,
        <health_issue>, and the patient owning <encounter>.

        Raises:
            gmExceptions.NoSuchBusinessObjectError: if the search finds no row
        """
        pk = aPK_obj
        if pk is not None or row is not None:
            super().__init__(aPK_obj = pk, row = row, link_obj = link_obj)
            return

        where_parts = ['description = %(desc)s']
        if id_patient is not None:
            where_parts.append('pk_patient = %(pat)s')
        if health_issue is not None:
            where_parts.append('pk_health_issue = %(issue)s')
        if encounter is not None:
            where_parts.append('pk_patient = (SELECT fk_patient FROM clin.encounter WHERE pk = %(enc)s)')
        args = {
            'pat': id_patient,
            'issue': health_issue,
            'enc': encounter,
            'desc': name
        }
        # only fixed SQL fragments are interpolated here, values travel via <args>
        cmd = 'SELECT * FROM clin.v_pat_episodes WHERE %s' % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries (
            link_obj = link_obj,
            queries = [{'sql': cmd, 'args': args}],
        )
        if len(rows) == 0:
            raise gmExceptions.NoSuchBusinessObjectError('no episode for [%s:%s:%s:%s]' % (id_patient, name, health_issue, encounter))

        # if several rows match, the first one is used
        r = {'data': rows[0], 'pk_field': 'pk_episode'}
        super().__init__(row = r)

    #--------------------------------------------------------
    # external API
    #--------------------------------------------------------
    @classmethod
    def from_problem(cls, problem) -> 'cEpisode':
        """Initialize episode from episode-type problem.

        A cEpisode instance passed in is returned as is.
        """
        if isinstance(problem, cEpisode):
            return problem

        assert problem['type'] == 'episode', 'cannot convert [%s] to episode' % problem
        return cls(aPK_obj = problem['pk_episode'])

    #--------------------------------------------------------
    def get_patient(self):
        """Return the PK of the patient this episode belongs to."""
        return self._payload['pk_patient']

    #--------------------------------------------------------
    def get_narrative(self, soap_cats=None, encounters=None, order_by = None):
        """Return gmClinNarrative.get_narrative() restricted to this episode."""
        return gmClinNarrative.get_narrative (
            soap_cats = soap_cats,
            encounters = encounters,
            episodes = [self.pk_obj],
            order_by = order_by
        )

    #--------------------------------------------------------
    def rename(self, description=None):
        """Method for episode editing, that is, episode renaming.

        Args:
            description: the new descriptive name for the episode, a non-empty string

        Returns:
            True if the new name was saved, False if <description> is not
            a non-empty string or if saving failed (the old description
            is restored in the payload in that case).
        """
        # sanity check, the default None used to crash on .strip()
        if not isinstance(description, str) or description.strip() == '':
            _log.error('<description> must be a non-empty string instance')
            return False

        # update the episode description
        old_description = self._payload['description']
        self._payload['description'] = description.strip()
        self._is_modified = True
        successful, _ignored = self.save_payload()
        if not successful:
            _log.error('cannot rename episode [%s] to [%s]' % (self, description))
            self._payload['description'] = old_description
            return False

        return True

    #--------------------------------------------------------
    def add_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        if pk_code in self._payload['pk_generic_codes']:
            return

        # the NOT EXISTS guard keeps the INSERT from creating a duplicate link
        cmd = """
            INSERT INTO clin.lnk_code2episode
                (fk_item, fk_generic_code)
            SELECT
                %(item)s,
                %(code)s
            WHERE NOT EXISTS (
                SELECT 1 FROM clin.lnk_code2episode
                WHERE
                    fk_item = %(item)s
                        AND
                    fk_generic_code = %(code)s
            )"""
        args = {
            'item': self._payload['pk_episode'],
            'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return

    #--------------------------------------------------------
    def remove_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        cmd = "DELETE FROM clin.lnk_code2episode WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        args = {
            'item': self._payload['pk_episode'],
            'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True

    #--------------------------------------------------------
    def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
        """Format the journal rows of this episode as one string.

        Args:
            left_margin: number of spaces to prefix each line with
            date_format: strftime() format applied to clin_when / modified_when

        Returns:
            '' if there are no journal rows, else the formatted text.
        """
        rows = gmClinNarrative.get_as_journal (
            episodes = [self.pk_obj],
            order_by = 'pk_encounter, clin_when, scr, src_table'
        )
        if len(rows) == 0:
            return ''

        lines = []
        lines.append(_('Clinical data generated during encounters within this episode:'))
        left_margin = ' ' * left_margin
        prev_enc = None
        for row in rows:
            # blank line whenever the encounter changes
            if row['pk_encounter'] != prev_enc:
                lines.append('')
                prev_enc = row['pk_encounter']
            when = row['clin_when'].strftime(date_format)
            top_row = '%s%s %s (%s) %s' % (
                gmTools.u_box_top_left_arc,
                gmTools.u_box_horiz_single,
                gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                when,
                gmTools.u_box_horiz_single * 5
            )
            soap = gmTools.wrap (
                text = row['narrative'],
                width = 60,
                initial_indent = ' ',
                subsequent_indent = ' ' + left_margin
            )
            row_ver = ''
            if row['row_version'] > 0:
                row_ver = 'v%s: ' % row['row_version']
            bottom_row = '%s%s %s, %s%s %s' % (
                ' ' * 40,
                gmTools.u_box_horiz_light_heavy,
                row['modified_by'],
                row_ver,
                row['modified_when'].strftime(date_format),
                gmTools.u_box_horiz_heavy_light
            )
            lines.append(top_row)
            lines.append(soap)
            lines.append(bottom_row)

        eol_w_margin = '\n%s' % left_margin
        return left_margin + eol_w_margin.join(lines) + '\n'

    #--------------------------------------------------------
    def format_maximum_information(self, patient=None):
        """Return .format() output as a list of lines with all sections but test results.

        If <patient> is None the current patient is used when connected
        and matching this episode, else a cPatient is instantiated from
        the episode's pk_patient.
        """
        if patient is None:
            from Gnumed.business.gmPerson import gmCurrentPatient, cPatient
            if gmCurrentPatient().connected and (self._payload['pk_patient'] == gmCurrentPatient().ID):
                patient = gmCurrentPatient()
            else:
                patient = cPatient(self._payload['pk_patient'])

        return self.format (
            patient = patient,
            with_summary = True,
            with_codes = True,
            with_encounters = True,
            with_documents = True,
            with_hospital_stays = True,
            with_procedures = True,
            with_family_history = True,
            with_tests = False,		# does not inform on the episode itself
            with_vaccinations = True,
            with_health_issue = True,
            return_list = True
        )

    #--------------------------------------------------------
    @staticmethod
    def _format_encounter_line(enc):
        """Format one encounter as a line: start - end (type) plus AOE, else RFE, else nothing."""
        return ' %s - %s (%s):%s' % (
            enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
            enc['last_affirmed_original_tz'].strftime('%H:%M'),
            enc['l10n_type'],
            gmTools.coalesce (
                enc['assessment_of_encounter'],
                gmTools.coalesce (
                    enc['reason_for_encounter'],
                    '',
                    ' \u00BB%s\u00AB' + (' (%s)' % _('RFE'))
                ),
                ' \u00BB%s\u00AB' + (' (%s)' % _('AOE'))
            )
        )

    #--------------------------------------------------------
    def __format_encounters(self, left_margin=0, emr=None):
        """Return lines describing the encounters this episode was worked on in.

        Lists the first and (up to) the 3 most recent encounters and
        spells out the progress notes of the most recent one.
        """
        lines = []
        encs = emr.get_encounters(episodes = [self._payload['pk_episode']])
        if encs is None:
            return [_('Error retrieving encounters for this episode.')]

        if len(encs) == 0:
            return []

        first_encounter = emr.get_first_encounter(episode_id = self._payload['pk_episode'])
        last_encounter = emr.get_last_encounter(episode_id = self._payload['pk_episode'])
        lines.append(_('Last worked on: %s\n') % last_encounter['last_affirmed_original_tz'].strftime('%Y-%m-%d %H:%M'))
        if len(encs) < 4:
            line = _('%s encounter(s) (%s - %s):')
        else:
            line = _('1st and (up to 3) most recent (of %s) encounters (%s - %s):')
        lines.append(line % (
            len(encs),
            first_encounter['started'].strftime('%m/%Y'),
            last_encounter['last_affirmed'].strftime('%m/%Y')
        ))
        lines.append(self._format_encounter_line(first_encounter))
        # 1 first + 3 most recent are listed, anything beyond that is skipped
        if len(encs) > 4:
            lines.append(_(' %s %s skipped %s') % (
                gmTools.u_ellipsis,
                (len(encs) - 4),
                gmTools.u_ellipsis
            ))
        for enc in encs[1:][-3:]:
            lines.append(self._format_encounter_line(enc))
        # spell out last encounter
        if last_encounter:
            lines.append('')
            lines.append(_('Progress notes in most recent encounter:'))
            lines.extend(last_encounter.format_soap (
                episodes = [ self._payload['pk_episode'] ],
                left_margin = left_margin,
                soap_cats = 'soapu',
                emr = emr
            ))
        return lines

    #--------------------------------------------------------
    def format(self, left_margin=0, patient=None,
        with_summary=True,
        with_codes=True,
        with_encounters=True,
        with_documents=True,
        with_hospital_stays=True,
        with_procedures=True,
        with_family_history=True,
        with_tests=True,
        with_vaccinations=True,
        with_health_issue=False,
        return_list=False
    ):
        """Format this episode for display.

        Args:
            left_margin: number of spaces to indent by
            patient: the patient owning this episode; without it all
                sections needing the EMR or the document folder are
                switched off regardless of the with_* flags
            with_*: toggle the respective section
            return_list: return a list of lines rather than one string

        NOTE(review): <with_codes> is accepted but nothing in this
        method acts on it.

        Raises:
            ValueError: if <patient>.ID differs from this episode's pk_patient
        """
        if patient:
            if patient.ID != self._payload['pk_patient']:
                msg = '<patient>.ID = %s but episode %s belongs to patient %s' % (
                    patient.ID,
                    self._payload['pk_episode'],
                    self._payload['pk_patient']
                )
                raise ValueError(msg)
            emr = patient.emr
        else:
            # no patient, hence no EMR to pull the details from
            with_encounters = False
            with_documents = False
            with_hospital_stays = False
            with_procedures = False
            with_family_history = False
            with_tests = False
            with_vaccinations = False

        lines = []
        pk_episode = self._payload['pk_episode']

        # episode details
        lines.append (_('Episode %s%s%s [#%s]') % (
            gmTools.u_left_double_angle_quote,
            self._payload['description'],
            gmTools.u_right_double_angle_quote,
            pk_episode
        ))
        from Gnumed.business.gmEncounter import cEncounter
        enc = cEncounter(aPK_obj = self._payload['pk_encounter'])
        lines.append (' ' + _('Created during encounter: %s (%s - %s) [#%s]') % (
            enc['l10n_type'],
            enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
            enc['last_affirmed_original_tz'].strftime('%H:%M'),
            self._payload['pk_encounter']
        ))
        if patient is not None:
            range_str, range_str_verb, duration_str = self.formatted_clinical_duration
            lines.append(_(' Duration: %s (%s)') % (duration_str, range_str_verb))
        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        lines.append(' ' + _('Status') + ': %s%s' % (
            gmTools.bool2subst(self._payload['episode_open'], _('active'), _('finished')),
            gmTools.coalesce (
                value2test = diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification']),
                return_instead = '',
                template4value = ', %s',
                none_equivalents = [None, '']
            )
        ))
        if with_health_issue:
            lines.append(' ' + _('Health issue') + ': %s' % gmTools.coalesce (
                self._payload['health_issue'],
                _('none associated')
            ))
        if with_summary:
            if self._payload['summary'] is not None:
                lines.append(' %s:' % _('Synopsis'))
                lines.append(gmTools.wrap (
                    text = self._payload['summary'],
                    width = 60,
                    initial_indent = ' ',
                    subsequent_indent = ' '
                ))
        lines.append('')

        # encounters
        if with_encounters:
            lines.extend(self.__format_encounters(left_margin = left_margin, emr = emr))

        # documents
        if with_documents:
            doc_folder = patient.get_document_folder()
            docs = doc_folder.get_documents(pk_episodes = [pk_episode])
            if len(docs) > 0:
                lines.append('')
                lines.append(_('Documents: %s') % len(docs))
            for d in docs:
                lines.append(' ' + d.format(single_line = True))

        # hospitalizations
        if with_hospital_stays:
            stays = emr.get_hospital_stays(episodes = [pk_episode])
            if len(stays) > 0:
                lines.append('')
                lines.append(_('Hospitalizations: %s') % len(stays))
            for s in stays:
                lines.append(s.format(left_margin = (left_margin + 1)))

        # procedures
        if with_procedures:
            procs = emr.get_performed_procedures(episodes = [pk_episode])
            if len(procs) > 0:
                lines.append('')
                lines.append(_('Procedures performed: %s') % len(procs))
            for p in procs:
                lines.append(p.format (
                    left_margin = (left_margin + 1),
                    include_episode = False,
                    include_codes = True
                ))

        # family history
        if with_family_history:
            fhx = emr.get_family_history(episodes = [pk_episode])
            if len(fhx) > 0:
                lines.append('')
                lines.append(_('Family History: %s') % len(fhx))
            for f in fhx:
                lines.append(f.format (
                    left_margin = (left_margin + 1),
                    include_episode = False,
                    include_comment = True,
                    include_codes = True
                ))

        # test results
        if with_tests:
            tests = emr.get_test_results_by_date(episodes = [pk_episode])
            if len(tests) > 0:
                lines.append('')
                lines.append(_('Measurements and Results:'))
            for t in tests:
                lines.append(' ' + t.format_concisely(date_format = '%Y %b %d', with_notes = True))

        # vaccinations
        if with_vaccinations:
            vaccs = emr.get_vaccinations (
                episodes = [pk_episode],
                order_by = 'date_given DESC, vaccine'
            )
            if len(vaccs) > 0:
                lines.append('')
                lines.append(_('Vaccinations:'))
            for vacc in vaccs:
                lines.extend(vacc.format (
                    with_indications = True,
                    with_comment = True,
                    with_reaction = True,
                    date_format = '%Y-%m-%d'
                ))

        lines = gmTools.strip_trailing_empty_lines(lines = lines, eol = '\n')
        if return_list:
            return lines

        left_margin = ' ' * left_margin
        eol_w_margin = '\n%s' % left_margin
        return left_margin + eol_w_margin.join(lines) + '\n'

    #--------------------------------------------------------
    # properties
    #--------------------------------------------------------
    def _get_best_guess_clinical_start_date(self):
        return get_best_guess_clinical_start_date_for_episode(pk_episode = self.pk_obj)

    best_guess_clinical_start_date = property(_get_best_guess_clinical_start_date)

    #--------------------------------------------------------
    def _get_best_guess_clinical_end_date(self):
        return get_best_guess_clinical_end_date_for_episode(self.pk_obj)

    best_guess_clinical_end_date = property(_get_best_guess_clinical_end_date)

    #--------------------------------------------------------
    def _get_formatted_clinical_duration(self):
        return format_clinical_duration_of_episode (
            start = get_best_guess_clinical_start_date_for_episode(self.pk_obj),
            end = get_best_guess_clinical_end_date_for_episode(self.pk_obj)
        )

    formatted_clinical_duration = property(_get_formatted_clinical_duration)

    #--------------------------------------------------------
    def _get_latest_access_date(self):
        """Return the latest of several candidate timestamps related to this episode."""
        cmd = """SELECT MAX(latest) FROM (
            -- last modification, latest = when last changed to the current state
            (SELECT c_epi.modified_when AS latest, 'clin.episode.modified_when'::text AS candidate FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

                UNION ALL

            -- last modification of encounter in which created, latest = initial creation of that encounter
            -- DO NOT USE: just because one corrects a typo does not mean the episode took longer
            --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
            --	SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
            --))

            -- end of encounter in which created, latest = explicitly set
            -- DO NOT USE: we can retrospectively create episodes which
            -- DO NOT USE: are long since finished
            --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
            --	SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
            --))

            -- latest end of encounters of clinical items linked to this episode
            (SELECT
                MAX(last_affirmed) AS latest,
                'clin.episode.pk = clin.clin_root_item,fk_episode -> .fk_encounter.last_affirmed'::text AS candidate
            FROM clin.encounter
            WHERE pk IN (
                SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
            ))

                UNION ALL

            -- latest explicit .clin_when of clinical items linked to this episode
            (SELECT
                MAX(clin_when) AS latest,
                'clin.episode.pk = clin.clin_root_item,fk_episode -> .clin_when'::text AS candidate
            FROM clin.clin_root_item
            WHERE fk_episode = %(pk)s
            )

            -- latest modification time of clinical items linked to this episode
            -- this CAN be used since if an item is linked to an episode it can be
            -- assumed the episode (should have) existed at the time of creation
            -- DO NOT USE, because typo fixes should not extend the episode
            --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

            -- not sure about this one:
            -- .pk -> clin.clin_root_item.fk_encounter.modified_when
        ) AS candidates"""
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
        return rows[0][0]

    latest_access_date = property(_get_latest_access_date)

    #--------------------------------------------------------
    def _get_diagnostic_certainty_description(self):
        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

    diagnostic_certainty_description = property(_get_diagnostic_certainty_description)

    #--------------------------------------------------------
    def _get_formatted_revision_history(self):
        """Return the current row plus its audit log rows, newest version first, as text."""
        cmd = """SELECT
                '<N/A>'::TEXT as audit__action_applied,
                NULL AS audit__action_when,
                '<N/A>'::TEXT AS audit__action_by,
                pk_audit,
                row_version,
                modified_when,
                modified_by,
                pk, fk_health_issue, description, is_open, fk_encounter,
                diagnostic_certainty_classification,
                summary
            FROM clin.episode
            WHERE pk = %(pk_episode)s
        UNION ALL (
            SELECT
                audit_action as audit__action_applied,
                audit_when as audit__action_when,
                audit_by as audit__action_by,
                pk_audit,
                orig_version as row_version,
                orig_when as modified_when,
                orig_by as modified_by,
                pk, fk_health_issue, description, is_open, fk_encounter,
                diagnostic_certainty_classification,
                summary
            FROM audit.log_episode
            WHERE pk = %(pk_episode)s
        )
        ORDER BY row_version DESC
        """
        args = {'pk_episode': self.pk_obj}
        title = _('Episode: %s%s%s') % (
            gmTools.u_left_double_angle_quote,
            self._payload['description'],
            gmTools.u_right_double_angle_quote
        )
        return '\n'.join(self._get_revision_history(cmd, args, title))

    formatted_revision_history = property(_get_formatted_revision_history)

    #--------------------------------------------------------
    def _get_generic_codes(self):
        if len(self._payload['pk_generic_codes']) == 0:
            return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

    def _set_generic_codes(self, pk_codes):
        """Replace the codes linked to this episode by <pk_codes>."""
        queries = []
        # remove all codes
        if len(self._payload['pk_generic_codes']) > 0:
            queries.append ({
                'sql': 'DELETE FROM clin.lnk_code2episode WHERE fk_item = %(epi)s AND fk_generic_code = ANY(%(codes)s)',
                'args': {
                    'epi': self._payload['pk_episode'],
                    'codes': self._payload['pk_generic_codes']
                }
            })
        # add new codes
        for pk_code in pk_codes:
            queries.append ({
                'sql': 'INSERT INTO clin.lnk_code2episode (fk_item, fk_generic_code) VALUES (%(epi)s, %(pk_code)s)',
                'args': {
                    'epi': self._payload['pk_episode'],
                    'pk_code': pk_code
                }
            })
        if len(queries) == 0:
            return

        # run it all in one transaction
        gmPG2.run_rw_queries(queries = queries)
        return

    generic_codes = property(_get_generic_codes, _set_generic_codes)

    #--------------------------------------------------------
    def _get_has_narrative(self):
        cmd = """SELECT EXISTS (
            SELECT 1 FROM clin.clin_narrative
            WHERE
                fk_episode = %(epi)s
                    AND
                fk_encounter IN (
                    SELECT pk FROM clin.encounter WHERE fk_patient = %(pat)s
                )
        )"""
        args = {
            'pat': self._payload['pk_patient'],
            'epi': self._payload['pk_episode']
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return rows[0][0]

    has_narrative = property(_get_has_narrative)

    #--------------------------------------------------------
    def _get_health_issue(self):
        if self._payload['pk_health_issue'] is None:
            return None

        from Gnumed.business.gmHealthIssue import cHealthIssue
        return cHealthIssue(self._payload['pk_health_issue'])

    health_issue = property(_get_health_issue)
Represents one clinical episode.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- a scalar value: the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values: the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Static methods
def from_problem(problem) ‑> cEpisode
-
Initialize episode from episode-type problem.
Instance variables
prop best_guess_clinical_end_date
-
Expand source code
def _get_best_guess_clinical_end_date(self):
    """Getter: delegate to get_best_guess_clinical_end_date_for_episode() with this object's PK."""
    pk_episode = self.pk_obj
    return get_best_guess_clinical_end_date_for_episode(pk_episode)
prop best_guess_clinical_start_date
-
Expand source code
def _get_best_guess_clinical_start_date(self):
    """Getter: delegate to get_best_guess_clinical_start_date_for_episode() with this object's PK."""
    lookup_args = {'pk_episode': self.pk_obj}
    return get_best_guess_clinical_start_date_for_episode(**lookup_args)
prop diagnostic_certainty_description
-
Expand source code
def _get_diagnostic_certainty_description(self):
    """Getter: translate the payload's diagnostic certainty classification via gmHealthIssue."""
    # imported locally, as in the rest of this class
    from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
    classification = self._payload['diagnostic_certainty_classification']
    return diagnostic_certainty_classification2str(classification)
prop formatted_clinical_duration
-
Expand source code
def _get_formatted_clinical_duration(self):
    """Getter: format the span between best-guess clinical start and end of this episode."""
    started = get_best_guess_clinical_start_date_for_episode(self.pk_obj)
    ended = get_best_guess_clinical_end_date_for_episode(self.pk_obj)
    return format_clinical_duration_of_episode(start=started, end=ended)
prop formatted_revision_history
-
Expand source code
def _get_formatted_revision_history(self):
    """Getter: current episode row plus its audit log rows, newest version first, as one string."""
    cmd = """SELECT
            '<N/A>'::TEXT as audit__action_applied,
            NULL AS audit__action_when,
            '<N/A>'::TEXT AS audit__action_by,
            pk_audit,
            row_version,
            modified_when,
            modified_by,
            pk, fk_health_issue, description, is_open, fk_encounter,
            diagnostic_certainty_classification,
            summary
        FROM clin.episode
        WHERE pk = %(pk_episode)s
    UNION ALL (
        SELECT
            audit_action as audit__action_applied,
            audit_when as audit__action_when,
            audit_by as audit__action_by,
            pk_audit,
            orig_version as row_version,
            orig_when as modified_when,
            orig_by as modified_by,
            pk, fk_health_issue, description, is_open, fk_encounter,
            diagnostic_certainty_classification,
            summary
        FROM audit.log_episode
        WHERE pk = %(pk_episode)s
    )
    ORDER BY row_version DESC
    """
    quoted_name = (
        gmTools.u_left_double_angle_quote,
        self._payload['description'],
        gmTools.u_right_double_angle_quote
    )
    title = _('Episode: %s%s%s') % quoted_name
    history_lines = self._get_revision_history(cmd, {'pk_episode': self.pk_obj}, title)
    return '\n'.join(history_lines)
prop generic_codes
-
Expand source code
def _get_generic_codes(self):
    """Getter: return a list of gmCoding.cGenericLinkedCode, one per row found for the payload's pk_generic_codes."""
    pks = self._payload['pk_generic_codes']
    if len(pks) == 0:
        return []

    sql = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
    query = {'sql': sql, 'args': {'pks': pks}}
    linked_codes = []
    for r in gmPG2.run_ro_queries(queries=[query]):
        linked_codes.append(gmCoding.cGenericLinkedCode(row={'data': r, 'pk_field': 'pk_lnk_code2item'}))
    return linked_codes
prop has_narrative
-
Expand source code
def _get_has_narrative(self):
    """Getter: result of an EXISTS query on clin.clin_narrative for this episode within the patient's encounters."""
    sql = """SELECT EXISTS (
        SELECT 1 FROM clin.clin_narrative
        WHERE
            fk_episode = %(epi)s
                AND
            fk_encounter IN (
                SELECT pk FROM clin.encounter WHERE fk_patient = %(pat)s
            )
    )"""
    query = {
        'sql': sql,
        'args': {
            'pat': self._payload['pk_patient'],
            'epi': self._payload['pk_episode']
        }
    }
    result = gmPG2.run_ro_queries(queries=[query])
    return result[0][0]
prop health_issue
-
Expand source code
def _get_health_issue(self):
    """Getter: the cHealthIssue this episode is linked to, or None if it has none."""
    pk_issue = self._payload['pk_health_issue']
    if pk_issue is not None:
        # imported locally, as in the rest of this class
        from Gnumed.business.gmHealthIssue import cHealthIssue
        return cHealthIssue(pk_issue)

    return None
prop latest_access_date
-
Expand source code
def _get_latest_access_date(self):
    """Getter: the latest of several candidate timestamps related to this episode.

    The active candidates are the episode's own modified_when, the
    latest last_affirmed of encounters of items linked to the episode,
    and the latest clin_when of such items.
    """
    sql = """SELECT MAX(latest) FROM (
        -- last modification, latest = when last changed to the current state
        (SELECT c_epi.modified_when AS latest, 'clin.episode.modified_when'::text AS candidate FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

            UNION ALL

        -- last modification of encounter in which created, latest = initial creation of that encounter
        -- DO NOT USE: just because one corrects a typo does not mean the episode took longer
        --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --	SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        --))

        -- end of encounter in which created, latest = explicitly set
        -- DO NOT USE: we can retrospectively create episodes which
        -- DO NOT USE: are long since finished
        --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --	SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        --))

        -- latest end of encounters of clinical items linked to this episode
        (SELECT
            MAX(last_affirmed) AS latest,
            'clin.episode.pk = clin.clin_root_item,fk_episode -> .fk_encounter.last_affirmed'::text AS candidate
        FROM clin.encounter
        WHERE pk IN (
            SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
        ))

            UNION ALL

        -- latest explicit .clin_when of clinical items linked to this episode
        (SELECT
            MAX(clin_when) AS latest,
            'clin.episode.pk = clin.clin_root_item,fk_episode -> .clin_when'::text AS candidate
        FROM clin.clin_root_item
        WHERE fk_episode = %(pk)s
        )

        -- latest modification time of clinical items linked to this episode
        -- this CAN be used since if an item is linked to an episode it can be
        -- assumed the episode (should have) existed at the time of creation
        -- DO NOT USE, because typo fixes should not extend the episode
        --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

        -- not sure about this one:
        -- .pk -> clin.clin_root_item.fk_encounter.modified_when
    ) AS candidates"""
    query = {'sql': sql, 'args': {'pk': self.pk_obj}}
    result = gmPG2.run_ro_queries(queries=[query])
    return result[0][0]
Methods
def add_code(self, pk_code=None)
-
Expand source code
def add_code(self, pk_code=None):
    """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
    already_linked = pk_code in self._payload['pk_generic_codes']
    if not already_linked:
        # the NOT EXISTS guard keeps the INSERT from creating a duplicate link
        sql = """
            INSERT INTO clin.lnk_code2episode
                (fk_item, fk_generic_code)
            SELECT
                %(item)s,
                %(code)s
            WHERE NOT EXISTS (
                SELECT 1 FROM clin.lnk_code2episode
                WHERE
                    fk_item = %(item)s
                        AND
                    fk_generic_code = %(code)s
            )"""
        query = {
            'sql': sql,
            'args': {'item': self._payload['pk_episode'], 'code': pk_code}
        }
        gmPG2.run_rw_queries(queries=[query])
    return
<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a')
-
Expand source code
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
    """Format the journal rows of this episode as one string.

    Args:
        left_margin: number of spaces to prefix each line with
        date_format: strftime() format applied to clin_when / modified_when

    Returns:
        '' if there are no journal rows, else the formatted text.
    """
    journal = gmClinNarrative.get_as_journal(
        episodes=[self.pk_obj],
        order_by='pk_encounter, clin_when, scr, src_table'
    )
    if len(journal) == 0:
        return ''

    margin = ' ' * left_margin
    lines = [_('Clinical data generated during encounters within this episode:')]
    current_enc = None
    for entry in journal:
        # blank line whenever the encounter changes
        if entry['pk_encounter'] != current_enc:
            lines.append('')
            current_enc = entry['pk_encounter']
        header = '%s%s %s (%s) %s' % (
            gmTools.u_box_top_left_arc,
            gmTools.u_box_horiz_single,
            gmSoapDefs.soap_cat2l10n_str[entry['real_soap_cat']],
            entry['clin_when'].strftime(date_format),
            gmTools.u_box_horiz_single * 5
        )
        body = gmTools.wrap(
            text=entry['narrative'],
            width=60,
            initial_indent=' ',
            subsequent_indent=' ' + margin
        )
        # row version only shown for rows which have been modified
        version = 'v%s: ' % entry['row_version'] if entry['row_version'] > 0 else ''
        footer = '%s%s %s, %s%s %s' % (
            ' ' * 40,
            gmTools.u_box_horiz_light_heavy,
            entry['modified_by'],
            version,
            entry['modified_when'].strftime(date_format),
            gmTools.u_box_horiz_heavy_light
        )
        lines.extend([header, body, footer])

    separator = '\n%s' % margin
    return margin + separator.join(lines) + '\n'
def format_maximum_information(self, patient=None)
-
Expand source code
def format_maximum_information(self, patient=None):
    """Format this episode with almost all optional sections switched on.

    patient - the patient object handed on to format(). When None, the
        current patient is used if one is connected and its ID equals
        the 'pk_patient' of this episode; otherwise a cPatient is
        instantiated from 'pk_patient'.

    Returns whatever format() returns when called with return_list = True.
    """
    if patient is None:
        from Gnumed.business.gmPerson import gmCurrentPatient, cPatient
        is_current_patient = (
            gmCurrentPatient().connected
                and
            self._payload['pk_patient'] == gmCurrentPatient().ID
        )
        if is_current_patient:
            patient = gmCurrentPatient()
        else:
            patient = cPatient(self._payload['pk_patient'])

    sections = {
        'with_summary': True,
        'with_codes': True,
        'with_encounters': True,
        'with_documents': True,
        'with_hospital_stays': True,
        'with_procedures': True,
        'with_family_history': True,
        'with_tests': False,        # does not inform on the episode itself
        'with_vaccinations': True,
        'with_health_issue': True,
        'return_list': True
    }
    return self.format(patient = patient, **sections)
def get_narrative(self, soap_cats=None, encounters=None, order_by=None)
-
Expand source code
def get_narrative(self, soap_cats=None, encounters=None, order_by = None):
    """Return narrative restricted to this episode.

    Delegates to gmClinNarrative.get_narrative(), passing soap_cats,
    encounters and order_by through unchanged and fixing the episodes
    filter to this episode's primary key.
    """
    this_episode_only = [self.pk_obj]
    return gmClinNarrative.get_narrative (
        soap_cats = soap_cats,
        encounters = encounters,
        episodes = this_episode_only,
        order_by = order_by
    )
def remove_code(self, pk_code=None)
-
Expand source code
def remove_code(self, pk_code=None):
    """Unlink a generic code from this episode.

    pk_code must be a value from ref.coding_system_root.pk_coding_system
    (clin.lnk_code2item_root.fk_generic_code).

    Returns True once the DELETE has been run, whether or not a link
    row existed.
    """
    unlink = {
        'sql': "DELETE FROM clin.lnk_code2episode WHERE fk_item = %(item)s AND fk_generic_code = %(code)s",
        'args': {
            'item': self._payload['pk_episode'],
            'code': pk_code
        }
    }
    gmPG2.run_rw_queries(queries = [unlink])
    return True
`pk_code` must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)
def rename(self, description=None)
-
Expand source code
def rename(self, description=None):
    """Method for episode editing, that is, episode renaming.

    @param description - the new descriptive name for the episode,
        surrounding whitespace is stripped before saving
    @type description - a string instance

    Returns True when the new description was saved. Returns False
    (after logging an error) when the description is None, empty or
    whitespace-only, or when saving fails; in the latter case the
    previous description is put back into the payload.
    """
    # sanity check: the default of None used to crash on .strip(),
    # treat it like an empty description instead
    new_description = '' if description is None else description.strip()
    if new_description == '':
        _log.error('<description> must be a non-empty string instance')
        return False

    # update the episode description, remembering the old one for rollback
    old_description = self._payload['description']
    self._payload['description'] = new_description
    self._is_modified = True
    successful, _unused = self.save_payload()
    if not successful:
        _log.error('cannot rename episode [%s] to [%s]' % (self, description))
        self._payload['description'] = old_description
        return False

    return True
Method for episode editing, that is, episode renaming.
@param description - the new descriptive name for the episode @type description - a string instance
Inherited members
class cEpisodeMatchProvider
-
Expand source code
class cEpisodeMatchProvider(gmMatchProvider.cMatchProvider_SQL2):
    """Find episodes for patient.

    Open episodes (rank 1) and closed episodes (rank 2) are selected
    from clin.v_pat_episodes, combined with UNION ALL, ordered by rank
    and episode description, and limited to 30 rows. No search is run
    unless the context value 'pat' is set.
    """

    # sub-selects yielding the best-guess clinical start/end of an episode
    _SQL_episode_start = _SQL_best_guess_clinical_start_date_for_episode % {'pk': 'c_vpe.pk_episode'}
    _SQL_episode_end = _SQL_best_guess_clinical_end_date_for_episode % {'pk': 'c_vpe.pk_episode'}

    # %% keeps the fragment/context placeholders alive through the %-formatting below
    _SQL_open_episodes = """
        SELECT
            c_vpe.pk_episode,
            c_vpe.description AS episode,
            c_vpe.health_issue,
            1 AS rank,
            (%s) AS episode_start,
            NULL::timestamp with time zone AS episode_end
        FROM
            clin.v_pat_episodes c_vpe
        WHERE
            c_vpe.episode_open IS TRUE
                AND
            c_vpe.description %%(fragment_condition)s
                %%(ctxt_pat)s
    """ % _SQL_episode_start

    _SQL_closed_episodes = """
        SELECT
            c_vpe.pk_episode,
            c_vpe.description AS episode,
            c_vpe.health_issue,
            2 AS rank,
            (%s) AS episode_start,
            (%s) AS episode_end
        FROM
            clin.v_pat_episodes c_vpe
        WHERE
            c_vpe.episode_open IS FALSE
                AND
            c_vpe.description %%(fragment_condition)s
                %%(ctxt_pat)s
    """ % (
        _SQL_episode_start,
        _SQL_episode_end
    )

    #--------------------------------------------------------
    def __init__(self):
        """Set up the combined open/closed episodes query and the patient context."""
        union_query = """
            (
                %s
            ) UNION ALL (
                %s
            )
            ORDER BY rank, episode
            LIMIT 30""" % (
            cEpisodeMatchProvider._SQL_open_episodes,
            cEpisodeMatchProvider._SQL_closed_episodes
        )
        patient_context = {
            'where_part': 'AND pk_patient = %(pat)s',
            'placeholder': 'pat'
        }
        super().__init__(queries = [union_query], context = {'ctxt_pat': patient_context})

    #--------------------------------------------------------
    def _find_matches(self, fragment_condition):
        """Search for matches, but only if a patient has been set in the context.

        Returns (False, []) when the context value 'pat' is missing or false.
        """
        try:
            have_patient = bool(self._context_vals['pat'])
        except KeyError:
            have_patient = False
        if have_patient:
            return super()._find_matches(fragment_condition)
        # no patient, no search
        return (False, [])

    #--------------------------------------------------------
    def _rows2matches(self, rows):
        """Convert result rows into match dicts.

        Each match carries the episode primary key as 'data', a weight
        of 0, and one label used for both list and field display: the
        episode description, its clinical duration, and a health issue
        suffix built by gmTools.coalesce().
        """
        matches = []
        for row in rows:
            duration = format_clinical_duration_of_episode (
                start = row['episode_start'],
                end = row['episode_end']
            )[0]
            # presumably yields '' when there is no health issue - confirm against gmTools.coalesce()
            issue_suffix = gmTools.coalesce (
                row['health_issue'],
                '',
                ' - %s'
            )
            label = '%s (%s)%s' % (row['episode'], duration, issue_suffix)
            matches.append ({
                'weight': 0,
                'data': row['pk_episode'],
                'list_label': label,
                'field_label': label
            })
        return matches
Find episodes for patient.
Ancestors
Inherited members