Module Gnumed.business.gmHealthIssue

GNUmed health issue related business object.

license: GPL v2 or later

Functions

def create_health_issue(description=None, encounter=None, patient=None, link_obj=None)
Expand source code
def create_health_issue(description=None, encounter=None, patient=None, link_obj=None):
        """Creates a new health issue for a given patient.

        description - health issue name
        """
        try:
                h_issue = cHealthIssue(name = description, encounter = encounter, patient = patient)
                return h_issue

        except gmExceptions.NoSuchBusinessObjectError:
                pass

        queries = []
        cmd = "insert into clin.health_issue (description, fk_encounter) values (%(desc)s, %(enc)s)"
        queries.append({'sql': cmd, 'args': {'desc': description, 'enc': encounter}})
        cmd = "select currval('clin.health_issue_pk_seq')"
        queries.append({'sql': cmd})
        rows = gmPG2.run_rw_queries(queries = queries, return_data = True, link_obj = link_obj)
        h_issue = cHealthIssue(aPK_obj = rows[0][0])
        return h_issue

Creates a new health issue for a given patient.

description - health issue name

def delete_health_issue(health_issue=None)
Expand source code
def delete_health_issue(health_issue=None):
        if isinstance(health_issue, cHealthIssue):
                args = {'pk': health_issue['pk_health_issue']}
        else:
                args = {'pk': int(health_issue)}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': 'DELETE FROM clin.health_issue WHERE pk = %(pk)s', 'args': args}])
        except gmPG2.dbapi.IntegrityError:
                # should be parsing pgcode/and or error message
                _log.exception('cannot delete health issue')
                return False

        return True
def diagnostic_certainty_classification2str(classification)
Expand source code
def diagnostic_certainty_classification2str(classification):

        global __diagnostic_certainty_classification_map

        if __diagnostic_certainty_classification_map is None:
                __diagnostic_certainty_classification_map = {
                        None: '',
                        'A': _('A: Sign'),
                        'B': _('B: Cluster of signs'),
                        'C': _('C: Syndromic diagnosis'),
                        'D': _('D: Scientific diagnosis')
                }

        try:
                return __diagnostic_certainty_classification_map[classification]
        except KeyError:
                return _('<%s>: unknown diagnostic certainty classification') % classification
def get_dummy_health_issue()
Expand source code
def get_dummy_health_issue():
        issue = {
                'pk_health_issue': None,
                'description': _('Unattributed episodes'),
                'age_noted': None,
                'laterality': 'na',
                'is_active': True,
                'clinically_relevant': True,
                'is_confidential': None,
                'is_cause_of_death': False,
                'is_dummy': True,
                'grouping': None
        }
        return issue

Classes

