Module Gnumed.business.gmEpisode

GNUmed episode related business object.

license: GPL v2 or later

Expand source code
# -*- coding: utf-8 -*-
"""GNUmed episode related business object.

license: GPL v2 or later
"""
#============================================================
__author__ = "<karsten.hilbert@gmx.net>"

import sys
import logging


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmI18N
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmDateTime
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmExceptions
from Gnumed.pycommon import gmMatchProvider

from Gnumed.business import gmClinNarrative
from Gnumed.business import gmSoapDefs
from Gnumed.business import gmCoding
from Gnumed.business import gmPraxis

_log = logging.getLogger('gm.emr')

#============================================================
# episodes API
#============================================================
class cEpisode(gmBusinessDBObject.cBusinessDBObject):
        """Represents one clinical episode.
        """
        _cmd_fetch_payload = "select * from clin.v_pat_episodes where pk_episode=%s"
        _cmds_store_payload = [
                """update clin.episode set
                                fk_health_issue = %(pk_health_issue)s,
                                is_open = %(episode_open)s::boolean,
                                description = %(description)s,
                                summary = gm.nullify_empty_string(%(summary)s),
                                diagnostic_certainty_classification = gm.nullify_empty_string(%(diagnostic_certainty_classification)s)
                        where
                                pk = %(pk_episode)s and
                                xmin = %(xmin_episode)s""",
                """select xmin_episode from clin.v_pat_episodes where pk_episode = %(pk_episode)s"""
        ]
        _updatable_fields = [
                'pk_health_issue',
                'episode_open',
                'description',
                'summary',
                'diagnostic_certainty_classification'
        ]
        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, id_patient=None, name='xxxDEFAULTxxx', health_issue=None, row=None, encounter=None, link_obj=None):
                pk = aPK_obj
                if pk is None and row is None:

                        where_parts = ['description = %(desc)s']

                        if id_patient is not None:
                                where_parts.append('pk_patient = %(pat)s')

                        if health_issue is not None:
                                where_parts.append('pk_health_issue = %(issue)s')

                        if encounter is not None:
                                where_parts.append('pk_patient = (SELECT fk_patient FROM clin.encounter WHERE pk = %(enc)s)')

                        args = {
                                'pat': id_patient,
                                'issue': health_issue,
                                'enc': encounter,
                                'desc': name
                        }

                        cmd = 'SELECT * FROM clin.v_pat_episodes WHERE %s' % ' AND '.join(where_parts)

                        rows = gmPG2.run_ro_queries (
                                link_obj = link_obj,
                                queries = [{'sql': cmd, 'args': args}],
                        )

                        if len(rows) == 0:
                                raise gmExceptions.NoSuchBusinessObjectError('no episode for [%s:%s:%s:%s]' % (id_patient, name, health_issue, encounter))

                        r = {'data': rows[0], 'pk_field': 'pk_episode'}
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, row=r)

                else:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk, row=row, link_obj = link_obj)

        #--------------------------------------------------------
        # external API
        #--------------------------------------------------------
        @classmethod
        def from_problem(cls, problem) -> 'cEpisode':
                """Initialize episode from episody-type problem."""
                if isinstance(problem, cEpisode):
                        return problem

                assert problem['type'] == 'episode', 'cannot convert [%s] to episode' % problem
                return cls(aPK_obj = problem['pk_episode'])

        #--------------------------------------------------------
        def get_patient(self):
                return self._payload['pk_patient']

        #--------------------------------------------------------
        def get_narrative(self, soap_cats=None, encounters=None, order_by = None):
                return gmClinNarrative.get_narrative (
                        soap_cats = soap_cats,
                        encounters = encounters,
                        episodes = [self.pk_obj],
                        order_by = order_by
                )

        #--------------------------------------------------------
        def rename(self, description=None):
                """Method for episode editing, that is, episode renaming.

                @param description
                        - the new descriptive name for the encounter
                @type description
                        - a string instance
                """
                # sanity check
                if description.strip() == '':
                        _log.error('<description> must be a non-empty string instance')
                        return False
                # update the episode description
                old_description = self._payload['description']
                self._payload['description'] = description.strip()
                self._is_modified = True
                successful, data = self.save_payload()
                if not successful:
                        _log.error('cannot rename episode [%s] to [%s]' % (self, description))
                        self._payload['description'] = old_description
                        return False
                return True

        #--------------------------------------------------------
        def add_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""

                if pk_code in self._payload['pk_generic_codes']:
                        return

                cmd = """
                        INSERT INTO clin.lnk_code2episode
                                (fk_item, fk_generic_code)
                        SELECT
                                %(item)s,
                                %(code)s
                        WHERE NOT EXISTS (
                                SELECT 1 FROM clin.lnk_code2episode
                                WHERE
                                        fk_item = %(item)s
                                                AND
                                        fk_generic_code = %(code)s
                        )"""
                args = {
                        'item': self._payload['pk_episode'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return

        #--------------------------------------------------------
        def remove_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                cmd = "DELETE FROM clin.lnk_code2episode WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                args = {
                        'item': self._payload['pk_episode'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
                rows = gmClinNarrative.get_as_journal (
                        episodes = [self.pk_obj],
                        order_by = 'pk_encounter, clin_when, scr, src_table'
                        #order_by = u'pk_encounter, scr, clin_when, src_table'
                )

                if len(rows) == 0:
                        return ''

                lines = []

                lines.append(_('Clinical data generated during encounters within this episode:'))

                left_margin = ' ' * left_margin

                prev_enc = None
                for row in rows:
                        if row['pk_encounter'] != prev_enc:
                                lines.append('')
                                prev_enc = row['pk_encounter']

                        when = row['clin_when'].strftime(date_format)
                        top_row = '%s%s %s (%s) %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                                when,
                                gmTools.u_box_horiz_single * 5
                        )
                        soap = gmTools.wrap (
                                text = row['narrative'],
                                width = 60,
                                initial_indent = '  ',
                                subsequent_indent = '  ' + left_margin
                        )
                        row_ver = ''
                        if row['row_version'] > 0:
                                row_ver = 'v%s: ' % row['row_version']
                        bottom_row = '%s%s %s, %s%s %s' % (
                                ' ' * 40,
                                gmTools.u_box_horiz_light_heavy,
                                row['modified_by'],
                                row_ver,
                                row['modified_when'].strftime(date_format),
                                gmTools.u_box_horiz_heavy_light
                        )

                        lines.append(top_row)
                        lines.append(soap)
                        lines.append(bottom_row)

                eol_w_margin = '\n%s' % left_margin
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        def format_maximum_information(self, patient=None):
                if patient is None:
                        from Gnumed.business.gmPerson import gmCurrentPatient, cPatient
                        if not gmCurrentPatient().connected:
                                patient = cPatient(self._payload['pk_patient'])
                        else:
                                if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                        patient = gmCurrentPatient()
                                else:
                                        patient = cPatient(self._payload['pk_patient'])

                return self.format (
                        patient = patient,
                        with_summary = True,
                        with_codes = True,
                        with_encounters = True,
                        with_documents = True,
                        with_hospital_stays = True,
                        with_procedures = True,
                        with_family_history = True,
                        with_tests = False,             # does not inform on the episode itself
                        with_vaccinations = True,
                        with_health_issue = True,
                        return_list = True
                )

        #--------------------------------------------------------
        def __format_encounters(self, left_margin=0, emr=None):
                lines = []

                encs = emr.get_encounters(episodes = [self._payload['pk_episode']])
                if encs is None:
                        return [_('Error retrieving encounters for this episode.')]

                if len(encs) == 0:
                        #lines.append(_('There are no encounters for this issue.'))
                        return []

                first_encounter = emr.get_first_encounter(episode_id = self._payload['pk_episode'])
                last_encounter = emr.get_last_encounter(episode_id = self._payload['pk_episode'])
                lines.append(_('Last worked on: %s\n') % last_encounter['last_affirmed_original_tz'].strftime('%Y-%m-%d %H:%M'))
                if len(encs) < 4:
                        line = _('%s encounter(s) (%s - %s):')
                else:
                        line = _('1st and (up to 3) most recent (of %s) encounters (%s - %s):')
                lines.append(line % (
                        len(encs),
                        first_encounter['started'].strftime('%m/%Y'),
                        last_encounter['last_affirmed'].strftime('%m/%Y')
                ))
                lines.append(' %s - %s (%s):%s' % (
                        first_encounter['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        first_encounter['last_affirmed_original_tz'].strftime('%H:%M'),
                        first_encounter['l10n_type'],
                        gmTools.coalesce (
                                first_encounter['assessment_of_encounter'],
                                gmTools.coalesce (
                                        first_encounter['reason_for_encounter'],
                                        '',
                                        ' \u00BB%s\u00AB' + (' (%s)' % _('RFE'))
                                ),
                                ' \u00BB%s\u00AB' + (' (%s)' % _('AOE'))
                        )
                ))
                if len(encs) > 4:
                        lines.append(_(' %s %s skipped %s') % (
                                gmTools.u_ellipsis,
                                (len(encs) - 4),
                                gmTools.u_ellipsis
                        ))
                for enc in encs[1:][-3:]:
                        lines.append(' %s - %s (%s):%s' % (
                                enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                                enc['last_affirmed_original_tz'].strftime('%H:%M'),
                                enc['l10n_type'],
                                gmTools.coalesce (
                                        enc['assessment_of_encounter'],
                                        gmTools.coalesce (
                                                enc['reason_for_encounter'],
                                                '',
                                                ' \u00BB%s\u00AB' + (' (%s)' % _('RFE'))
                                        ),
                                        ' \u00BB%s\u00AB' + (' (%s)' % _('AOE'))
                                )
                        ))
                # spell out last encounter
                if last_encounter:
                        lines.append('')
                        lines.append(_('Progress notes in most recent encounter:'))
                        lines.extend(last_encounter.format_soap (
                                episodes = [ self._payload['pk_episode'] ],
                                left_margin = left_margin,
                                soap_cats = 'soapu',
                                emr = emr
                        ))
                return lines

        #--------------------------------------------------------
        def format(self, left_margin=0, patient=None,
                with_summary=True,
                with_codes=True,
                with_encounters=True,
                with_documents=True,
                with_hospital_stays=True,
                with_procedures=True,
                with_family_history=True,
                with_tests=True,
                with_vaccinations=True,
                with_health_issue=False,
                return_list=False
        ):
                if patient:
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but episode %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_episode'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        emr = patient.emr
                else:
                        with_encounters = False
                        with_documents = False
                        with_hospital_stays = False
                        with_procedures = False
                        with_family_history = False
                        with_tests = False
                        with_vaccinations = False

                lines = []

                # episode details
                lines.append (_('Episode %s%s%s   [#%s]') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['description'],
                        gmTools.u_right_double_angle_quote,
                        self._payload['pk_episode']
                ))

                from Gnumed.business.gmEncounter import cEncounter
                enc = cEncounter(aPK_obj = self._payload['pk_encounter'])
                lines.append (' ' + _('Created during encounter: %s (%s - %s)   [#%s]') % (
                        enc['l10n_type'],
                        enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        enc['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['pk_encounter']
                ))

                if patient is not None:
                        range_str, range_str_verb, duration_str = self.formatted_clinical_duration
                        lines.append(_(' Duration: %s (%s)') % (duration_str, range_str_verb))

                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                lines.append(' ' + _('Status') + ': %s%s' % (
                        gmTools.bool2subst(self._payload['episode_open'], _('active'), _('finished')),
                        gmTools.coalesce (
                                value2test = diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification']),
                                return_instead = '',
                                template4value = ', %s',
                                none_equivalents = [None, '']
                        )
                ))

                if with_health_issue:
                        lines.append(' ' + _('Health issue') + ': %s' % gmTools.coalesce (
                                self._payload['health_issue'],
                                _('none associated')
                        ))

                if with_summary:
                        if self._payload['summary'] is not None:
                                lines.append(' %s:' % _('Synopsis'))
                                lines.append(gmTools.wrap (
                                                text = self._payload['summary'],
                                                width = 60,
                                                initial_indent = '  ',
                                                subsequent_indent = '  '
                                        )
                                )
                lines.append('')
                # encounters
                if with_encounters:
                        lines.extend(self.__format_encounters(left_margin = left_margin, emr = emr))
                # documents
                if with_documents:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = [ self._payload['pk_episode'] ]
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents: %s') % len(docs))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

                # hospitalizations
                if with_hospital_stays:
                        stays = emr.get_hospital_stays(episodes = [ self._payload['pk_episode'] ])
                        if len(stays) > 0:
                                lines.append('')
                                lines.append(_('Hospitalizations: %s') % len(stays))
                        for s in stays:
                                lines.append(s.format(left_margin = (left_margin + 1)))
                        del stays

                # procedures
                if with_procedures:
                        procs = emr.get_performed_procedures(episodes = [ self._payload['pk_episode'] ])
                        if len(procs) > 0:
                                lines.append('')
                                lines.append(_('Procedures performed: %s') % len(procs))
                        for p in procs:
                                lines.append(p.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_codes = True
                                ))
                        del procs

                # family history
                if with_family_history:
                        fhx = emr.get_family_history(episodes = [ self._payload['pk_episode'] ])
                        if len(fhx) > 0:
                                lines.append('')
                                lines.append(_('Family History: %s') % len(fhx))
                        for f in fhx:
                                lines.append(f.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_comment = True,
                                        include_codes = True
                                ))
                        del fhx

                # test results
                if with_tests:
                        tests = emr.get_test_results_by_date(episodes = [ self._payload['pk_episode'] ])
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))
                        for t in tests:
                                lines.append(' ' + t.format_concisely(date_format = '%Y %b %d', with_notes = True))
                        del tests

                # vaccinations
                if with_vaccinations:
                        vaccs = emr.get_vaccinations (
                                episodes = [ self._payload['pk_episode'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                lines = gmTools.strip_trailing_empty_lines(lines = lines, eol = '\n')
                if return_list:
                        return lines

                left_margin = ' ' * left_margin
                eol_w_margin = '\n%s' % left_margin
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_best_guess_clinical_start_date(self):
                return get_best_guess_clinical_start_date_for_episode(pk_episode = self.pk_obj)

        best_guess_clinical_start_date = property(_get_best_guess_clinical_start_date)

        #--------------------------------------------------------
        def _get_best_guess_clinical_end_date(self):
                return get_best_guess_clinical_end_date_for_episode(self.pk_obj)

        best_guess_clinical_end_date = property(_get_best_guess_clinical_end_date)

        #--------------------------------------------------------
        def _get_formatted_clinical_duration(self):
                return format_clinical_duration_of_episode (
                        start = get_best_guess_clinical_start_date_for_episode(self.pk_obj),
                        end = get_best_guess_clinical_end_date_for_episode(self.pk_obj)
                )

        formatted_clinical_duration = property(_get_formatted_clinical_duration)

        #--------------------------------------------------------
        def _get_latest_access_date(self):
                cmd = """SELECT MAX(latest) FROM (
                        -- last modification, latest = when last changed to the current state
                        (SELECT c_epi.modified_when AS latest, 'clin.episode.modified_when'::text AS candidate FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

                                UNION ALL

                        -- last modification of encounter in which created, latest = initial creation of that encounter
                        -- DO NOT USE: just because one corrects a typo does not mean the episode took longer
                        --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                        --))

                        -- end of encounter in which created, latest = explicitly set
                        -- DO NOT USE: we can retrospectively create episodes which
                        -- DO NOT USE: are long since finished
                        --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                        --))

                        -- latest end of encounters of clinical items linked to this episode
                        (SELECT
                                MAX(last_affirmed) AS latest,
                                'clin.episode.pk = clin.clin_root_item,fk_episode -> .fk_encounter.last_affirmed'::text AS candidate
                         FROM clin.encounter
                         WHERE pk IN (
                                SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
                        ))
                                UNION ALL

                        -- latest explicit .clin_when of clinical items linked to this episode
                        (SELECT
                                MAX(clin_when) AS latest,
                                'clin.episode.pk = clin.clin_root_item,fk_episode -> .clin_when'::text AS candidate
                         FROM clin.clin_root_item
                         WHERE fk_episode = %(pk)s
                        )

                        -- latest modification time of clinical items linked to this episode
                        -- this CAN be used since if an item is linked to an episode it can be
                        -- assumed the episode (should have) existed at the time of creation
                        -- DO NOT USE, because typo fixes should not extend the episode
                        --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

                        -- not sure about this one:
                        -- .pk -> clin.clin_root_item.fk_encounter.modified_when

                ) AS candidates"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
                return rows[0][0]

        latest_access_date = property(_get_latest_access_date)

        #--------------------------------------------------------
        def _get_diagnostic_certainty_description(self):
                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

        diagnostic_certainty_description = property(_get_diagnostic_certainty_description)

        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk, fk_health_issue, description, is_open, fk_encounter,
                                diagnostic_certainty_classification,
                                summary
                        FROM clin.episode
                        WHERE pk = %(pk_episode)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk, fk_health_issue, description, is_open, fk_encounter,
                                diagnostic_certainty_classification,
                                summary
                        FROM audit.log_episode
                        WHERE pk = %(pk_episode)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_episode': self.pk_obj}
                title = _('Episode: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['description'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

        #--------------------------------------------------------
        def _get_generic_codes(self):
                if len(self._payload['pk_generic_codes']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2episode WHERE fk_item = %(epi)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'epi': self._payload['pk_episode'],
                                        'codes': self._payload['pk_generic_codes']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2episode (fk_item, fk_generic_code) VALUES (%(epi)s, %(pk_code)s)',
                                'args': {
                                        'epi': self._payload['pk_episode'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                return

        generic_codes = property(_get_generic_codes, _set_generic_codes)

        #--------------------------------------------------------
        def _get_has_narrative(self):
                cmd = """SELECT EXISTS (
                        SELECT 1 FROM clin.clin_narrative
                        WHERE
                                fk_episode = %(epi)s
                                        AND
                                fk_encounter IN (
                                        SELECT pk FROM clin.encounter WHERE fk_patient = %(pat)s
                                )
                )"""
                args = {
                        'pat': self._payload['pk_patient'],
                        'epi': self._payload['pk_episode']
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return rows[0][0]

        has_narrative = property(_get_has_narrative)

        #--------------------------------------------------------
        def _get_health_issue(self):
                if self._payload['pk_health_issue'] is None:
                        return None

                from Gnumed.business.gmHealthIssue import cHealthIssue
                return cHealthIssue(self._payload['pk_health_issue'])

        health_issue = property(_get_health_issue)

#============================================================
def create_episode(pk_health_issue:int=None, episode_name:str=None, is_open:bool=False, allow_dupes:bool=False, encounter:int=None, link_obj=None):
        """Creates a new episode for a given patient's health issue.

        pk_health_issue - given health issue PK
        episode_name - name of episode
        is_open - whether a *new* episode is to be open (don't tinker with existing ones)
        """
        if not allow_dupes:
                try:
                        return cEpisode(name = episode_name, health_issue = pk_health_issue, encounter = encounter, link_obj = link_obj)

                except gmExceptions.ConstructorError:
                        pass
        queries = []
        SQL = "INSERT INTO clin.episode (fk_health_issue, description, is_open, fk_encounter) VALUES (%(pk_hi)s, %(epi_name)s, %(open)s::boolean, %(pk_enc)s)"
        args = {
                'pk_hi': pk_health_issue,
                'epi_name': episode_name,
                'open': is_open,
                'pk_enc': encounter
        }
        queries.append({'sql': SQL, 'args': args})
        queries.append({'sql': cEpisode._cmd_fetch_payload % "currval('clin.episode_pk_seq')"})
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return cEpisode(row = {'data': rows[0], 'pk_field': 'pk_episode'})

#-----------------------------------------------------------
def delete_episode(episode=None):
        if isinstance(episode, cEpisode):
                pk = episode['pk_episode']
        else:
                pk = int(episode)

        cmd = 'DELETE FROM clin.episode WHERE pk = %(pk)s'
        try:
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'pk': pk}}])
        except gmPG2.dbapi.IntegrityError:
                # should be parsing pgcode/and or error message
                _log.exception('cannot delete episode, it is in use')
                return False

        return True

#-----------------------------------------------------------
_SQL_best_guess_clinical_start_date_for_episode = """
        SELECT MIN(earliest) FROM (
                -- modified_when of episode,
                -- earliest possible thereof = when created,
                -- should actually go all the way back into audit.log_episode
                (SELECT c_epi.modified_when AS earliest FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

                        UNION ALL

                -- last modification of encounter in which created,
                -- earliest-possible thereof = initial creation of that encounter
                (SELECT c_enc.modified_when AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                ))
                        UNION ALL

                -- start of encounter in which created,
                -- earliest-possible thereof = explicitly set by user
                (SELECT c_enc.started AS earliest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                ))
                        UNION ALL

                -- start of encounters of clinical items linked to this episode,
                -- earliest-possible thereof = explicitly set by user
                (SELECT MIN(started) AS earliest FROM clin.encounter WHERE pk IN (
                        SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
                ))
                        UNION ALL

                -- .clin_when of clinical items linked to this episode,
                -- earliest-possible thereof = explicitly set by user
                (SELECT MIN(clin_when) AS earliest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

                        UNION ALL

                -- earliest modification time of clinical items linked to this episode
                -- this CAN be used since if an item is linked to an episode it can be
                -- assumed the episode (should have) existed at the time of creation
                (SELECT MIN(modified_when) AS earliest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

                        UNION ALL

                -- there may not be items, but there may still be documents ...
                (SELECT MIN(clin_when) AS earliest FROM blobs.doc_med WHERE fk_episode = %(pk)s)
        ) AS candidates
"""

def get_best_guess_clinical_start_date_for_episode(pk_episode=None):
        assert (pk_episode is not None), '<pk_episode> must not be None'
        query = {
                'sql': _SQL_best_guess_clinical_start_date_for_episode,
                'args': {'pk': pk_episode}
        }
        rows = gmPG2.run_ro_queries(queries = [query])
        return rows[0][0]

#-----------------------------------------------------------
_SQL_best_guess_clinical_end_date_for_episode = """
        SELECT
                CASE WHEN
                        -- if open episode ...
                        (SELECT is_open FROM clin.episode WHERE pk = %(pk)s)
                THEN
                        -- ... no end date
                        NULL::timestamp with time zone
                ELSE (
                        SELECT COALESCE (
                                (SELECT
                                        latest --, source_type
                                 FROM (
                                        -- latest explicit .clin_when of clinical items linked to this episode
                                        (SELECT
                                                MAX(clin_when) AS latest,
                                                'clin.episode.pk = clin.clin_root_item.fk_episode -> .clin_when'::text AS source_type
                                         FROM clin.clin_root_item
                                         WHERE fk_episode = %(pk)s
                                        )
                                                UNION ALL
                                        -- latest explicit .clin_when of documents linked to this episode
                                        (SELECT
                                                MAX(clin_when) AS latest,
                                                'clin.episode.pk = blobs.doc_med.fk_episode -> .clin_when'::text AS source_type
                                         FROM blobs.doc_med
                                         WHERE fk_episode = %(pk)s
                                        )
                                 ) AS candidates
                                 ORDER BY latest DESC NULLS LAST
                                 LIMIT 1
                                ),
                                -- last ditch, always exists, only use when no clinical items or documents linked:
                                -- last modification, latest = when last changed to the current state
                                (SELECT c_epi.modified_when AS latest --, 'clin.episode.modified_when'::text AS source_type
                                 FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s
                                )
                        )
                )
                END
"""

def get_best_guess_clinical_end_date_for_episode(pk_episode=None):
        assert (pk_episode is not None), '<pk_episode> must not be None'
        query = {
                'sql': _SQL_best_guess_clinical_end_date_for_episode,
                'args': {'pk': pk_episode}
        }
        rows = gmPG2.run_ro_queries(queries = [query])
        return rows[0][0]

#-----------------------------------------------------------
def format_clinical_duration_of_episode(start=None, end=None):
        assert (start is not None), '<start> must not be None'

        if end is None:
                start_end_str = '%s-%s' % (
                        start.strftime("%b'%y"),
                        gmTools.u_ellipsis
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d %Y'),
                        gmTools.u_ellipsis
                )
                duration_str = _('%s so far') % gmDateTime.format_interval_medically(gmDateTime.pydt_now_here() - start)
                return (start_end_str, start_end_str_long, duration_str)

        duration_str = gmDateTime.format_interval_medically(end - start)
        # year different:
        if end.year != start.year:
                start_end_str = '%s-%s' % (
                        start.strftime("%b'%y"),
                        end.strftime("%b'%y")
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d %Y'),
                        end.strftime('%b %d %Y')
                )
                return (start_end_str, start_end_str_long, duration_str)
        # same year:
        if end.month != start.month:
                start_end_str = '%s-%s' % (
                        start.strftime('%b'),
                        end.strftime("%b'%y")
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d'),
                        end.strftime('%b %d %Y')
                )
                return (start_end_str, start_end_str_long, duration_str)

        # same year and same month
        start_end_str = start.strftime("%b'%y")
        start_end_str_long = start.strftime('%b %d %Y')
        return (start_end_str, start_end_str_long, duration_str)

#============================================================
class cEpisodeMatchProvider(gmMatchProvider.cMatchProvider_SQL2):
        """Find episodes for patient."""

        _SQL_episode_start = _SQL_best_guess_clinical_start_date_for_episode % {'pk': 'c_vpe.pk_episode'}
        _SQL_episode_end = _SQL_best_guess_clinical_end_date_for_episode % {'pk': 'c_vpe.pk_episode'}

        _SQL_open_episodes = """
                SELECT
                        c_vpe.pk_episode,
                        c_vpe.description
                                AS episode,
                        c_vpe.health_issue,
                        1 AS rank,
                        (%s) AS episode_start,
                        NULL::timestamp with time zone AS episode_end
                FROM
                        clin.v_pat_episodes c_vpe
                WHERE
                        c_vpe.episode_open IS TRUE
                                AND
                        c_vpe.description %%(fragment_condition)s
                        %%(ctxt_pat)s
        """ % _SQL_episode_start

        _SQL_closed_episodes = """
                SELECT
                        c_vpe.pk_episode,
                        c_vpe.description
                                AS episode,
                        c_vpe.health_issue,
                        2 AS rank,
                        (%s) AS episode_start,
                        (%s) AS episode_end
                FROM
                        clin.v_pat_episodes c_vpe
                WHERE
                        c_vpe.episode_open IS FALSE
                                AND
                        c_vpe.description %%(fragment_condition)s
                        %%(ctxt_pat)s
        """ % (
                _SQL_episode_start,
                _SQL_episode_end
        )

        #--------------------------------------------------------
        def __init__(self):
                query = """
                        (
                                %s
                        ) UNION ALL (
                                %s
                        )
                        ORDER BY rank, episode
                        LIMIT 30""" % (
                                cEpisodeMatchProvider._SQL_open_episodes,
                                cEpisodeMatchProvider._SQL_closed_episodes
                        )
                ctxt = {'ctxt_pat': {'where_part': 'AND pk_patient = %(pat)s', 'placeholder': 'pat'}}
                super().__init__(queries = [query], context = ctxt)

        #--------------------------------------------------------
        def _find_matches(self, fragment_condition):
                try:
                        pat = self._context_vals['pat']
                except KeyError:
                        pat = None
                if not pat:
                        # not patient, no search
                        return (False, [])

                return super()._find_matches(fragment_condition)

        #--------------------------------------------------------
        def _rows2matches(self, rows):
                matches = []
                for row in rows:
                        match = {
                                'weight': 0,
                                'data': row['pk_episode']
                        }
                        label = '%s (%s)%s' % (
                                row['episode'],
                                format_clinical_duration_of_episode (
                                        start = row['episode_start'],
                                        end = row['episode_end']
                                )[0],
                                gmTools.coalesce (
                                        row['health_issue'],
                                        '',
                                        ' - %s'
                                )
                        )
                        match['list_label'] = label
                        match['field_label'] = label
                        matches.append(match)
                return matches

#============================================================
# main - unit testing
#------------------------------------------------------------
if __name__ == '__main__':

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        gmI18N.activate_locale()
        gmI18N.install_domain('gnumed')

        #--------------------------------------------------------
        # define tests
        #--------------------------------------------------------
        def test_episode():
                print("episode test")
                gmPraxis.gmCurrentPraxisBranch.from_first_branch()
                for i in range(1,15):
                        print("------------")
                        episode = cEpisode(aPK_obj = i)#322) #1674) #1354) #1461) #1299)
                        #print(episode['description'])
                        #print(' start:', episode.best_guess_clinical_start_date)
                        #print(' end  :', episode.best_guess_clinical_end_date)
                        #print(' dura :', get_formatted_clinical_duration(pk_episode = i))
                        for line in episode.format_maximum_information():
                                print(line)
                        input('ENTER to continue')
                return

                print(episode)
                fields = episode.get_fields()
                for field in fields:
                        print(field, ':', episode[field])
                print("updatable:", episode.get_updatable_fields())
                input('ENTER to continue')

                desc = '1-%s' % episode['description']
                print("==> renaming to", desc)
                successful = episode.rename (
                        description = desc
                )
                if not successful:
                        print("error")
                else:
                        print("success")
                        for field in fields:
                                print(field, ':', episode[field])

                print(episode.formatted_revision_history)

                input('ENTER to continue')

        #--------------------------------------------------------
        def test_episode_codes():
                epi = cEpisode(aPK_obj = 2)
                print(epi)
                print(epi.generic_codes)
        #--------------------------------------------------------
        def test_episode_encounters():
                epi = cEpisode(aPK_obj = 1638)
                print(epi.format())

        #--------------------------------------------------------
        def test_cEpisodeMatchProvider():
                mp = cEpisodeMatchProvider()
                print(mp._find_matches('no patient'))

        #--------------------------------------------------------
        gmPG2.request_login_params(setup_pool = True)

        test_episode()
        #test_episode_encounters()
        #test_episode_codes()

Functions

def create_episode(pk_health_issue: int = None, episode_name: str = None, is_open: bool = False, allow_dupes: bool = False, encounter: int = None, link_obj=None)

Creates a new episode for a given patient's health issue.

pk_health_issue - given health issue PK episode_name - name of episode is_open - whether a new episode is to be open (don't tinker with existing ones)

Expand source code
def create_episode(pk_health_issue:int=None, episode_name:str=None, is_open:bool=False, allow_dupes:bool=False, encounter:int=None, link_obj=None):
        """Creates a new episode for a given patient's health issue.

        pk_health_issue - given health issue PK
        episode_name - name of episode
        is_open - whether a *new* episode is to be open (don't tinker with existing ones)
        """
        if not allow_dupes:
                try:
                        return cEpisode(name = episode_name, health_issue = pk_health_issue, encounter = encounter, link_obj = link_obj)

                except gmExceptions.ConstructorError:
                        pass
        queries = []
        SQL = "INSERT INTO clin.episode (fk_health_issue, description, is_open, fk_encounter) VALUES (%(pk_hi)s, %(epi_name)s, %(open)s::boolean, %(pk_enc)s)"
        args = {
                'pk_hi': pk_health_issue,
                'epi_name': episode_name,
                'open': is_open,
                'pk_enc': encounter
        }
        queries.append({'sql': SQL, 'args': args})
        queries.append({'sql': cEpisode._cmd_fetch_payload % "currval('clin.episode_pk_seq')"})
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return cEpisode(row = {'data': rows[0], 'pk_field': 'pk_episode'})
def delete_episode(episode=None)
Expand source code
def delete_episode(episode=None):
        if isinstance(episode, cEpisode):
                pk = episode['pk_episode']
        else:
                pk = int(episode)

        cmd = 'DELETE FROM clin.episode WHERE pk = %(pk)s'
        try:
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'pk': pk}}])
        except gmPG2.dbapi.IntegrityError:
                # should be parsing pgcode/and or error message
                _log.exception('cannot delete episode, it is in use')
                return False

        return True
def format_clinical_duration_of_episode(start=None, end=None)
Expand source code
def format_clinical_duration_of_episode(start=None, end=None):
        assert (start is not None), '<start> must not be None'

        if end is None:
                start_end_str = '%s-%s' % (
                        start.strftime("%b'%y"),
                        gmTools.u_ellipsis
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d %Y'),
                        gmTools.u_ellipsis
                )
                duration_str = _('%s so far') % gmDateTime.format_interval_medically(gmDateTime.pydt_now_here() - start)
                return (start_end_str, start_end_str_long, duration_str)

        duration_str = gmDateTime.format_interval_medically(end - start)
        # year different:
        if end.year != start.year:
                start_end_str = '%s-%s' % (
                        start.strftime("%b'%y"),
                        end.strftime("%b'%y")
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d %Y'),
                        end.strftime('%b %d %Y')
                )
                return (start_end_str, start_end_str_long, duration_str)
        # same year:
        if end.month != start.month:
                start_end_str = '%s-%s' % (
                        start.strftime('%b'),
                        end.strftime("%b'%y")
                )
                start_end_str_long = '%s - %s' % (
                        start.strftime('%b %d'),
                        end.strftime('%b %d %Y')
                )
                return (start_end_str, start_end_str_long, duration_str)

        # same year and same month
        start_end_str = start.strftime("%b'%y")
        start_end_str_long = start.strftime('%b %d %Y')
        return (start_end_str, start_end_str_long, duration_str)
def get_best_guess_clinical_end_date_for_episode(pk_episode=None)
Expand source code
def get_best_guess_clinical_end_date_for_episode(pk_episode=None):
        assert (pk_episode is not None), '<pk_episode> must not be None'
        query = {
                'sql': _SQL_best_guess_clinical_end_date_for_episode,
                'args': {'pk': pk_episode}
        }
        rows = gmPG2.run_ro_queries(queries = [query])
        return rows[0][0]
def get_best_guess_clinical_start_date_for_episode(pk_episode=None)
Expand source code
def get_best_guess_clinical_start_date_for_episode(pk_episode=None):
        assert (pk_episode is not None), '<pk_episode> must not be None'
        query = {
                'sql': _SQL_best_guess_clinical_start_date_for_episode,
                'args': {'pk': pk_episode}
        }
        rows = gmPG2.run_ro_queries(queries = [query])
        return rows[0][0]

Classes

class cEpisode (aPK_obj=None, id_patient=None, name='xxxDEFAULTxxx', health_issue=None, row=None, encounter=None, link_obj=None)

Represents one clinical episode.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cEpisode(gmBusinessDBObject.cBusinessDBObject):
        """Represents one clinical episode.
        """
        _cmd_fetch_payload = "select * from clin.v_pat_episodes where pk_episode=%s"
        _cmds_store_payload = [
                """update clin.episode set
                                fk_health_issue = %(pk_health_issue)s,
                                is_open = %(episode_open)s::boolean,
                                description = %(description)s,
                                summary = gm.nullify_empty_string(%(summary)s),
                                diagnostic_certainty_classification = gm.nullify_empty_string(%(diagnostic_certainty_classification)s)
                        where
                                pk = %(pk_episode)s and
                                xmin = %(xmin_episode)s""",
                """select xmin_episode from clin.v_pat_episodes where pk_episode = %(pk_episode)s"""
        ]
        _updatable_fields = [
                'pk_health_issue',
                'episode_open',
                'description',
                'summary',
                'diagnostic_certainty_classification'
        ]
        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, id_patient=None, name='xxxDEFAULTxxx', health_issue=None, row=None, encounter=None, link_obj=None):
                pk = aPK_obj
                if pk is None and row is None:

                        where_parts = ['description = %(desc)s']

                        if id_patient is not None:
                                where_parts.append('pk_patient = %(pat)s')

                        if health_issue is not None:
                                where_parts.append('pk_health_issue = %(issue)s')

                        if encounter is not None:
                                where_parts.append('pk_patient = (SELECT fk_patient FROM clin.encounter WHERE pk = %(enc)s)')

                        args = {
                                'pat': id_patient,
                                'issue': health_issue,
                                'enc': encounter,
                                'desc': name
                        }

                        cmd = 'SELECT * FROM clin.v_pat_episodes WHERE %s' % ' AND '.join(where_parts)

                        rows = gmPG2.run_ro_queries (
                                link_obj = link_obj,
                                queries = [{'sql': cmd, 'args': args}],
                        )

                        if len(rows) == 0:
                                raise gmExceptions.NoSuchBusinessObjectError('no episode for [%s:%s:%s:%s]' % (id_patient, name, health_issue, encounter))

                        r = {'data': rows[0], 'pk_field': 'pk_episode'}
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, row=r)

                else:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk, row=row, link_obj = link_obj)

        #--------------------------------------------------------
        # external API
        #--------------------------------------------------------
        @classmethod
        def from_problem(cls, problem) -> 'cEpisode':
                """Initialize episode from episody-type problem."""
                if isinstance(problem, cEpisode):
                        return problem

                assert problem['type'] == 'episode', 'cannot convert [%s] to episode' % problem
                return cls(aPK_obj = problem['pk_episode'])

        #--------------------------------------------------------
        def get_patient(self):
                return self._payload['pk_patient']

        #--------------------------------------------------------
        def get_narrative(self, soap_cats=None, encounters=None, order_by = None):
                return gmClinNarrative.get_narrative (
                        soap_cats = soap_cats,
                        encounters = encounters,
                        episodes = [self.pk_obj],
                        order_by = order_by
                )

        #--------------------------------------------------------
        def rename(self, description=None):
                """Method for episode editing, that is, episode renaming.

                @param description
                        - the new descriptive name for the encounter
                @type description
                        - a string instance
                """
                # sanity check
                if description.strip() == '':
                        _log.error('<description> must be a non-empty string instance')
                        return False
                # update the episode description
                old_description = self._payload['description']
                self._payload['description'] = description.strip()
                self._is_modified = True
                successful, data = self.save_payload()
                if not successful:
                        _log.error('cannot rename episode [%s] to [%s]' % (self, description))
                        self._payload['description'] = old_description
                        return False
                return True

        #--------------------------------------------------------
        def add_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""

                if pk_code in self._payload['pk_generic_codes']:
                        return

                cmd = """
                        INSERT INTO clin.lnk_code2episode
                                (fk_item, fk_generic_code)
                        SELECT
                                %(item)s,
                                %(code)s
                        WHERE NOT EXISTS (
                                SELECT 1 FROM clin.lnk_code2episode
                                WHERE
                                        fk_item = %(item)s
                                                AND
                                        fk_generic_code = %(code)s
                        )"""
                args = {
                        'item': self._payload['pk_episode'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return

        #--------------------------------------------------------
        def remove_code(self, pk_code=None):
                """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                cmd = "DELETE FROM clin.lnk_code2episode WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                args = {
                        'item': self._payload['pk_episode'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
                rows = gmClinNarrative.get_as_journal (
                        episodes = [self.pk_obj],
                        order_by = 'pk_encounter, clin_when, scr, src_table'
                        #order_by = u'pk_encounter, scr, clin_when, src_table'
                )

                if len(rows) == 0:
                        return ''

                lines = []

                lines.append(_('Clinical data generated during encounters within this episode:'))

                left_margin = ' ' * left_margin

                prev_enc = None
                for row in rows:
                        if row['pk_encounter'] != prev_enc:
                                lines.append('')
                                prev_enc = row['pk_encounter']

                        when = row['clin_when'].strftime(date_format)
                        top_row = '%s%s %s (%s) %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                                when,
                                gmTools.u_box_horiz_single * 5
                        )
                        soap = gmTools.wrap (
                                text = row['narrative'],
                                width = 60,
                                initial_indent = '  ',
                                subsequent_indent = '  ' + left_margin
                        )
                        row_ver = ''
                        if row['row_version'] > 0:
                                row_ver = 'v%s: ' % row['row_version']
                        bottom_row = '%s%s %s, %s%s %s' % (
                                ' ' * 40,
                                gmTools.u_box_horiz_light_heavy,
                                row['modified_by'],
                                row_ver,
                                row['modified_when'].strftime(date_format),
                                gmTools.u_box_horiz_heavy_light
                        )

                        lines.append(top_row)
                        lines.append(soap)
                        lines.append(bottom_row)

                eol_w_margin = '\n%s' % left_margin
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        def format_maximum_information(self, patient=None):
                if patient is None:
                        from Gnumed.business.gmPerson import gmCurrentPatient, cPatient
                        if not gmCurrentPatient().connected:
                                patient = cPatient(self._payload['pk_patient'])
                        else:
                                if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                        patient = gmCurrentPatient()
                                else:
                                        patient = cPatient(self._payload['pk_patient'])

                return self.format (
                        patient = patient,
                        with_summary = True,
                        with_codes = True,
                        with_encounters = True,
                        with_documents = True,
                        with_hospital_stays = True,
                        with_procedures = True,
                        with_family_history = True,
                        with_tests = False,             # does not inform on the episode itself
                        with_vaccinations = True,
                        with_health_issue = True,
                        return_list = True
                )

        #--------------------------------------------------------
        def __format_encounters(self, left_margin=0, emr=None):
                lines = []

                encs = emr.get_encounters(episodes = [self._payload['pk_episode']])
                if encs is None:
                        return [_('Error retrieving encounters for this episode.')]

                if len(encs) == 0:
                        #lines.append(_('There are no encounters for this issue.'))
                        return []

                first_encounter = emr.get_first_encounter(episode_id = self._payload['pk_episode'])
                last_encounter = emr.get_last_encounter(episode_id = self._payload['pk_episode'])
                lines.append(_('Last worked on: %s\n') % last_encounter['last_affirmed_original_tz'].strftime('%Y-%m-%d %H:%M'))
                if len(encs) < 4:
                        line = _('%s encounter(s) (%s - %s):')
                else:
                        line = _('1st and (up to 3) most recent (of %s) encounters (%s - %s):')
                lines.append(line % (
                        len(encs),
                        first_encounter['started'].strftime('%m/%Y'),
                        last_encounter['last_affirmed'].strftime('%m/%Y')
                ))
                lines.append(' %s - %s (%s):%s' % (
                        first_encounter['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        first_encounter['last_affirmed_original_tz'].strftime('%H:%M'),
                        first_encounter['l10n_type'],
                        gmTools.coalesce (
                                first_encounter['assessment_of_encounter'],
                                gmTools.coalesce (
                                        first_encounter['reason_for_encounter'],
                                        '',
                                        ' \u00BB%s\u00AB' + (' (%s)' % _('RFE'))
                                ),
                                ' \u00BB%s\u00AB' + (' (%s)' % _('AOE'))
                        )
                ))
                if len(encs) > 4:
                        lines.append(_(' %s %s skipped %s') % (
                                gmTools.u_ellipsis,
                                (len(encs) - 4),
                                gmTools.u_ellipsis
                        ))
                for enc in encs[1:][-3:]:
                        lines.append(' %s - %s (%s):%s' % (
                                enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                                enc['last_affirmed_original_tz'].strftime('%H:%M'),
                                enc['l10n_type'],
                                gmTools.coalesce (
                                        enc['assessment_of_encounter'],
                                        gmTools.coalesce (
                                                enc['reason_for_encounter'],
                                                '',
                                                ' \u00BB%s\u00AB' + (' (%s)' % _('RFE'))
                                        ),
                                        ' \u00BB%s\u00AB' + (' (%s)' % _('AOE'))
                                )
                        ))
                # spell out last encounter
                if last_encounter:
                        lines.append('')
                        lines.append(_('Progress notes in most recent encounter:'))
                        lines.extend(last_encounter.format_soap (
                                episodes = [ self._payload['pk_episode'] ],
                                left_margin = left_margin,
                                soap_cats = 'soapu',
                                emr = emr
                        ))
                return lines

        #--------------------------------------------------------
        def format(self, left_margin=0, patient=None,
                with_summary=True,
                with_codes=True,
                with_encounters=True,
                with_documents=True,
                with_hospital_stays=True,
                with_procedures=True,
                with_family_history=True,
                with_tests=True,
                with_vaccinations=True,
                with_health_issue=False,
                return_list=False
        ):
                if patient:
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but episode %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_episode'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        emr = patient.emr
                else:
                        with_encounters = False
                        with_documents = False
                        with_hospital_stays = False
                        with_procedures = False
                        with_family_history = False
                        with_tests = False
                        with_vaccinations = False

                lines = []

                # episode details
                lines.append (_('Episode %s%s%s   [#%s]') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['description'],
                        gmTools.u_right_double_angle_quote,
                        self._payload['pk_episode']
                ))

                from Gnumed.business.gmEncounter import cEncounter
                enc = cEncounter(aPK_obj = self._payload['pk_encounter'])
                lines.append (' ' + _('Created during encounter: %s (%s - %s)   [#%s]') % (
                        enc['l10n_type'],
                        enc['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        enc['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['pk_encounter']
                ))

                if patient is not None:
                        range_str, range_str_verb, duration_str = self.formatted_clinical_duration
                        lines.append(_(' Duration: %s (%s)') % (duration_str, range_str_verb))

                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                lines.append(' ' + _('Status') + ': %s%s' % (
                        gmTools.bool2subst(self._payload['episode_open'], _('active'), _('finished')),
                        gmTools.coalesce (
                                value2test = diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification']),
                                return_instead = '',
                                template4value = ', %s',
                                none_equivalents = [None, '']
                        )
                ))

                if with_health_issue:
                        lines.append(' ' + _('Health issue') + ': %s' % gmTools.coalesce (
                                self._payload['health_issue'],
                                _('none associated')
                        ))

                if with_summary:
                        if self._payload['summary'] is not None:
                                lines.append(' %s:' % _('Synopsis'))
                                lines.append(gmTools.wrap (
                                                text = self._payload['summary'],
                                                width = 60,
                                                initial_indent = '  ',
                                                subsequent_indent = '  '
                                        )
                                )
                lines.append('')
                # encounters
                if with_encounters:
                        lines.extend(self.__format_encounters(left_margin = left_margin, emr = emr))
                # documents
                if with_documents:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = [ self._payload['pk_episode'] ]
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents: %s') % len(docs))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

                # hospitalizations
                if with_hospital_stays:
                        stays = emr.get_hospital_stays(episodes = [ self._payload['pk_episode'] ])
                        if len(stays) > 0:
                                lines.append('')
                                lines.append(_('Hospitalizations: %s') % len(stays))
                        for s in stays:
                                lines.append(s.format(left_margin = (left_margin + 1)))
                        del stays

                # procedures
                if with_procedures:
                        procs = emr.get_performed_procedures(episodes = [ self._payload['pk_episode'] ])
                        if len(procs) > 0:
                                lines.append('')
                                lines.append(_('Procedures performed: %s') % len(procs))
                        for p in procs:
                                lines.append(p.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_codes = True
                                ))
                        del procs

                # family history
                if with_family_history:
                        fhx = emr.get_family_history(episodes = [ self._payload['pk_episode'] ])
                        if len(fhx) > 0:
                                lines.append('')
                                lines.append(_('Family History: %s') % len(fhx))
                        for f in fhx:
                                lines.append(f.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_comment = True,
                                        include_codes = True
                                ))
                        del fhx

                # test results
                if with_tests:
                        tests = emr.get_test_results_by_date(episodes = [ self._payload['pk_episode'] ])
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))
                        for t in tests:
                                lines.append(' ' + t.format_concisely(date_format = '%Y %b %d', with_notes = True))
                        del tests

                # vaccinations
                if with_vaccinations:
                        vaccs = emr.get_vaccinations (
                                episodes = [ self._payload['pk_episode'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                lines = gmTools.strip_trailing_empty_lines(lines = lines, eol = '\n')
                if return_list:
                        return lines

                left_margin = ' ' * left_margin
                eol_w_margin = '\n%s' % left_margin
                return left_margin + eol_w_margin.join(lines) + '\n'

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_best_guess_clinical_start_date(self):
                return get_best_guess_clinical_start_date_for_episode(pk_episode = self.pk_obj)

        best_guess_clinical_start_date = property(_get_best_guess_clinical_start_date)

        #--------------------------------------------------------
        def _get_best_guess_clinical_end_date(self):
                return get_best_guess_clinical_end_date_for_episode(self.pk_obj)

        best_guess_clinical_end_date = property(_get_best_guess_clinical_end_date)

        #--------------------------------------------------------
        def _get_formatted_clinical_duration(self):
                return format_clinical_duration_of_episode (
                        start = get_best_guess_clinical_start_date_for_episode(self.pk_obj),
                        end = get_best_guess_clinical_end_date_for_episode(self.pk_obj)
                )

        formatted_clinical_duration = property(_get_formatted_clinical_duration)

        #--------------------------------------------------------
        def _get_latest_access_date(self):
                cmd = """SELECT MAX(latest) FROM (
                        -- last modification, latest = when last changed to the current state
                        (SELECT c_epi.modified_when AS latest, 'clin.episode.modified_when'::text AS candidate FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

                                UNION ALL

                        -- last modification of encounter in which created, latest = initial creation of that encounter
                        -- DO NOT USE: just because one corrects a typo does not mean the episode took longer
                        --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                        --))

                        -- end of encounter in which created, latest = explicitly set
                        -- DO NOT USE: we can retrospectively create episodes which
                        -- DO NOT USE: are long since finished
                        --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                        --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                        --))

                        -- latest end of encounters of clinical items linked to this episode
                        (SELECT
                                MAX(last_affirmed) AS latest,
                                'clin.episode.pk = clin.clin_root_item,fk_episode -> .fk_encounter.last_affirmed'::text AS candidate
                         FROM clin.encounter
                         WHERE pk IN (
                                SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
                        ))
                                UNION ALL

                        -- latest explicit .clin_when of clinical items linked to this episode
                        (SELECT
                                MAX(clin_when) AS latest,
                                'clin.episode.pk = clin.clin_root_item,fk_episode -> .clin_when'::text AS candidate
                         FROM clin.clin_root_item
                         WHERE fk_episode = %(pk)s
                        )

                        -- latest modification time of clinical items linked to this episode
                        -- this CAN be used since if an item is linked to an episode it can be
                        -- assumed the episode (should have) existed at the time of creation
                        -- DO NOT USE, because typo fixes should not extend the episode
                        --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

                        -- not sure about this one:
                        -- .pk -> clin.clin_root_item.fk_encounter.modified_when

                ) AS candidates"""
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
                return rows[0][0]

        latest_access_date = property(_get_latest_access_date)

        #--------------------------------------------------------
        def _get_diagnostic_certainty_description(self):
                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

        diagnostic_certainty_description = property(_get_diagnostic_certainty_description)

        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk, fk_health_issue, description, is_open, fk_encounter,
                                diagnostic_certainty_classification,
                                summary
                        FROM clin.episode
                        WHERE pk = %(pk_episode)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk, fk_health_issue, description, is_open, fk_encounter,
                                diagnostic_certainty_classification,
                                summary
                        FROM audit.log_episode
                        WHERE pk = %(pk_episode)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_episode': self.pk_obj}
                title = _('Episode: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['description'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

        #--------------------------------------------------------
        def _get_generic_codes(self):
                if len(self._payload['pk_generic_codes']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2episode WHERE fk_item = %(epi)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'epi': self._payload['pk_episode'],
                                        'codes': self._payload['pk_generic_codes']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2episode (fk_item, fk_generic_code) VALUES (%(epi)s, %(pk_code)s)',
                                'args': {
                                        'epi': self._payload['pk_episode'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                return

        generic_codes = property(_get_generic_codes, _set_generic_codes)

        #--------------------------------------------------------
        def _get_has_narrative(self):
                cmd = """SELECT EXISTS (
                        SELECT 1 FROM clin.clin_narrative
                        WHERE
                                fk_episode = %(epi)s
                                        AND
                                fk_encounter IN (
                                        SELECT pk FROM clin.encounter WHERE fk_patient = %(pat)s
                                )
                )"""
                args = {
                        'pat': self._payload['pk_patient'],
                        'epi': self._payload['pk_episode']
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return rows[0][0]

        has_narrative = property(_get_has_narrative)

        #--------------------------------------------------------
        def _get_health_issue(self):
                if self._payload['pk_health_issue'] is None:
                        return None

                from Gnumed.business.gmHealthIssue import cHealthIssue
                return cHealthIssue(self._payload['pk_health_issue'])

        health_issue = property(_get_health_issue)

Ancestors

Static methods

def from_problem(problem) ‑> cEpisode

Initialize episode from episody-type problem.

Expand source code
@classmethod
def from_problem(cls, problem) -> 'cEpisode':
        """Initialize episode from episody-type problem."""
        if isinstance(problem, cEpisode):
                return problem

        assert problem['type'] == 'episode', 'cannot convert [%s] to episode' % problem
        return cls(aPK_obj = problem['pk_episode'])

Instance variables

var best_guess_clinical_end_date
Expand source code
def _get_best_guess_clinical_end_date(self):
        return get_best_guess_clinical_end_date_for_episode(self.pk_obj)
var best_guess_clinical_start_date
Expand source code
def _get_best_guess_clinical_start_date(self):
        return get_best_guess_clinical_start_date_for_episode(pk_episode = self.pk_obj)
var diagnostic_certainty_description
Expand source code
def _get_diagnostic_certainty_description(self):
        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])
var formatted_clinical_duration
Expand source code
def _get_formatted_clinical_duration(self):
        return format_clinical_duration_of_episode (
                start = get_best_guess_clinical_start_date_for_episode(self.pk_obj),
                end = get_best_guess_clinical_end_date_for_episode(self.pk_obj)
        )
var formatted_revision_history
Expand source code
def _get_formatted_revision_history(self):
        cmd = """SELECT
                        '<N/A>'::TEXT as audit__action_applied,
                        NULL AS audit__action_when,
                        '<N/A>'::TEXT AS audit__action_by,
                        pk_audit,
                        row_version,
                        modified_when,
                        modified_by,
                        pk, fk_health_issue, description, is_open, fk_encounter,
                        diagnostic_certainty_classification,
                        summary
                FROM clin.episode
                WHERE pk = %(pk_episode)s
        UNION ALL (
                SELECT
                        audit_action as audit__action_applied,
                        audit_when as audit__action_when,
                        audit_by as audit__action_by,
                        pk_audit,
                        orig_version as row_version,
                        orig_when as modified_when,
                        orig_by as modified_by,
                        pk, fk_health_issue, description, is_open, fk_encounter,
                        diagnostic_certainty_classification,
                        summary
                FROM audit.log_episode
                WHERE pk = %(pk_episode)s
        )
        ORDER BY row_version DESC
        """
        args = {'pk_episode': self.pk_obj}
        title = _('Episode: %s%s%s') % (
                gmTools.u_left_double_angle_quote,
                self._payload['description'],
                gmTools.u_right_double_angle_quote
        )
        return '\n'.join(self._get_revision_history(cmd, args, title))
var generic_codes
Expand source code
def _get_generic_codes(self):
        if len(self._payload['pk_generic_codes']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
var has_narrative
Expand source code
def _get_has_narrative(self):
        cmd = """SELECT EXISTS (
                SELECT 1 FROM clin.clin_narrative
                WHERE
                        fk_episode = %(epi)s
                                AND
                        fk_encounter IN (
                                SELECT pk FROM clin.encounter WHERE fk_patient = %(pat)s
                        )
        )"""
        args = {
                'pat': self._payload['pk_patient'],
                'epi': self._payload['pk_episode']
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return rows[0][0]
var health_issue
Expand source code
def _get_health_issue(self):
        if self._payload['pk_health_issue'] is None:
                return None

        from Gnumed.business.gmHealthIssue import cHealthIssue
        return cHealthIssue(self._payload['pk_health_issue'])
var latest_access_date
Expand source code
def _get_latest_access_date(self):
        cmd = """SELECT MAX(latest) FROM (
                -- last modification, latest = when last changed to the current state
                (SELECT c_epi.modified_when AS latest, 'clin.episode.modified_when'::text AS candidate FROM clin.episode c_epi WHERE c_epi.pk = %(pk)s)

                        UNION ALL

                -- last modification of encounter in which created, latest = initial creation of that encounter
                -- DO NOT USE: just because one corrects a typo does not mean the episode took longer
                --(SELECT c_enc.modified_when AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                --))

                -- end of encounter in which created, latest = explicitly set
                -- DO NOT USE: we can retrospectively create episodes which
                -- DO NOT USE: are long since finished
                --(SELECT c_enc.last_affirmed AS latest FROM clin.encounter c_enc WHERE c_enc.pk = (
                --      SELECT fk_encounter FROM clin.episode WHERE pk = %(pk)s
                --))

                -- latest end of encounters of clinical items linked to this episode
                (SELECT
                        MAX(last_affirmed) AS latest,
                        'clin.episode.pk = clin.clin_root_item,fk_episode -> .fk_encounter.last_affirmed'::text AS candidate
                 FROM clin.encounter
                 WHERE pk IN (
                        SELECT fk_encounter FROM clin.clin_root_item WHERE fk_episode = %(pk)s
                ))
                        UNION ALL

                -- latest explicit .clin_when of clinical items linked to this episode
                (SELECT
                        MAX(clin_when) AS latest,
                        'clin.episode.pk = clin.clin_root_item,fk_episode -> .clin_when'::text AS candidate
                 FROM clin.clin_root_item
                 WHERE fk_episode = %(pk)s
                )

                -- latest modification time of clinical items linked to this episode
                -- this CAN be used since if an item is linked to an episode it can be
                -- assumed the episode (should have) existed at the time of creation
                -- DO NOT USE, because typo fixes should not extend the episode
                --(SELECT MIN(modified_when) AS latest FROM clin.clin_root_item WHERE fk_episode = %(pk)s)

                -- not sure about this one:
                -- .pk -> clin.clin_root_item.fk_encounter.modified_when

        ) AS candidates"""
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'pk': self.pk_obj}}])
        return rows[0][0]

Methods

def add_code(self, pk_code=None)

must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

Expand source code
def add_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""

        if pk_code in self._payload['pk_generic_codes']:
                return

        cmd = """
                INSERT INTO clin.lnk_code2episode
                        (fk_item, fk_generic_code)
                SELECT
                        %(item)s,
                        %(code)s
                WHERE NOT EXISTS (
                        SELECT 1 FROM clin.lnk_code2episode
                        WHERE
                                fk_item = %(item)s
                                        AND
                                fk_generic_code = %(code)s
                )"""
        args = {
                'item': self._payload['pk_episode'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a')
Expand source code
def format_as_journal(self, left_margin=0, date_format='%Y %b %d, %a'):
        rows = gmClinNarrative.get_as_journal (
                episodes = [self.pk_obj],
                order_by = 'pk_encounter, clin_when, scr, src_table'
                #order_by = u'pk_encounter, scr, clin_when, src_table'
        )

        if len(rows) == 0:
                return ''

        lines = []

        lines.append(_('Clinical data generated during encounters within this episode:'))

        left_margin = ' ' * left_margin

        prev_enc = None
        for row in rows:
                if row['pk_encounter'] != prev_enc:
                        lines.append('')
                        prev_enc = row['pk_encounter']

                when = row['clin_when'].strftime(date_format)
                top_row = '%s%s %s (%s) %s' % (
                        gmTools.u_box_top_left_arc,
                        gmTools.u_box_horiz_single,
                        gmSoapDefs.soap_cat2l10n_str[row['real_soap_cat']],
                        when,
                        gmTools.u_box_horiz_single * 5
                )
                soap = gmTools.wrap (
                        text = row['narrative'],
                        width = 60,
                        initial_indent = '  ',
                        subsequent_indent = '  ' + left_margin
                )
                row_ver = ''
                if row['row_version'] > 0:
                        row_ver = 'v%s: ' % row['row_version']
                bottom_row = '%s%s %s, %s%s %s' % (
                        ' ' * 40,
                        gmTools.u_box_horiz_light_heavy,
                        row['modified_by'],
                        row_ver,
                        row['modified_when'].strftime(date_format),
                        gmTools.u_box_horiz_heavy_light
                )

                lines.append(top_row)
                lines.append(soap)
                lines.append(bottom_row)

        eol_w_margin = '\n%s' % left_margin
        return left_margin + eol_w_margin.join(lines) + '\n'
def format_maximum_information(self, patient=None)
Expand source code
def format_maximum_information(self, patient=None):
        if patient is None:
                from Gnumed.business.gmPerson import gmCurrentPatient, cPatient
                if not gmCurrentPatient().connected:
                        patient = cPatient(self._payload['pk_patient'])
                else:
                        if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                patient = gmCurrentPatient()
                        else:
                                patient = cPatient(self._payload['pk_patient'])

        return self.format (
                patient = patient,
                with_summary = True,
                with_codes = True,
                with_encounters = True,
                with_documents = True,
                with_hospital_stays = True,
                with_procedures = True,
                with_family_history = True,
                with_tests = False,             # does not inform on the episode itself
                with_vaccinations = True,
                with_health_issue = True,
                return_list = True
        )
def get_narrative(self, soap_cats=None, encounters=None, order_by=None)
Expand source code
def get_narrative(self, soap_cats=None, encounters=None, order_by = None):
        return gmClinNarrative.get_narrative (
                soap_cats = soap_cats,
                encounters = encounters,
                episodes = [self.pk_obj],
                order_by = order_by
        )
def remove_code(self, pk_code=None)

must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

Expand source code
def remove_code(self, pk_code=None):
        """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        cmd = "DELETE FROM clin.lnk_code2episode WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        args = {
                'item': self._payload['pk_episode'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True
def rename(self, description=None)

Method for episode editing, that is, episode renaming.

@param description - the new descriptive name for the encounter @type description - a string instance

Expand source code
def rename(self, description=None):
        """Method for episode editing, that is, episode renaming.

        @param description
                - the new descriptive name for the encounter
        @type description
                - a string instance
        """
        # sanity check
        if description.strip() == '':
                _log.error('<description> must be a non-empty string instance')
                return False
        # update the episode description
        old_description = self._payload['description']
        self._payload['description'] = description.strip()
        self._is_modified = True
        successful, data = self.save_payload()
        if not successful:
                _log.error('cannot rename episode [%s] to [%s]' % (self, description))
                self._payload['description'] = old_description
                return False
        return True

Inherited members

class cEpisodeMatchProvider

Find episodes for patient.

Expand source code
class cEpisodeMatchProvider(gmMatchProvider.cMatchProvider_SQL2):
        """Find episodes for patient."""

        _SQL_episode_start = _SQL_best_guess_clinical_start_date_for_episode % {'pk': 'c_vpe.pk_episode'}
        _SQL_episode_end = _SQL_best_guess_clinical_end_date_for_episode % {'pk': 'c_vpe.pk_episode'}

        _SQL_open_episodes = """
                SELECT
                        c_vpe.pk_episode,
                        c_vpe.description
                                AS episode,
                        c_vpe.health_issue,
                        1 AS rank,
                        (%s) AS episode_start,
                        NULL::timestamp with time zone AS episode_end
                FROM
                        clin.v_pat_episodes c_vpe
                WHERE
                        c_vpe.episode_open IS TRUE
                                AND
                        c_vpe.description %%(fragment_condition)s
                        %%(ctxt_pat)s
        """ % _SQL_episode_start

        _SQL_closed_episodes = """
                SELECT
                        c_vpe.pk_episode,
                        c_vpe.description
                                AS episode,
                        c_vpe.health_issue,
                        2 AS rank,
                        (%s) AS episode_start,
                        (%s) AS episode_end
                FROM
                        clin.v_pat_episodes c_vpe
                WHERE
                        c_vpe.episode_open IS FALSE
                                AND
                        c_vpe.description %%(fragment_condition)s
                        %%(ctxt_pat)s
        """ % (
                _SQL_episode_start,
                _SQL_episode_end
        )

        #--------------------------------------------------------
        def __init__(self):
                query = """
                        (
                                %s
                        ) UNION ALL (
                                %s
                        )
                        ORDER BY rank, episode
                        LIMIT 30""" % (
                                cEpisodeMatchProvider._SQL_open_episodes,
                                cEpisodeMatchProvider._SQL_closed_episodes
                        )
                ctxt = {'ctxt_pat': {'where_part': 'AND pk_patient = %(pat)s', 'placeholder': 'pat'}}
                super().__init__(queries = [query], context = ctxt)

        #--------------------------------------------------------
        def _find_matches(self, fragment_condition):
                try:
                        pat = self._context_vals['pat']
                except KeyError:
                        pat = None
                if not pat:
                        # not patient, no search
                        return (False, [])

                return super()._find_matches(fragment_condition)

        #--------------------------------------------------------
        def _rows2matches(self, rows):
                matches = []
                for row in rows:
                        match = {
                                'weight': 0,
                                'data': row['pk_episode']
                        }
                        label = '%s (%s)%s' % (
                                row['episode'],
                                format_clinical_duration_of_episode (
                                        start = row['episode_start'],
                                        end = row['episode_end']
                                )[0],
                                gmTools.coalesce (
                                        row['health_issue'],
                                        '',
                                        ' - %s'
                                )
                        )
                        match['list_label'] = label
                        match['field_label'] = label
                        matches.append(match)
                return matches

Ancestors

Inherited members