class cHealthIssue (aPK_obj=None, encounter=None, name='xxxDEFAULTxxx', patient=None, row=None)
Expand source code
class cHealthIssue(gmBusinessDBObject.cBusinessDBObject):
        """Represents one health issue."""

        _cmd_fetch_payload = "select * from clin.v_health_issues where pk_health_issue = %s"
        _cmds_store_payload = [
                """update clin.health_issue set
                                description = %(description)s,
                                summary = gm.nullify_empty_string(%(summary)s),
                                age_noted = %(age_noted)s,
                                laterality = gm.nullify_empty_string(%(laterality)s),
                                grouping = gm.nullify_empty_string(%(grouping)s),
                                diagnostic_certainty_classification = gm.nullify_empty_string(%(diagnostic_certainty_classification)s),
                                is_active = %(is_active)s,
                                clinically_relevant = %(clinically_relevant)s,
                                is_confidential = %(is_confidential)s,
                                is_cause_of_death = %(is_cause_of_death)s
                        WHERE
                                pk = %(pk_health_issue)s
                                        AND
                                xmin = %(xmin_health_issue)s""",
                "select xmin as xmin_health_issue from clin.health_issue where pk = %(pk_health_issue)s"
        ]
        _updatable_fields = [
                'description',
                'summary',
                'grouping',
                'age_noted',
                'laterality',
                'is_active',
                'clinically_relevant',
                'is_confidential',
                'is_cause_of_death',
                'diagnostic_certainty_classification'
        ]

        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, encounter=None, name='xxxDEFAULTxxx', patient=None, row=None):
                pk = aPK_obj

                if (pk is not None) or (row is not None):
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk, row=row)
                        return

                if patient is None:
                        cmd = """select *, xmin_health_issue from clin.v_health_issues
                                        where
                                                description = %(desc)s
                                                        and
                                                pk_patient = (select fk_patient from clin.encounter where pk = %(enc)s)"""
                else:
                        cmd = """select *, xmin_health_issue from clin.v_health_issues
                                        where
                                                description = %(desc)s
                                                        and
                                                pk_patient = %(pat)s"""

                queries = [{'sql': cmd, 'args': {'enc': encounter, 'desc': name, 'pat': patient}}]
                rows = gmPG2.run_ro_queries(queries = queries)

                if len(rows) == 0:
                        raise gmExceptions.NoSuchBusinessObjectError('no health issue for [enc:%s::desc:%s::pat:%s]' % (encounter, name, patient))

                pk = rows[0][0]
                r = {'data': rows[0], 'pk_field': 'pk_health_issue'}

                gmBusinessDBObject.cBusinessDBObject.__init__(self, row=r)

        #--------------------------------------------------------
        # external API
        #--------------------------------------------------------
        @classmethod
        def from_problem(cls, problem) -> 'cHealthIssue':
                """Initialize health issue from issue-type problem."""
                if isinstance(problem, cHealthIssue):
                        return problem

                assert problem['type'] == 'issue', 'cannot convert [%s] to health issue' % problem
                return cls(aPK_obj = problem['pk_health_issue'])

        #--------------------------------------------------------
        def rename(self, description:str=None) -> bool:
                """Rename health issue.

                Args:
                        description: the new descriptive name for the issue
                """
                if description.strip() == '':
                        return False

                # update the issue description
                old_description = self._payload['description']
                self._payload['description'] = description.strip()
                self._is_modified = True
                successful, data = self.save_payload()
                if not successful:
                        _log.error('cannot rename health issue [%s] with [%s]' % (self, description))
                        self._payload['description'] = old_description
                        return False

                return True

        #--------------------------------------------------------
        def get_episodes(self) -> list[gmEpisode.cEpisode]:
                """The episodes linked to this health issue."""
                SQL = 'SELECT * FROM clin.v_pat_episodes WHERE pk_health_issue = %(pk)s'
                args = {'pk': self.pk_obj}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return [ gmEpisode.cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

        episodes = property(get_episodes)

        #--------------------------------------------------------
        def close_expired_episode(self, ttl:int=180) -> bool:
                """Close open episode if "older" than the Time To Live.

                Args:
                        ttl: maximum "age" of episode in days
                """
                open_episode = self.open_episode
                if open_episode is None:
                        return True

                #clinical_end = open_episode.best_guess_clinical_end_date
                clinical_end = open_episode.latest_access_date          # :-/
                ttl = datetime.timedelta(ttl)
                now = datetime.datetime.now(tz = clinical_end.tzinfo)
                if (clinical_end + ttl) > now:
                        return False

                open_episode['episode_open'] = False
                success, data = open_episode.save_payload()
                if success:
                        return True

                return False

        #--------------------------------------------------------
        def close_episode(self) -> bool:
                """Unconditionally close the open episode, if any."""
                open_episode = self.get_open_episode()
                open_episode['episode_open'] = False
                success_state, data = open_episode.save_payload()
                return success_state

        #--------------------------------------------------------
        def get_open_episode(self) -> gmEpisode.cEpisode:
                SQL = "select pk FROM clin.episode WHERE fk_health_issue = %(pk_issue)s AND is_open IS True LIMIT 1"
                args = {'pk_issue': self.pk_obj}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                if rows:
                        return gmEpisode.cEpisode(aPK_obj = rows[0][0])

                return None

        open_episode = property(get_open_episode)

        #--------------------------------------------------------
        def age_noted_human_readable(self):
                if self._payload['age_noted'] is None:
                        return '<???>'

                # since we've already got an interval we are bound to use it,
                # further transformation will only introduce more errors,
                # later we can improve this deeper inside
                return gmDateTime.format_interval_medically(self._payload['age_noted'])

        #--------------------------------------------------------
        def add_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                cmd = "INSERT INTO clin.lnk_code2h_issue (fk_item, fk_generic_code) values (%(item)s, %(code)s)"
                args = {
                        'item': self._payload['pk_health_issue'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def remove_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                cmd = "DELETE FROM clin.lnk_code2h_issue WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                args = {
                        'item': self._payload['pk_health_issue'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
                rows = gmClinNarrative.get_as_journal (
                        issues = [self.pk_obj],
                        order_by = 'pk_episode, pk_encounter, clin_when, scr, src_table'
                )

                if len(rows) == 0:
                        return ''

                left_margin = ' ' * left_margin

                lines = []
                lines.append(_('Clinical data generated during encounters under this health issue:'))

                prev_epi = None
                for row in rows:
                        if row['pk_episode'] != prev_epi:
                                lines.append('')
                                prev_epi = row['pk_episode']

                        top_row = '%s%s %s (%s) %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                                row['clin_when'].strftime(date_format),
                                gmTools.u_box_horiz_single * 5
                        )
                        soap = gmTools.wrap (
                                text = row['narrative'],
                                width = 60,
                                initial_indent = '  ',
                                subsequent_indent = '  ' + left_margin
                        )
                        row_ver = ''
                        if row['row_version'] > 0:
                                row_ver = 'v%s: ' % row['row_version']
                        bottom_row = '%s%s %s, %s%s %s' % (
                                ' ' * 40,
                                gmTools.u_box_horiz_light_heavy,
                                row['modified_by'],
                                row_ver,
                                row['modified_when'].strftime(date_format),
                                gmTools.u_box_horiz_heavy_light
                        )

                        lines.append(top_row)
                        lines.append(soap)
                        lines.append(bottom_row)

                eol_w_margin = '\n%s' % left_margin
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        def __format_episodes_for_clinical_data(self, emr) -> list:
                epis = self.get_episodes()
                if epis is None:
                        return [_('Error retrieving episodes for this health issue.')]

                if len(epis) == 0:
                        return [_('There are no episodes for this health issue.')]

                line = _('Episodes: %s (most recent: %s%s%s)') % (
                        len(epis),
                        gmTools.u_left_double_angle_quote,
                        emr.get_most_recent_episode(issue = self._payload['pk_health_issue'])['description'],
                        gmTools.u_right_double_angle_quote
                )
                lines = [line]
                for epi in epis:
                        lines.append(' \u00BB%s\u00AB (%s)' % (
                                epi['description'],
                                gmTools.bool2subst(epi['episode_open'], _('ongoing'), _('closed'))
                        ))
                lines.append('')
                return lines

        #--------------------------------------------------------
        def __format_encounters_for_clinical_data(self, emr) -> list:
                first_encounter = emr.get_first_encounter(issue_id = self._payload['pk_health_issue'])
                if not first_encounter:
                        return [_('No encounters found for this health issue.')]

                last_encounter = emr.get_last_encounter(issue_id = self._payload['pk_health_issue'])
                encs = emr.get_encounters(issues = [self._payload['pk_health_issue']])
                lines = []
                line = _('Encounters: %s (%s - %s):') % (
                        len(encs),
                        first_encounter['started_original_tz'].strftime('%m/%Y'),
                        last_encounter['last_affirmed_original_tz'].strftime('%m/%Y')
                )
                lines.append(line)
                line = _(' Most recent: %s - %s') % (
                        last_encounter['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        last_encounter['last_affirmed_original_tz'].strftime('%H:%M')
                )
                lines.append(line)
                return lines

        #--------------------------------------------------------
        def __format_clinical_data(self, left_margin=0, patient=None,
                with_episodes=True,
                with_encounters=True,
                with_medications=True,
                with_hospital_stays=True,
                with_procedures=True,
                with_family_history=True,
                with_documents=True,
                with_tests=True,
                with_vaccinations=True
        ):
                if not patient:
                        return []

                if patient.ID != self._payload['pk_patient']:
                        msg = '<patient>.ID = %s but health issue %s belongs to patient %s' % (
                                patient.ID,
                                self._payload['pk_health_issue'],
                                self._payload['pk_patient']
                        )
                        raise ValueError(msg)

                emr = patient.emr
                lines = []
                # episodes
                if with_episodes:
                        lines.extend(self.__format_episodes_for_clinical_data(emr))
                # encounters
                if with_encounters:
                        lines.extend(self.__format_encounters_for_clinical_data(emr))

                # medications
                if with_medications:
                        meds = emr.get_current_medications (
                                issues = [ self._payload['pk_health_issue'] ],
                                order_by = 'discontinued DESC NULLS FIRST, started, substance'
                        )
                        if len(meds) > 0:
                                lines.append('')
                                lines.append(_('Medications and Substances'))
                        for m in meds:
                                lines.append(m.format(left_margin = (left_margin + 1)))
                        del meds

                # hospitalizations
                if with_hospital_stays:
                        stays = emr.get_hospital_stays (
                                issues = [ self._payload['pk_health_issue'] ]
                        )
                        if len(stays) > 0:
                                lines.append('')
                                lines.append(_('Hospitalizations: %s') % len(stays))
                        for s in stays:
                                lines.append(s.format(left_margin = (left_margin + 1)))
                        del stays

                # procedures
                if with_procedures:
                        procs = emr.get_performed_procedures (
                                issues = [ self._payload['pk_health_issue'] ]
                        )
                        if len(procs) > 0:
                                lines.append('')
                                lines.append(_('Procedures performed: %s') % len(procs))
                        for p in procs:
                                lines.append(p.format(left_margin = (left_margin + 1)))
                        del procs

                # family history
                if with_family_history:
                        fhx = emr.get_family_history(issues = [ self._payload['pk_health_issue'] ])
                        if len(fhx) > 0:
                                lines.append('')
                                lines.append(_('Family History: %s') % len(fhx))
                        for f in fhx:
                                lines.append(f.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = True,
                                        include_comment = True,
                                        include_codes = False
                                ))
                        del fhx

                epis = self.get_episodes()
                if epis:
                        epi_pks = [ e['pk_episode'] for e in epis ]

                        # documents
                        if with_documents:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents(pk_episodes = epi_pks)
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents: %s') % len(docs))
                                del docs

                        # test results
                        if with_tests:
                                tests = emr.get_test_results_by_date(episodes = epi_pks)
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results: %s') % len(tests))
                                del tests

                        # vaccinations
                        if with_vaccinations:
                                vaccs = emr.get_vaccinations(episodes = epi_pks, order_by = 'date_given, vaccine')
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format(with_reaction = True))
                                del vaccs
                del epis
                return lines

        #--------------------------------------------------------
        def format(self, left_margin=0, patient=None,
                with_summary=True,
                with_codes=True,
                with_episodes=True,
                with_encounters=True,
                with_medications=True,
                with_hospital_stays=True,
                with_procedures=True,
                with_family_history=True,
                with_documents=True,
                with_tests=True,
                with_vaccinations=True,
                with_external_care=True
        ):

                lines = []

                lines.append(_('Health Issue %s%s%s%s   [#%s]') % (
                        '\u00BB',
                        self._payload['description'],
                        '\u00AB',
                        gmTools.coalesce (
                                value2test = self.laterality_description,
                                return_instead = '',
                                template4value = ' (%s)',
                                none_equivalents = [None, '', '?']
                        ),
                        self._payload['pk_health_issue']
                ))

                if self._payload['is_confidential']:
                        lines.append('')
                        lines.append(_(' ***** CONFIDENTIAL *****'))
                        lines.append('')

                if self._payload['is_cause_of_death']:
                        lines.append('')
                        lines.append(_(' contributed to death of patient'))
                        lines.append('')

                from Gnumed.business.gmEncounter import cEncounter
                enc = cEncounter(aPK_obj = self._payload['pk_encounter'])
                lines.append (_(' Created during encounter: %s (%s - %s)   [#%s]') % (
                        enc['l10n_type'],
                        enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        enc['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['pk_encounter']
                ))

                if self._payload['age_noted'] is not None:
                        lines.append(_(' Noted at age: %s') % self.age_noted_human_readable())

                lines.append(' ' + _('Status') + ': %s, %s%s' % (
                        gmTools.bool2subst(self._payload['is_active'], _('active'), _('inactive')),
                        gmTools.bool2subst(self._payload['clinically_relevant'], _('clinically relevant'), _('not clinically relevant')),
                        gmTools.coalesce (
                                value2test = diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification']),
                                return_instead = '',
                                template4value = ', %s',
                                none_equivalents = [None, '']
                        )
                ))

                if with_summary:
                        if self._payload['summary'] is not None:
                                lines.append(' %s:' % _('Synopsis'))
                                lines.append(gmTools.wrap (
                                        text = self._payload['summary'],
                                        width = 60,
                                        initial_indent = '  ',
                                        subsequent_indent = '  '
                                ))
                lines.append('')
                # patient/emr dependant
                lines.extend(self.__format_clinical_data (
                        left_margin,
                        patient,
                        with_episodes = with_episodes,
                        with_encounters = with_encounters,
                        with_medications = with_medications,
                        with_hospital_stays = with_hospital_stays,
                        with_procedures = with_procedures,
                        with_family_history = with_family_history,
                        with_documents = with_documents,
                        with_tests = with_tests,
                        with_vaccinations = with_vaccinations
                ))
                if with_external_care:
                        care = self._get_external_care(order_by = 'organization, unit, provider')
                        if care:
                                lines.append('')
                                lines.append(_('External care:'))
                        for item in care:
                                lines.append(' %s%s@%s%s' % (
                                        gmTools.coalesce(item['provider'], '', '%s: '),
                                        item['unit'],
                                        item['organization'],
                                        gmTools.coalesce(item['comment'], '', ' (%s)')
                                ))

                left_margin = ' ' * left_margin
                eol_w_margin = '\n%s' % left_margin
                lines = gmTools.strip_trailing_empty_lines(lines = lines, eol = '\n')
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_encounter(self):
                from Gnumed.business.gmEncounter import cEncounter
                return cEncounter(aPK_obj = self._payload['pk_encounter'])

        encounter = property(_get_encounter)

        #--------------------------------------------------------
        def _get_external_care(self, order_by=None):
                return gmExternalCare.get_external_care_items(pk_health_issue = self.pk_obj, order_by = order_by)

        external_care = property(_get_external_care)

        #--------------------------------------------------------
        def _get_first_episode(self):

                args = {'pk_issue': self.pk_obj}

                cmd = """SELECT
                        earliest, pk_episode
                FROM (
                                -- .modified_when of all episodes of this issue,
                                -- earliest-possible thereof = when created,
                                -- should actually go all the way back into audit.log_episode
                                (SELECT
                                        c_epi.modified_when AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM clin.episode c_epi
                                 WHERE c_epi.fk_health_issue = %(pk_issue)s
                                )
                        UNION ALL

                                -- last modification of encounter in which episodes of this issue were created,
                                -- earliest-possible thereof = initial creation of that encounter
                                (SELECT
                                        c_enc.modified_when AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        clin.episode c_epi
                                                INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                                INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- start of encounter in which episodes of this issue were created,
                                -- earliest-possible thereof = set by user
                                (SELECT
                                        c_enc.started AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        clin.episode c_epi
                                                INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                                INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- start of encounters of clinical items linked to episodes of this issue,
                                -- earliest-possible thereof = explicitly set by user
                                (SELECT
                                        c_enc.started AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        clin.clin_root_item c_cri
                                                INNER JOIN clin.encounter c_enc ON (c_cri.fk_encounter = c_enc.pk)
                                                INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- .clin_when of clinical items linked to episodes of this issue,
                                -- earliest-possible thereof = explicitly set by user
                                (SELECT
                                        c_cri.clin_when AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        clin.clin_root_item c_cri
                                                INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- earliest modification time of clinical items linked to episodes of this issue
                                -- this CAN be used since if an item is linked to an episode it can be
                                -- assumed the episode (should have) existed at the time of creation
                                (SELECT
                                        c_cri.modified_when AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        clin.clin_root_item c_cri
                                                INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- there may not be items, but there may still be documents ...
                                (SELECT
                                        b_dm.clin_when AS earliest,
                                        c_epi.pk AS pk_episode
                                 FROM
                                        blobs.doc_med b_dm
                                                INNER JOIN clin.episode c_epi ON (b_dm.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                  WHERE c_hi.pk = %(pk_issue)s
                                )
                ) AS candidates
                ORDER BY earliest NULLS LAST
                LIMIT 1"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                if len(rows) == 0:
                        return None

                return gmEpisode.cEpisode(aPK_obj = rows[0]['pk_episode'])

        first_episode = property(_get_first_episode)

        #--------------------------------------------------------
        def _get_latest_episode(self):

                # explicit always wins:
                if self._payload['has_open_episode']:
                        return self.open_episode

                args = {'pk_issue': self.pk_obj}

                # cheap query first: any episodes at all ?
                cmd = "SELECT 1 FROM clin.episode WHERE fk_health_issue = %(pk_issue)s"
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                if len(rows) == 0:
                        return None

                cmd = """SELECT
                        latest, pk_episode
                FROM (
                                -- .clin_when of clinical items linked to episodes of this issue,
                                -- latest-possible thereof = explicitly set by user
                                (SELECT
                                        c_cri.clin_when AS latest,
                                        c_epi.pk AS pk_episode,
                                        1 AS rank
                                 FROM
                                        clin.clin_root_item c_cri
                                                INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- .clin_when of documents linked to episodes of this issue
                                (SELECT
                                        b_dm.clin_when AS latest,
                                        c_epi.pk AS pk_episode,
                                        1 AS rank
                                 FROM
                                        blobs.doc_med b_dm
                                                INNER JOIN clin.episode c_epi ON (b_dm.fk_episode = c_epi.pk)
                                                        INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )
                        UNION ALL

                                -- last_affirmed of encounter in which episodes of this issue were created,
                                -- earliest-possible thereof = set by user
                                (SELECT
                                        c_enc.last_affirmed AS latest,
                                        c_epi.pk AS pk_episode,
                                        2 AS rank
                                 FROM
                                        clin.episode c_epi
                                                INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                                INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                                 WHERE c_hi.pk = %(pk_issue)s
                                )

                ) AS candidates
                WHERE
                        -- weed out NULL rows due to episodes w/o clinical items and w/o documents
                        latest IS NOT NULL
                ORDER BY
                        rank,
                        latest DESC
                LIMIT 1
                """
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                if len(rows) == 0:
                        # there were no episodes for this issue
                        return None

                return gmEpisode.cEpisode(aPK_obj = rows[0]['pk_episode'])

        latest_episode = property(_get_latest_episode)

        #--------------------------------------------------------
        # Steffi suggested we divide into safe and assumed (= possible) start dates
        def _get_safe_start_date(self):
                """This returns the date when we can assume to safely KNOW
                   the health issue existed (because the provider said so)."""

                args = {
                        'enc': self._payload['pk_encounter'],
                        'pk': self._payload['pk_health_issue']
                }
                cmd = """SELECT COALESCE (
                        -- this one must override all:
                        -- .age_noted if not null and DOB is known
                        (CASE
                                WHEN c_hi.age_noted IS NULL
                                THEN NULL::timestamp with time zone
                                WHEN
                                        (SELECT d_i.dob FROM dem.identity d_i WHERE d_i.pk = (
                                                SELECT c_enc.fk_patient FROM clin.encounter c_enc WHERE c_enc.pk = %(enc)s
                                        )) IS NULL
                                THEN NULL::timestamp with time zone
                                ELSE
                                        c_hi.age_noted + (
                                                SELECT d_i.dob FROM dem.identity d_i WHERE d_i.pk = (
                                                        SELECT c_enc.fk_patient FROM clin.encounter c_enc WHERE c_enc.pk = %(enc)s
                                                )
                                        )
                        END),

                        -- look at best_guess_clinical_start_date of all linked episodes

                        -- start of encounter in which created, earliest = explicitly set
                        (SELECT c_enc.started AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = c_hi.fk_encounter)
                )
                FROM clin.health_issue c_hi
                WHERE c_hi.pk = %(pk)s"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                start = rows[0][0]
                # leads to a loop:
                #end = self.clinical_end_date
                #if start > end:
                #       return end
                return start

        safe_start_date = property(_get_safe_start_date)

        #--------------------------------------------------------
        def _get_possible_start_date(self):
                args = {'pk': self._payload['pk_health_issue']}
                cmd = """
SELECT MIN(earliest) FROM (
        -- last modification, earliest = when created in/changed to the current state
        (SELECT modified_when AS earliest FROM clin.health_issue WHERE pk = %(pk)s)

UNION ALL
        -- last modification of encounter in which created, earliest = initial creation of that encounter
        (SELECT c_enc.modified_when AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = (
                SELECT c_hi.fk_encounter FROM clin.health_issue c_hi WHERE c_hi.pk = %(pk)s
        ))

UNION ALL
        -- earliest explicit .clin_when of clinical items linked to this health_issue
        (SELECT MIN(c_vpi.clin_when) AS earliest FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s)

UNION ALL
        -- earliest modification time of clinical items linked to this health issue
        -- this CAN be used since if an item is linked to a health issue it can be
        -- assumed the health issue (should have) existed at the time of creation
        (SELECT MIN(c_vpi.modified_when) AS earliest FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s)

UNION ALL
        -- earliest start of encounters of clinical items linked to this episode
        (SELECT MIN(c_enc.started) AS earliest FROM clin.encounter c_enc WHERE c_enc.pk IN (
                SELECT c_vpi.pk_encounter FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s
        ))

-- here we should be looking at
-- .best_guess_clinical_start_date of all episodes linked to this encounter

) AS candidates"""

                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return rows[0][0]

        possible_start_date = property(_get_possible_start_date)

        #--------------------------------------------------------
        def _get_clinical_end_date(self):
                if self._payload['is_active']:
                        return None
                if self._payload['has_open_episode']:
                        return None
                latest_episode = self.latest_episode
                if latest_episode is not None:
                        return latest_episode.best_guess_clinical_end_date
                # apparently, there are no episodes for this issue
                # and the issue is not active either
                # so, we simply do not know, the safest assumption is:
                return self.safe_start_date

        clinical_end_date = property(_get_clinical_end_date)

        #--------------------------------------------------------
        def _get_latest_access_date(self):
                cmd = """
SELECT
        MAX(latest)
FROM (
        -- last modification, latest = when last changed to the current state
        -- DO NOT USE: database upgrades may change this field
        (SELECT modified_when AS latest FROM clin.health_issue WHERE pk = %(pk)s)

        --UNION ALL
        -- last modification of encounter in which created, latest = initial creation of that encounter
        -- DO NOT USE: just because one corrects a typo does not mean the issue took any longer
        --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        -- )
        --)

        --UNION ALL
        -- end of encounter in which created, latest = explicitly set
        -- DO NOT USE: we can retrospectively create issues which
        -- DO NOT USE: are long since finished
        --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        -- )
        --)

        UNION ALL
        -- latest end of encounters of clinical items linked to this issue
        (SELECT
                MAX(last_affirmed) AS latest
         FROM clin.encounter
         WHERE pk IN (
                SELECT pk_encounter FROM clin.v_pat_items WHERE pk_health_issue = %(pk)s
         )
        )

        UNION ALL
        -- latest explicit .clin_when of clinical items linked to this issue
        (SELECT
                MAX(clin_when) AS latest
         FROM clin.v_pat_items
         WHERE pk_health_issue = %(pk)s
        )

        -- latest modification time of clinical items linked to this issue
        -- this CAN be used since if an item is linked to an issue it can be
        -- assumed the issue (should have) existed at the time of modification
        -- DO NOT USE, because typo fixes should not extend the issue
        --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

) AS candidates"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
                return rows[0][0]

        latest_access_date = property(_get_latest_access_date)

        #--------------------------------------------------------
        def _get_laterality_description(self):
                try:
                        return laterality2str[self._payload['laterality']]
                except KeyError:
                        return '<?>'

        laterality_description = property(_get_laterality_description)

        #--------------------------------------------------------
        def _get_diagnostic_certainty_description(self):
                return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

        diagnostic_certainty_description = property(_get_diagnostic_certainty_description)

        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk,
                                description,
                                laterality,
                                age_noted,
                                is_active,
                                clinically_relevant,
                                is_confidential,
                                is_cause_of_death,
                                fk_encounter,
                                grouping,
                                diagnostic_certainty_classification,
                                summary
                        FROM clin.health_issue
                        WHERE pk = %(pk_health_issue)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk,
                                description,
                                laterality,
                                age_noted,
                                is_active,
                                clinically_relevant,
                                is_confidential,
                                is_cause_of_death,
                                fk_encounter,
                                grouping,
                                diagnostic_certainty_classification,
                                summary
                        FROM audit.log_health_issue
                        WHERE pk = %(pk_health_issue)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_health_issue': self.pk_obj}
                title = _('Health issue: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['description'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

        #--------------------------------------------------------
        def _get_generic_codes(self):
                if len(self._payload['pk_generic_codes']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2h_issue WHERE fk_item = %(issue)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'issue': self._payload['pk_health_issue'],
                                        'codes': self._payload['pk_generic_codes']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2h_issue (fk_item, fk_generic_code) VALUES (%(issue)s, %(pk_code)s)',
                                'args': {
                                        'issue': self._payload['pk_health_issue'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                return

        generic_codes = property(_get_generic_codes, _set_generic_codes)

Represents one health issue.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    

Ancestors

Static methods

def from_problem(problem) ‑> cHealthIssue

Initialize health issue from issue-type problem.

Instance variables

prop clinical_end_date
Expand source code
def _get_clinical_end_date(self):
        if self._payload['is_active']:
                return None
        if self._payload['has_open_episode']:
                return None
        latest_episode = self.latest_episode
        if latest_episode is not None:
                return latest_episode.best_guess_clinical_end_date
        # apparently, there are no episodes for this issue
        # and the issue is not active either
        # so, we simply do not know, the safest assumption is:
        return self.safe_start_date
prop diagnostic_certainty_description
Expand source code
def _get_diagnostic_certainty_description(self):
        return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])
prop encounter
Expand source code
def _get_encounter(self):
        from Gnumed.business.gmEncounter import cEncounter
        return cEncounter(aPK_obj = self._payload['pk_encounter'])
prop episodes : list[cEpisode]
Expand source code
def get_episodes(self) -> list[gmEpisode.cEpisode]:
        """The episodes linked to this health issue."""
        SQL = 'SELECT * FROM clin.v_pat_episodes WHERE pk_health_issue = %(pk)s'
        args = {'pk': self.pk_obj}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return [ gmEpisode.cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

The episodes linked to this health issue.

prop external_care
Expand source code
def _get_external_care(self, order_by=None):
        return gmExternalCare.get_external_care_items(pk_health_issue = self.pk_obj, order_by = order_by)
prop first_episode
Expand source code
def _get_first_episode(self):

        args = {'pk_issue': self.pk_obj}

        cmd = """SELECT
                earliest, pk_episode
        FROM (
                        -- .modified_when of all episodes of this issue,
                        -- earliest-possible thereof = when created,
                        -- should actually go all the way back into audit.log_episode
                        (SELECT
                                c_epi.modified_when AS earliest,
                                c_epi.pk AS pk_episode
                         FROM clin.episode c_epi
                         WHERE c_epi.fk_health_issue = %(pk_issue)s
                        )
                UNION ALL

                        -- last modification of encounter in which episodes of this issue were created,
                        -- earliest-possible thereof = initial creation of that encounter
                        (SELECT
                                c_enc.modified_when AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                clin.episode c_epi
                                        INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                        INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- start of encounter in which episodes of this issue were created,
                        -- earliest-possible thereof = set by user
                        (SELECT
                                c_enc.started AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                clin.episode c_epi
                                        INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                        INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- start of encounters of clinical items linked to episodes of this issue,
                        -- earliest-possible thereof = explicitly set by user
                        (SELECT
                                c_enc.started AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                clin.clin_root_item c_cri
                                        INNER JOIN clin.encounter c_enc ON (c_cri.fk_encounter = c_enc.pk)
                                        INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- .clin_when of clinical items linked to episodes of this issue,
                        -- earliest-possible thereof = explicitly set by user
                        (SELECT
                                c_cri.clin_when AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                clin.clin_root_item c_cri
                                        INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- earliest modification time of clinical items linked to episodes of this issue
                        -- this CAN be used since if an item is linked to an episode it can be
                        -- assumed the episode (should have) existed at the time of creation
                        (SELECT
                                c_cri.modified_when AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                clin.clin_root_item c_cri
                                        INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- there may not be items, but there may still be documents ...
                        (SELECT
                                b_dm.clin_when AS earliest,
                                c_epi.pk AS pk_episode
                         FROM
                                blobs.doc_med b_dm
                                        INNER JOIN clin.episode c_epi ON (b_dm.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                          WHERE c_hi.pk = %(pk_issue)s
                        )
        ) AS candidates
        ORDER BY earliest NULLS LAST
        LIMIT 1"""
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        if len(rows) == 0:
                return None

        return gmEpisode.cEpisode(aPK_obj = rows[0]['pk_episode'])
prop formatted_revision_history
Expand source code
def _get_formatted_revision_history(self):
        cmd = """SELECT
                        '<N/A>'::TEXT as audit__action_applied,
                        NULL AS audit__action_when,
                        '<N/A>'::TEXT AS audit__action_by,
                        pk_audit,
                        row_version,
                        modified_when,
                        modified_by,
                        pk,
                        description,
                        laterality,
                        age_noted,
                        is_active,
                        clinically_relevant,
                        is_confidential,
                        is_cause_of_death,
                        fk_encounter,
                        grouping,
                        diagnostic_certainty_classification,
                        summary
                FROM clin.health_issue
                WHERE pk = %(pk_health_issue)s
        UNION ALL (
                SELECT
                        audit_action as audit__action_applied,
                        audit_when as audit__action_when,
                        audit_by as audit__action_by,
                        pk_audit,
                        orig_version as row_version,
                        orig_when as modified_when,
                        orig_by as modified_by,
                        pk,
                        description,
                        laterality,
                        age_noted,
                        is_active,
                        clinically_relevant,
                        is_confidential,
                        is_cause_of_death,
                        fk_encounter,
                        grouping,
                        diagnostic_certainty_classification,
                        summary
                FROM audit.log_health_issue
                WHERE pk = %(pk_health_issue)s
        )
        ORDER BY row_version DESC
        """
        args = {'pk_health_issue': self.pk_obj}
        title = _('Health issue: %s%s%s') % (
                gmTools.u_left_double_angle_quote,
                self._payload['description'],
                gmTools.u_right_double_angle_quote
        )
        return '\n'.join(self._get_revision_history(cmd, args, title))
prop generic_codes
Expand source code
def _get_generic_codes(self):
        if len(self._payload['pk_generic_codes']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
prop laterality_description
Expand source code
def _get_laterality_description(self):
        try:
                return laterality2str[self._payload['laterality']]
        except KeyError:
                return '<?>'
prop latest_access_date
Expand source code
        def _get_latest_access_date(self):
                cmd = """
SELECT
        MAX(latest)
FROM (
        -- last modification, latest = when last changed to the current state
        -- DO NOT USE: database upgrades may change this field
        (SELECT modified_when AS latest FROM clin.health_issue WHERE pk = %(pk)s)

        --UNION ALL
        -- last modification of encounter in which created, latest = initial creation of that encounter
        -- DO NOT USE: just because one corrects a typo does not mean the issue took any longer
        --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        -- )
        --)

        --UNION ALL
        -- end of encounter in which created, latest = explicitly set
        -- DO NOT USE: we can retrospectively create issues which
        -- DO NOT USE: are long since finished
        --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
        -- )
        --)

        UNION ALL
        -- latest end of encounters of clinical items linked to this issue
        (SELECT
                MAX(last_affirmed) AS latest
         FROM clin.encounter
         WHERE pk IN (
                SELECT pk_encounter FROM clin.v_pat_items WHERE pk_health_issue = %(pk)s
         )
        )

        UNION ALL
        -- latest explicit .clin_when of clinical items linked to this issue
        (SELECT
                MAX(clin_when) AS latest
         FROM clin.v_pat_items
         WHERE pk_health_issue = %(pk)s
        )

        -- latest modification time of clinical items linked to this issue
        -- this CAN be used since if an item is linked to an issue it can be
        -- assumed the issue (should have) existed at the time of modification
        -- DO NOT USE, because typo fixes should not extend the issue
        --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

) AS candidates"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
                return rows[0][0]
prop latest_episode
Expand source code
def _get_latest_episode(self):

        # explicit always wins:
        if self._payload['has_open_episode']:
                return self.open_episode

        args = {'pk_issue': self.pk_obj}

        # cheap query first: any episodes at all ?
        cmd = "SELECT 1 FROM clin.episode WHERE fk_health_issue = %(pk_issue)s"
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        if len(rows) == 0:
                return None

        cmd = """SELECT
                latest, pk_episode
        FROM (
                        -- .clin_when of clinical items linked to episodes of this issue,
                        -- latest-possible thereof = explicitly set by user
                        (SELECT
                                c_cri.clin_when AS latest,
                                c_epi.pk AS pk_episode,
                                1 AS rank
                         FROM
                                clin.clin_root_item c_cri
                                        INNER JOIN clin.episode c_epi ON (c_cri.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- .clin_when of documents linked to episodes of this issue
                        (SELECT
                                b_dm.clin_when AS latest,
                                c_epi.pk AS pk_episode,
                                1 AS rank
                         FROM
                                blobs.doc_med b_dm
                                        INNER JOIN clin.episode c_epi ON (b_dm.fk_episode = c_epi.pk)
                                                INNER JOIN clin.health_issue c_hi ON (c_epi.fk_health_issue = c_hi.pk)
                         WHERE c_hi.pk = %(pk_issue)s
                        )
                UNION ALL

                        -- last_affirmed of encounter in which episodes of this issue were created,
                        -- earliest-possible thereof = set by user
                        (SELECT
                                c_enc.last_affirmed AS latest,
                                c_epi.pk AS pk_episode,
                                2 AS rank
                         FROM
                                clin.episode c_epi
                                        INNER JOIN clin.encounter c_enc ON (c_enc.pk = c_epi.fk_encounter)
                                        INNER JOIN clin.health_issue c_hi ON (c_hi.pk = c_epi.fk_health_issue)
                         WHERE c_hi.pk = %(pk_issue)s
                        )

        ) AS candidates
        WHERE
                -- weed out NULL rows due to episodes w/o clinical items and w/o documents
                latest IS NOT NULL
        ORDER BY
                rank,
                latest DESC
        LIMIT 1
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        if len(rows) == 0:
                # there were no episodes for this issue
                return None

        return gmEpisode.cEpisode(aPK_obj = rows[0]['pk_episode'])
prop open_episodecEpisode
Expand source code
def get_open_episode(self) -> gmEpisode.cEpisode:
        SQL = "select pk FROM clin.episode WHERE fk_health_issue = %(pk_issue)s AND is_open IS True LIMIT 1"
        args = {'pk_issue': self.pk_obj}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        if rows:
                return gmEpisode.cEpisode(aPK_obj = rows[0][0])

        return None
prop possible_start_date
Expand source code
        def _get_possible_start_date(self):
                args = {'pk': self._payload['pk_health_issue']}
                cmd = """
SELECT MIN(earliest) FROM (
        -- last modification, earliest = when created in/changed to the current state
        (SELECT modified_when AS earliest FROM clin.health_issue WHERE pk = %(pk)s)

UNION ALL
        -- last modification of encounter in which created, earliest = initial creation of that encounter
        (SELECT c_enc.modified_when AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = (
                SELECT c_hi.fk_encounter FROM clin.health_issue c_hi WHERE c_hi.pk = %(pk)s
        ))

UNION ALL
        -- earliest explicit .clin_when of clinical items linked to this health_issue
        (SELECT MIN(c_vpi.clin_when) AS earliest FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s)

UNION ALL
        -- earliest modification time of clinical items linked to this health issue
        -- this CAN be used since if an item is linked to a health issue it can be
        -- assumed the health issue (should have) existed at the time of creation
        (SELECT MIN(c_vpi.modified_when) AS earliest FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s)

UNION ALL
        -- earliest start of encounters of clinical items linked to this episode
        (SELECT MIN(c_enc.started) AS earliest FROM clin.encounter c_enc WHERE c_enc.pk IN (
                SELECT c_vpi.pk_encounter FROM clin.v_pat_items c_vpi WHERE c_vpi.pk_health_issue = %(pk)s
        ))

-- here we should be looking at
-- .best_guess_clinical_start_date of all episodes linked to this encounter

) AS candidates"""

                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return rows[0][0]
prop safe_start_date
Expand source code
def _get_safe_start_date(self):
        """This returns the date when we can assume to safely KNOW
           the health issue existed (because the provider said so)."""

        args = {
                'enc': self._payload['pk_encounter'],
                'pk': self._payload['pk_health_issue']
        }
        cmd = """SELECT COALESCE (
                -- this one must override all:
                -- .age_noted if not null and DOB is known
                (CASE
                        WHEN c_hi.age_noted IS NULL
                        THEN NULL::timestamp with time zone
                        WHEN
                                (SELECT d_i.dob FROM dem.identity d_i WHERE d_i.pk = (
                                        SELECT c_enc.fk_patient FROM clin.encounter c_enc WHERE c_enc.pk = %(enc)s
                                )) IS NULL
                        THEN NULL::timestamp with time zone
                        ELSE
                                c_hi.age_noted + (
                                        SELECT d_i.dob FROM dem.identity d_i WHERE d_i.pk = (
                                                SELECT c_enc.fk_patient FROM clin.encounter c_enc WHERE c_enc.pk = %(enc)s
                                        )
                                )
                END),

                -- look at best_guess_clinical_start_date of all linked episodes

                -- start of encounter in which created, earliest = explicitly set
                (SELECT c_enc.started AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = c_hi.fk_encounter)
        )
        FROM clin.health_issue c_hi
        WHERE c_hi.pk = %(pk)s"""
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        start = rows[0][0]
        # leads to a loop:
        #end = self.clinical_end_date
        #if start > end:
        #       return end
        return start

This returns the date when we can assume to safely KNOW the health issue existed (because the provider said so).

Methods

def add_code(self, pk_code=None)
Expand source code
def add_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        cmd = "INSERT INTO clin.lnk_code2h_issue (fk_item, fk_generic_code) values (%(item)s, %(code)s)"
        args = {
                'item': self._payload['pk_health_issue'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True

must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

def age_noted_human_readable(self)
Expand source code
def age_noted_human_readable(self):
        if self._payload['age_noted'] is None:
                return '<???>'

        # since we've already got an interval we are bound to use it,
        # further transformation will only introduce more errors,
        # later we can improve this deeper inside
        return gmDateTime.format_interval_medically(self._payload['age_noted'])
def close_episode(self) ‑> bool
Expand source code
def close_episode(self) -> bool:
        """Unconditionally close the open episode, if any."""
        open_episode = self.get_open_episode()
        open_episode['episode_open'] = False
        success_state, data = open_episode.save_payload()
        return success_state

Unconditionally close the open episode, if any.

def close_expired_episode(self, ttl: int = 180) ‑> bool
Expand source code
def close_expired_episode(self, ttl:int=180) -> bool:
        """Close open episode if "older" than the Time To Live.

        Args:
                ttl: maximum "age" of episode in days
        """
        open_episode = self.open_episode
        if open_episode is None:
                return True

        #clinical_end = open_episode.best_guess_clinical_end_date
        clinical_end = open_episode.latest_access_date          # :-/
        ttl = datetime.timedelta(ttl)
        now = datetime.datetime.now(tz = clinical_end.tzinfo)
        if (clinical_end + ttl) > now:
                return False

        open_episode['episode_open'] = False
        success, data = open_episode.save_payload()
        if success:
                return True

        return False

Close open episode if "older" than the Time To Live.

Args

ttl
maximum "age" of episode in days
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a')
Expand source code
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
        rows = gmClinNarrative.get_as_journal (
                issues = [self.pk_obj],
                order_by = 'pk_episode, pk_encounter, clin_when, scr, src_table'
        )

        if len(rows) == 0:
                return ''

        left_margin = ' ' * left_margin

        lines = []
        lines.append(_('Clinical data generated during encounters under this health issue:'))

        prev_epi = None
        for row in rows:
                if row['pk_episode'] != prev_epi:
                        lines.append('')
                        prev_epi = row['pk_episode']

                top_row = '%s%s %s (%s) %s' % (
                        gmTools.u_box_top_left_arc,
                        gmTools.u_box_horiz_single,
                        gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                        row['clin_when'].strftime(date_format),
                        gmTools.u_box_horiz_single * 5
                )
                soap = gmTools.wrap (
                        text = row['narrative'],
                        width = 60,
                        initial_indent = '  ',
                        subsequent_indent = '  ' + left_margin
                )
                row_ver = ''
                if row['row_version'] > 0:
                        row_ver = 'v%s: ' % row['row_version']
                bottom_row = '%s%s %s, %s%s %s' % (
                        ' ' * 40,
                        gmTools.u_box_horiz_light_heavy,
                        row['modified_by'],
                        row_ver,
                        row['modified_when'].strftime(date_format),
                        gmTools.u_box_horiz_heavy_light
                )

                lines.append(top_row)
                lines.append(soap)
                lines.append(bottom_row)

        eol_w_margin = '\n%s' % left_margin
        return left_margin + eol_w_margin.join(lines) + '\n'
def get_episodes(self) ‑> list[cEpisode]
Expand source code
def get_episodes(self) -> list[gmEpisode.cEpisode]:
        """The episodes linked to this health issue."""
        SQL = 'SELECT * FROM clin.v_pat_episodes WHERE pk_health_issue = %(pk)s'
        args = {'pk': self.pk_obj}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return [ gmEpisode.cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

The episodes linked to this health issue.

def get_open_episode(self) ‑> cEpisode
Expand source code
def get_open_episode(self) -> gmEpisode.cEpisode:
        SQL = "select pk FROM clin.episode WHERE fk_health_issue = %(pk_issue)s AND is_open IS True LIMIT 1"
        args = {'pk_issue': self.pk_obj}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        if rows:
                return gmEpisode.cEpisode(aPK_obj = rows[0][0])

        return None
def remove_code(self, pk_code=None)
Expand source code
def remove_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        cmd = "DELETE FROM clin.lnk_code2h_issue WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        args = {
                'item': self._payload['pk_health_issue'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True

must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

def rename(self, description: str = None) ‑> bool
Expand source code
def rename(self, description:str=None) -> bool:
        """Rename health issue.

        Args:
                description: the new descriptive name for the issue
        """
        if description.strip() == '':
                return False

        # update the issue description
        old_description = self._payload['description']
        self._payload['description'] = description.strip()
        self._is_modified = True
        successful, data = self.save_payload()
        if not successful:
                _log.error('cannot rename health issue [%s] with [%s]' % (self, description))
                self._payload['description'] = old_description
                return False

        return True

Rename health issue.

Args

description
the new descriptive name for the issue

Inherited members