Module Gnumed.business.gmEncounter

GNUmed clinical encounter handling.

license: GPL v2 or later

Expand source code
# -*- coding: utf-8 -*-
"""GNUmed clinical encounter handling.

license: GPL v2 or later
"""
#============================================================
__author__ = "Carlos Moro <cfmoro1976@yahoo.es>, <karsten.hilbert@gmx.net>"

import sys
import logging
import inspect


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmI18N
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmDateTime
from Gnumed.pycommon import gmBusinessDBObject

from Gnumed.business import gmSoapDefs
from Gnumed.business import gmCoding
from Gnumed.business import gmPraxis
from Gnumed.business import gmOrganization


_log = logging.getLogger('gm.emr.enc')
#============================================================
SQL_get_encounters = "SELECT * FROM clin.v_pat_encounters WHERE %s"

class cEncounter(gmBusinessDBObject.cBusinessDBObject):
        """Represents one encounter."""

        _cmd_fetch_payload = SQL_get_encounters % 'pk_encounter = %s'
        _cmds_store_payload = [
                """UPDATE clin.encounter SET
                                started = %(started)s,
                                last_affirmed = %(last_affirmed)s,
                                fk_location = %(pk_org_unit)s,
                                fk_type = %(pk_type)s,
                                reason_for_encounter = gm.nullify_empty_string(%(reason_for_encounter)s),
                                assessment_of_encounter = gm.nullify_empty_string(%(assessment_of_encounter)s)
                        WHERE
                                pk = %(pk_encounter)s AND
                                xmin = %(xmin_encounter)s
                        """,
                # need to return all fields so we can survive in-place upgrades
                "SELECT * FROM clin.v_pat_encounters WHERE pk_encounter = %(pk_encounter)s"
        ]
        _updatable_fields = [
                'started',
                'last_affirmed',
                'pk_org_unit',
                'pk_type',
                'reason_for_encounter',
                'assessment_of_encounter'
        ]
        #--------------------------------------------------------
        def set_active(self):
                """Set the encounter as the active one.

                "Setting active" means making sure the encounter
                row has the youngest "last_affirmed" timestamp of
                all encounter rows for this patient.
                """
                self['last_affirmed'] = gmDateTime.pydt_now_here()
                self.save()

        #--------------------------------------------------------
        def lock(self, exclusive=False, link_obj=None):
                return lock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def unlock(self, exclusive=False, link_obj=None):
                return unlock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def transfer_clinical_data(self, source_episode=None, target_episode=None):
                """
                Moves every element currently linked to the current encounter
                and the source_episode onto target_episode.

                @param source_episode The episode the elements are currently linked to.
                @type target_episode A cEpisode intance.
                @param target_episode The episode the elements will be relinked to.
                @type target_episode A cEpisode intance.
                """
                if source_episode['pk_episode'] == target_episode['pk_episode']:
                        return True

                SQL = """
                        UPDATE clin.clin_root_item
                        SET fk_episode = %(trg)s
                        WHERE
                                fk_encounter = %(enc)s AND
                                fk_episode = %(src)s
                        """
                args = {
                        'trg': target_episode['pk_episode'],
                        'enc': self.pk_obj,
                        'src': source_episode['pk_episode']
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def transfer_all_data_to_another_encounter(self, pk_target_encounter=None):
                if pk_target_encounter == self.pk_obj:
                        return True

                SQL = "SELECT clin.transfer_all_encounter_data(%(src)s, %(trg)s)"
                args = {
                        'src': self.pk_obj,
                        'trg': pk_target_encounter
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                return True

        #--------------------------------------------------------
        def same_payload(self, another_object=None):

                relevant_fields = [
                        'pk_org_unit',
                        'pk_type',
                        'pk_patient',
                        'reason_for_encounter',
                        'assessment_of_encounter'
                ]
                for field in relevant_fields:
                        if self._payload[field] != another_object[field]:
                                _log.debug('mismatch on [%s]: "%s" vs. "%s"', field, self._payload[field], another_object[field])
                                return False

                relevant_fields = [
                        'started',
                        'last_affirmed',
                ]
                for field in relevant_fields:
                        if self._payload[field] is None:
                                if another_object[field] is None:
                                        continue
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                        if another_object[field] is None:
                                return False

                        # compares at seconds granularity
                        if self._payload[field].strftime('%Y-%m-%d %H:%M:%S') != another_object[field].strftime('%Y-%m-%d %H:%M:%S'):
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                # compare codes
                # 1) RFE
                if another_object['pk_generic_codes_rfe'] is None:
                        if self._payload['pk_generic_codes_rfe'] is not None:
                                return False
                if another_object['pk_generic_codes_rfe'] is not None:
                        if self._payload['pk_generic_codes_rfe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_rfe'] is None)
                                and
                        (self._payload['pk_generic_codes_rfe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_rfe']) != set(self._payload['pk_generic_codes_rfe']):
                                return False
                # 2) AOE
                if another_object['pk_generic_codes_aoe'] is None:
                        if self._payload['pk_generic_codes_aoe'] is not None:
                                return False
                if another_object['pk_generic_codes_aoe'] is not None:
                        if self._payload['pk_generic_codes_aoe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_aoe'] is None)
                                and
                        (self._payload['pk_generic_codes_aoe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_aoe']) != set(self._payload['pk_generic_codes_aoe']):
                                return False

                return True

        #--------------------------------------------------------
        def has_clinical_data(self):
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.v_pat_items WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                                UNION ALL
                        SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                )"""
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_narrative(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM clin.v_pat_items WHERE pk_patient=%(pat)s and pk_encounter=%(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL,     'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_soap_narrative(self, soap_cats=None):
                """soap_cats: <space> = admin category"""

                if soap_cats is None:
                        soap_cats = 'soap '
                else:
                        soap_cats = soap_cats.casefold()

                cats = []
                for cat in soap_cats:
                        if cat in 'soapu':
                                cats.append(cat)
                                continue
                        if cat == ' ':
                                cats.append(None)
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.clin_narrative
                        WHERE
                                fk_encounter = %(enc)s
                                        AND
                                soap_cat = ANY(%(cats)s)
                        LIMIT 1
                )"""
                args = {'enc': self._payload['pk_encounter'], 'cats': cats}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_documents(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries (queries = [{'sql': SQL,'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def get_latest_soap(self, soap_cat=None, episode=None):

                if soap_cat is not None:
                        soap_cat = soap_cat.casefold()
                if episode is None:
                        epi_part = 'fk_episode is null'
                else:
                        epi_part = 'fk_episode = %(epi)s'
                SQL = """
                        SELECT narrative
                        FROM clin.clin_narrative
                        where
                                fk_encounter = %%(enc)s
                                        and
                                soap_cat = %%(cat)s
                                        and
                                %s
                        order by clin_when desc
                        limit 1""" % epi_part
                args = {'enc': self.pk_obj, 'cat': soap_cat, 'epi': episode}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                if len(rows) == 0:
                        return None

                return rows[0][0]
        #--------------------------------------------------------
        def get_episodes(self, exclude=None):
                """exclude: list of episode PKs to exclude"""
                args = {'enc': self.pk_obj}
                extra_where_parts = ''
                if exclude is not None:
                        extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                        args['excluded_pks'] = exclude
                SQL = """
                        SELECT * FROM clin.v_pat_episodes
                        WHERE pk_episode IN (
                                        SELECT DISTINCT fk_episode
                                        FROM clin.clin_root_item
                                        WHERE fk_encounter = %%(enc)s

                                                UNION

                                        SELECT DISTINCT fk_episode
                                        FROM blobs.doc_med
                                        WHERE fk_encounter = %%(enc)s
                        ) %s""" % extra_where_parts
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                from Gnumed.business.gmEpisode import cEpisode
                return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

        episodes = property(get_episodes)

        #--------------------------------------------------------
        def add_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                elif field == 'aoe':
                        cmd = "INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def remove_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                elif field == 'aoe':
                        cmd = "DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        # data formatting
        #--------------------------------------------------------
        def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None):

                lines = []
                for soap_cat in gmSoapDefs.soap_cats_str2list(soap_cats):
                        soap_cat_narratives = emr.get_clin_narrative (
                                episodes = episodes,
                                issues = issues,
                                encounters = [self._payload['pk_encounter']],
                                soap_cats = [soap_cat]
                        )
                        if soap_cat_narratives is None:
                                continue
                        if len(soap_cat_narratives) == 0:
                                continue

                        lines.append('%s%s %s %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[soap_cat],
                                gmTools.u_box_horiz_single * 5
                        ))
                        for soap_entry in soap_cat_narratives:
                                txt = gmTools.wrap (
                                        text = soap_entry['narrative'],
                                        width = 75,
                                        initial_indent = '',
                                        subsequent_indent = (' ' * left_margin)
                                )
                                lines.append(txt)
                                txt = '%s%s %.8s, %s %s' % (
                                        ' ' * 40,
                                        gmTools.u_box_horiz_light_heavy,
                                        soap_entry['modified_by'],
                                        soap_entry['date'].strftime('%Y-%m-%d %H:%M'),
                                        gmTools.u_box_horiz_heavy_light
                                )
                                lines.append(txt)
                                lines.append('')

                return lines

        #--------------------------------------------------------
        def format_latex(self, date_format=None, soap_cats=None, soap_order=None):

                nothing2format = (
                        (self._payload['reason_for_encounter'] is None)
                                and
                        (self._payload['assessment_of_encounter'] is None)
                                and
                        (self.has_soap_narrative(soap_cats = 'soapu') is False)
                )
                if nothing2format:
                        return ''

                if date_format is None:
                        date_format = '%A, %b %d %Y'

                tex =  '% -------------------------------------------------------------\n'
                tex += '% much recommended: \\usepackage(tabu)\n'
                tex += '% much recommended: \\usepackage(longtable)\n'
                tex += '% best wrapped in: "\\begin{longtabu} to \\textwidth {lX[,L]}"\n'
                tex += '% -------------------------------------------------------------\n'
                tex += '\\hline \n'
                tex += '\\multicolumn{2}{l}{%s: %s ({\\footnotesize %s - %s})} \\tabularnewline \n' % (
                        gmTools.tex_escape_string(self._payload['l10n_type']),
                        gmTools.tex_escape_string(self._payload['started'].strftime(date_format)),
                        gmTools.tex_escape_string(self._payload['started'].strftime('%H:%M')),
                        gmTools.tex_escape_string(self._payload['last_affirmed'].strftime('%H:%M'))
                )
                tex += '\\hline \n'

                if self._payload['reason_for_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('RFE')),
                                gmTools.tex_escape_string(self._payload['reason_for_encounter'])
                        )
                if self._payload['assessment_of_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('AOE')),
                                gmTools.tex_escape_string(self._payload['assessment_of_encounter'])
                        )

                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                for epi in self.get_episodes():
                        soaps = epi.get_narrative(soap_cats = soap_cats, encounters = [self.pk_obj], order_by = soap_order)
                        if len(soaps) == 0:
                                continue
                        tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('Problem')),
                                gmTools.tex_escape_string(epi['description']),
                                gmTools.tex_escape_string (
                                        gmTools.coalesce (
                                                value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification']),
                                                return_instead = '',
                                                template4value = ' {\\footnotesize [%s]}',
                                                none_equivalents = [None, '']
                                        )
                                )
                        )
                        if epi['pk_health_issue'] is not None:
                                tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(_('Health issue')),
                                        gmTools.tex_escape_string(epi['health_issue']),
                                        gmTools.tex_escape_string (
                                                gmTools.coalesce (
                                                        value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification_issue']),
                                                        return_instead = '',
                                                        template4value = ' {\\footnotesize [%s]}',
                                                        none_equivalents = [None, '']
                                                )
                                        )
                                )
                        for soap in soaps:
                                tex += '{\\small %s} & {\\small %s} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(gmSoapDefs.soap_cat2l10n[soap['soap_cat']]),
                                        gmTools.tex_escape_string(soap['narrative'], replace_eol = True)
                                )
                        tex += ' & \\tabularnewline \n'

                return tex

        #--------------------------------------------------------
        def __format_header_fancy(self, left_margin=0):
                lines = []

                lines.append('%s%s: %s - %s (@%s)%s [#%s]' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['source_time_zone'],
                        gmTools.coalesce (
                                self._payload['assessment_of_encounter'],
                                '',
                                ' %s%%s%s' % (gmTools.u_left_double_angle_quote, gmTools.u_right_double_angle_quote)
                        ),
                        self._payload['pk_encounter']
                ))

                lines.append(_('  your time: %s - %s  (@%s = %s%s)\n') % (
                        self._payload['started'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed'].strftime('%H:%M'),
                        gmDateTime.current_local_timezone_name,
                        gmTools.bool2subst (
                                gmDateTime.dst_currently_in_effect,
                                gmDateTime.py_dst_timezone_name,
                                gmDateTime.py_timezone_name
                        ),
                        gmTools.bool2subst(gmDateTime.dst_currently_in_effect, ' - ' + _('daylight savings time in effect'), '')
                ))

                if self._payload['praxis_branch'] is not None:
                        lines.append(_('Location: %s (%s)') % (self._payload['praxis_branch'], self._payload['praxis']))

                if self._payload['reason_for_encounter'] is not None:
                        lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')

                if self._payload['assessment_of_encounter'] is not None:
                        lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes
                return lines

        #--------------------------------------------------------
        def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False):
                lines = []

                if fancy_header:
                        return self.__format_header_fancy(left_margin = left_margin)

                now = gmDateTime.pydt_now_here()
                if now.strftime('%Y-%m-%d') == self._payload['started_original_tz'].strftime('%Y-%m-%d'):
                        start = '%s %s' % (
                                _('today'),
                                self._payload['started_original_tz'].strftime('%H:%M')
                        )
                else:
                        start = self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M')
                lines.append('%s%s: %s - %s%s%s' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        start,
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        gmTools.coalesce(self._payload['assessment_of_encounter'], '', ' \u00BB%s\u00AB'),
                        gmTools.coalesce(self._payload['praxis_branch'], '', ' @%s')
                ))
                if with_rfe_aoe:
                        if self._payload['reason_for_encounter'] is not None:
                                lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        if self._payload['assessment_of_encounter'] is not None:
                                lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        if len(codes) > 0:
                                lines.append('')
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes

                return lines

        #--------------------------------------------------------
        def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True):

                if patient is not None:
                        emr = patient.emr

                lines = []
                if episodes is None:
                        episodes = [ e['pk_episode'] for e in self.episodes ]

                from Gnumed.business.gmEpisode import cEpisode
                for pk in episodes:
                        epi = cEpisode(aPK_obj = pk)
                        lines.append(_('\nEpisode %s%s%s%s:') % (
                                gmTools.u_left_double_angle_quote,
                                epi['description'],
                                gmTools.u_right_double_angle_quote,
                                gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                        ))

                        # soap
                        if with_soap:
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                lines.extend(self.format_soap (
                                        episodes = [pk],
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

                        # test results
                        if with_tests:
                                tests = emr.get_test_results_by_date (
                                        episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))

                                for t in tests:
                                        lines.append(t.format())

                                del tests

                        # vaccinations
                        if with_vaccinations:
                                vaccs = emr.get_vaccinations (
                                        episodes = [pk],
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # family history
                        if with_family_history:
                                fhx = emr.get_family_history(episodes = [pk])
                                if len(fhx) > 0:
                                        lines.append('')
                                        lines.append(_('Family History: %s') % len(fhx))
                                for f in fhx:
                                        lines.append(f.format (
                                                left_margin = (left_margin + 1),
                                                include_episode = False,
                                                include_comment = True
                                        ))
                                del fhx

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                return lines

        #--------------------------------------------------------
        def format_maximum_information(self, patient=None):
                if patient is None:
                        from Gnumed.business.gmPerson import gmCurrentPatient, cPerson
                        if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                patient = gmCurrentPatient()
                        else:
                                patient = cPerson(self._payload['pk_patient'])

                return self.format (
                        patient = patient,
                        fancy_header = True,
                        with_rfe_aoe = True,
                        with_soap = True,
                        with_docs = True,
                        with_tests = False,
                        with_vaccinations = True,
                        with_co_encountlet_hints = True,
                        with_family_history = True,
                        by_episode = False,
                        return_list = True
                )

        #--------------------------------------------------------
        def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False):
                """Format an encounter.

                Args:
                        episode: which episodes, touched upon by this encounter, to include information for
                        with_co_encountlet_hints:
                                - whether to include which *other* episodes were discussed during this encounter
                                - (only makes sense if episodes is not None, since that would preclude information)
                """
                lines = self.format_header (
                        fancy_header = fancy_header,
                        left_margin = left_margin,
                        with_rfe_aoe = with_rfe_aoe
                )

                if patient is None:
                        _log.debug('no patient, cannot load patient related data')
                        with_soap = False
                        with_tests = False
                        with_vaccinations = False
                        with_docs = False

                if by_episode:
                        lines.extend(self.format_by_episode (
                                episodes = episodes,
                                issues = issues,
                                left_margin = left_margin,
                                patient = patient,
                                with_soap = with_soap,
                                with_tests = with_tests,
                                with_docs = with_docs,
                                with_vaccinations = with_vaccinations,
                                with_family_history = with_family_history
                        ))
                else:
                        if with_soap:
                                lines.append('')
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                emr = patient.emr
                                lines.extend(self.format_soap (
                                        episodes = episodes,
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

        #               # family history
        #               if with_family_history:
        #                       if episodes is not None:
        #                               fhx = emr.get_family_history(episodes = episodes)
        #                               if len(fhx) > 0:
        #                                       lines.append(u'')
        #                                       lines.append(_('Family History: %s') % len(fhx))
        #                               for f in fhx:
        #                                       lines.append(f.format (
        #                                               left_margin = (left_margin + 1),
        #                                               include_episode = False,
        #                                               include_comment = True
        #                                       ))
        #                               del fhx

                        # test results
                        if with_tests:
                                emr = patient.emr
                                tests = emr.get_test_results_by_date (
                                        episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))
                                for t in tests:
                                        lines.append(t.format())
                                del tests

                        # vaccinations
                        if with_vaccinations:
                                emr = patient.emr
                                vaccs = emr.get_vaccinations (
                                        episodes = episodes,
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                        # co-encountlets
                        if with_co_encountlet_hints:
                                if episodes is not None:
                                        other_epis = self.get_episodes(exclude = episodes)
                                        if len(other_epis) > 0:
                                                lines.append('')
                                                lines.append(_('%s other episodes touched upon during this encounter:') % len(other_epis))
                                        for epi in other_epis:
                                                lines.append(' %s%s%s%s' % (
                                                        gmTools.u_left_double_angle_quote,
                                                        epi['description'],
                                                        gmTools.u_right_double_angle_quote,
                                                        gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                                                ))

                if return_list:
                        return lines

                eol_w_margin = '\n%s' % (' ' * left_margin)
                return '%s\n' % eol_w_margin.join(lines)

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_generic_codes_rfe(self):
                if len(self._payload['pk_generic_codes_rfe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_rfe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_rfe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_rfe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_rfe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_rfe = property(_get_generic_codes_rfe, _set_generic_codes_rfe)
        #--------------------------------------------------------
        def _get_generic_codes_aoe(self):
                if len(self._payload['pk_generic_codes_aoe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_aoe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_aoe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_aoe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_aoe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_aoe = property(_get_generic_codes_aoe, _set_generic_codes_aoe)
        #--------------------------------------------------------
        def _get_praxis_branch(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmPraxis.get_praxis_branch_by_org_unit(pk_org_unit = self._payload['pk_org_unit'])

        praxis_branch = property(_get_praxis_branch)
        #--------------------------------------------------------
        def _get_org_unit(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmOrganization.cOrgUnit(aPK_obj = self._payload['pk_org_unit'])

        org_unit = property(_get_org_unit)
        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM clin.encounter
                        WHERE pk = %(pk_encounter)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM audit.log_encounter
                        WHERE pk = %(pk_encounter)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_encounter': self._payload['pk_encounter']}
                title = _('Encounter: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['l10n_type'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

#-----------------------------------------------------------
def create_encounter(fk_patient=None, enc_type=None):
        """Creates a new encounter for a patient.

        fk_patient - patient PK
        enc_type - type of encounter
        """
        if enc_type is None:
                enc_type = 'in surgery'
        # insert new encounter
        queries = []
        try:
                enc_type = int(enc_type)
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_type, fk_location)
                        VALUES (%(pat)s, %(typ)s, %(prax)s) RETURNING pk"""
        except ValueError:
                enc_type = enc_type
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_location, fk_type)
                        VALUES (
                                %(pat)s,
                                %(prax)s,
                                coalesce (
                                        (SELECT pk FROM clin.encounter_type WHERE description = %(typ)s),
                                        -- pick the first available
                                        (SELECT pk FROM clin.encounter_type limit 1)
                                )
                        ) RETURNING pk"""
        praxis = gmPraxis.gmCurrentPraxisBranch()
        args = {'pat': fk_patient, 'typ': enc_type, 'prax': praxis['pk_org_unit']}
        queries.append({'sql': cmd, 'args': args})
        rows = gmPG2.run_rw_queries(queries = queries, return_data = True)
        return cEncounter(aPK_obj = rows[0]['pk'])

#------------------------------------------------------------
def lock_encounter(pk_encounter, exclusive=False, link_obj=None):
        """Used to protect against deletion of active encounter FROM another client."""
        return gmPG2.lock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)

#------------------------------------------------------------
def unlock_encounter(pk_encounter, exclusive=False, link_obj=None):
        unlocked = gmPG2.unlock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)
        if not unlocked:
                _log.warning('cannot unlock encounter [#%s]', pk_encounter)
                call_stack = inspect.stack()
                call_stack.reverse()
                for idx in range(1, len(call_stack)):
                        caller = call_stack[idx]
                        _log.error('%s[%s] @ [%s] in [%s]', ' ' * idx, caller[3], caller[2], caller[1])
                del call_stack
        return unlocked

#-----------------------------------------------------------
def delete_encounter(pk_encounter):
        """Deletes an encounter by PK.

        - attempts to obtain an exclusive lock which should
          fail if the encounter is the active encounter in
          this or any other client
        - catches DB exceptions which should mostly be related
          to clinical data already having been attached to
          the encounter thus making deletion fail
        """
        conn = gmPG2.get_connection(readonly = False)
        if not lock_encounter(pk_encounter, exclusive = True, link_obj = conn):
                _log.debug('cannot lock encounter [%s] for deletion, it seems in use', pk_encounter)
                return False

        SQL = "DELETE FROM clin.encounter WHERE pk = %(enc)s"
        args = {'enc': pk_encounter}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        except gmPG2.PG_ERROR_EXCEPTION as exc:
                _log.exception('cannot delete encounter [%s]', pk_encounter)
                gmPG2.log_pg_exception_details(exc)
                unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
                return False

        unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
        return True

#-----------------------------------------------------------
# encounter types handling
#-----------------------------------------------------------
def update_encounter_type(description=None, l10n_description=None):
        SQL = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
        args = {'desc': description, 'l10n_desc': l10n_description}
        rows = gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}], return_data = True)
        success = rows[0][0]
        if not success:
                _log.warning('updating encounter type [%s] to [%s] failed', description, l10n_description)
        return {'description': description, 'l10n_description': l10n_description}

#-----------------------------------------------------------
def create_encounter_type(description=None, l10n_description=None):
        """This will attempt to create a NEW encounter type."""

        # need a system name, so derive one if necessary
        if description is None:
                description = l10n_description

        args = {
                'desc': description,
                'l10n_desc': l10n_description
        }

        _log.debug('creating encounter type: %s, %s', description, l10n_description)

        # does it exist already ?
        cmd = "SELECT description, _(description) FROM clin.encounter_type WHERE description = %(desc)s"
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

        # yes
        if len(rows) > 0:
                # both system and l10n name are the same so all is well
                if (rows[0][0] == description) and (rows[0][1] == l10n_description):
                        _log.info('encounter type [%s] already exists with the proper translation')
                        return {'description': description, 'l10n_description': l10n_description}

                # or maybe there just wasn't a translation to
                # the current language for this type yet ?
                cmd = "SELECT EXISTS (SELECT 1 FROM i18n.translations WHERE orig = %(desc)s and lang = i18n.get_curr_lang())"
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

                # there was, so fail
                if rows[0][0]:
                        _log.error('encounter type [%s] already exists but with another translation')
                        return None

                # else set it
                cmd = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
                rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return {'description': description, 'l10n_description': l10n_description}

        # no
        queries = [
                {'sql': "INSERT INTO clin.encounter_type (description) VALUES (%(desc)s)", 'args': args},
                {'sql': "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)", 'args': args}
        ]
        rows = gmPG2.run_rw_queries(queries = queries)

        return {'description': description, 'l10n_description': l10n_description}

#-----------------------------------------------------------
def get_most_commonly_used_encounter_type():
        SQL = """
                SELECT
                        COUNT(*) AS type_count,
                        fk_type
                FROM clin.encounter
                GROUP BY fk_type
                ORDER BY type_count DESC
                LIMIT 1
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        if len(rows) == 0:
                return None

        return rows[0]['fk_type']

#-----------------------------------------------------------
def get_encounter_types():
        SQL = """
                SELECT
                        _(description) AS l10n_description,
                        description
                FROM
                        clin.encounter_type
                ORDER BY
                        l10n_description
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        return rows

#-----------------------------------------------------------
def get_encounter_type(description:str=None):
        SQL = 'SELECT * FROM clin.encounter_type WHERE description = %(desc)s'
        args = {'desc': description}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows

#-----------------------------------------------------------
def delete_encounter_type(description:str=None):
        deleted = False
        SQL = "DELETE FROM clin.encounter_type WHERE description = %(desc)s"
        args = {'desc': description}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                deleted = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.FOREIGN_KEY_VIOLATION:
                        raise

        return deleted

#============================================================

#============================================================
# main - unit testing
#------------------------------------------------------------
if __name__ == '__main__':

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        gmI18N.activate_locale()
        gmI18N.install_domain('gnumed')

        #--------------------------------------------------------
        # define tests
        #--------------------------------------------------------
        def test_encounter():
                print("\nencounter test")
                print("--------------")
                encounter = cEncounter(aPK_obj=1)
                print(encounter)
                fields = encounter.get_fields()
                for field in fields:
                        print(field, ':', encounter[field])
                print("updatable:", encounter.get_updatable_fields())
                #print encounter.formatted_revision_history
                print(encounter.transfer_all_data_to_another_encounter(pk_target_encounter = 2))

        #--------------------------------------------------------
        def test_encounter2latex():
                encounter = cEncounter(aPK_obj=1)
                print(encounter)
                print("")
                print(encounter.format_latex())

        #--------------------------------------------------------
        gmPG2.request_login_params(setup_pool = True)

        #test_encounter()
        test_encounter2latex()

Functions

def create_encounter(fk_patient=None, enc_type=None)

Creates a new encounter for a patient.

fk_patient - patient PK enc_type - type of encounter

Expand source code
def create_encounter(fk_patient=None, enc_type=None):
        """Creates a new encounter for a patient.

        fk_patient - patient PK
        enc_type - type of encounter
        """
        if enc_type is None:
                enc_type = 'in surgery'
        # insert new encounter
        queries = []
        try:
                enc_type = int(enc_type)
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_type, fk_location)
                        VALUES (%(pat)s, %(typ)s, %(prax)s) RETURNING pk"""
        except ValueError:
                enc_type = enc_type
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_location, fk_type)
                        VALUES (
                                %(pat)s,
                                %(prax)s,
                                coalesce (
                                        (SELECT pk FROM clin.encounter_type WHERE description = %(typ)s),
                                        -- pick the first available
                                        (SELECT pk FROM clin.encounter_type limit 1)
                                )
                        ) RETURNING pk"""
        praxis = gmPraxis.gmCurrentPraxisBranch()
        args = {'pat': fk_patient, 'typ': enc_type, 'prax': praxis['pk_org_unit']}
        queries.append({'sql': cmd, 'args': args})
        rows = gmPG2.run_rw_queries(queries = queries, return_data = True)
        return cEncounter(aPK_obj = rows[0]['pk'])
def create_encounter_type(description=None, l10n_description=None)

This will attempt to create a NEW encounter type.

Expand source code
def create_encounter_type(description=None, l10n_description=None):
        """This will attempt to create a NEW encounter type."""

        # need a system name, so derive one if necessary
        if description is None:
                description = l10n_description

        args = {
                'desc': description,
                'l10n_desc': l10n_description
        }

        _log.debug('creating encounter type: %s, %s', description, l10n_description)

        # does it exist already ?
        cmd = "SELECT description, _(description) FROM clin.encounter_type WHERE description = %(desc)s"
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

        # yes
        if len(rows) > 0:
                # both system and l10n name are the same so all is well
                if (rows[0][0] == description) and (rows[0][1] == l10n_description):
                        _log.info('encounter type [%s] already exists with the proper translation')
                        return {'description': description, 'l10n_description': l10n_description}

                # or maybe there just wasn't a translation to
                # the current language for this type yet ?
                cmd = "SELECT EXISTS (SELECT 1 FROM i18n.translations WHERE orig = %(desc)s and lang = i18n.get_curr_lang())"
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

                # there was, so fail
                if rows[0][0]:
                        _log.error('encounter type [%s] already exists but with another translation')
                        return None

                # else set it
                cmd = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
                rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return {'description': description, 'l10n_description': l10n_description}

        # no
        queries = [
                {'sql': "INSERT INTO clin.encounter_type (description) VALUES (%(desc)s)", 'args': args},
                {'sql': "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)", 'args': args}
        ]
        rows = gmPG2.run_rw_queries(queries = queries)

        return {'description': description, 'l10n_description': l10n_description}
def delete_encounter(pk_encounter)

Deletes an encounter by PK.

  • attempts to obtain an exclusive lock which should fail if the encounter is the active encounter in this or any other client
  • catches DB exceptions which should mostly be related to clinical data already having been attached to the encounter thus making deletion fail
Expand source code
def delete_encounter(pk_encounter):
        """Deletes an encounter by PK.

        - attempts to obtain an exclusive lock which should
          fail if the encounter is the active encounter in
          this or any other client
        - catches DB exceptions which should mostly be related
          to clinical data already having been attached to
          the encounter thus making deletion fail
        """
        conn = gmPG2.get_connection(readonly = False)
        if not lock_encounter(pk_encounter, exclusive = True, link_obj = conn):
                _log.debug('cannot lock encounter [%s] for deletion, it seems in use', pk_encounter)
                return False

        SQL = "DELETE FROM clin.encounter WHERE pk = %(enc)s"
        args = {'enc': pk_encounter}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        except gmPG2.PG_ERROR_EXCEPTION as exc:
                _log.exception('cannot delete encounter [%s]', pk_encounter)
                gmPG2.log_pg_exception_details(exc)
                unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
                return False

        unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
        return True
def delete_encounter_type(description: str = None)
Expand source code
def delete_encounter_type(description:str=None):
        deleted = False
        SQL = "DELETE FROM clin.encounter_type WHERE description = %(desc)s"
        args = {'desc': description}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                deleted = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.FOREIGN_KEY_VIOLATION:
                        raise

        return deleted
def get_encounter_type(description: str = None)
Expand source code
def get_encounter_type(description:str=None):
        SQL = 'SELECT * FROM clin.encounter_type WHERE description = %(desc)s'
        args = {'desc': description}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows
def get_encounter_types()
Expand source code
def get_encounter_types():
        SQL = """
                SELECT
                        _(description) AS l10n_description,
                        description
                FROM
                        clin.encounter_type
                ORDER BY
                        l10n_description
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        return rows
def get_most_commonly_used_encounter_type()
Expand source code
def get_most_commonly_used_encounter_type():
        SQL = """
                SELECT
                        COUNT(*) AS type_count,
                        fk_type
                FROM clin.encounter
                GROUP BY fk_type
                ORDER BY type_count DESC
                LIMIT 1
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        if len(rows) == 0:
                return None

        return rows[0]['fk_type']
def lock_encounter(pk_encounter, exclusive=False, link_obj=None)

Used to protect against deletion of active encounter FROM another client.

Expand source code
def lock_encounter(pk_encounter, exclusive=False, link_obj=None):
        """Used to protect against deletion of active encounter FROM another client."""
        return gmPG2.lock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)
def unlock_encounter(pk_encounter, exclusive=False, link_obj=None)
Expand source code
def unlock_encounter(pk_encounter, exclusive=False, link_obj=None):
        unlocked = gmPG2.unlock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)
        if not unlocked:
                _log.warning('cannot unlock encounter [#%s]', pk_encounter)
                call_stack = inspect.stack()
                call_stack.reverse()
                for idx in range(1, len(call_stack)):
                        caller = call_stack[idx]
                        _log.error('%s[%s] @ [%s] in [%s]', ' ' * idx, caller[3], caller[2], caller[1])
                del call_stack
        return unlocked
def update_encounter_type(description=None, l10n_description=None)
Expand source code
def update_encounter_type(description=None, l10n_description=None):
        SQL = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
        args = {'desc': description, 'l10n_desc': l10n_description}
        rows = gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}], return_data = True)
        success = rows[0][0]
        if not success:
                _log.warning('updating encounter type [%s] to [%s] failed', description, l10n_description)
        return {'description': description, 'l10n_description': l10n_description}

Classes

class cEncounter (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents one encounter.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cEncounter(gmBusinessDBObject.cBusinessDBObject):
        """Represents one encounter."""

        _cmd_fetch_payload = SQL_get_encounters % 'pk_encounter = %s'
        _cmds_store_payload = [
                """UPDATE clin.encounter SET
                                started = %(started)s,
                                last_affirmed = %(last_affirmed)s,
                                fk_location = %(pk_org_unit)s,
                                fk_type = %(pk_type)s,
                                reason_for_encounter = gm.nullify_empty_string(%(reason_for_encounter)s),
                                assessment_of_encounter = gm.nullify_empty_string(%(assessment_of_encounter)s)
                        WHERE
                                pk = %(pk_encounter)s AND
                                xmin = %(xmin_encounter)s
                        """,
                # need to return all fields so we can survive in-place upgrades
                "SELECT * FROM clin.v_pat_encounters WHERE pk_encounter = %(pk_encounter)s"
        ]
        _updatable_fields = [
                'started',
                'last_affirmed',
                'pk_org_unit',
                'pk_type',
                'reason_for_encounter',
                'assessment_of_encounter'
        ]
        #--------------------------------------------------------
        def set_active(self):
                """Set the encounter as the active one.

                "Setting active" means making sure the encounter
                row has the youngest "last_affirmed" timestamp of
                all encounter rows for this patient.
                """
                self['last_affirmed'] = gmDateTime.pydt_now_here()
                self.save()

        #--------------------------------------------------------
        def lock(self, exclusive=False, link_obj=None):
                return lock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def unlock(self, exclusive=False, link_obj=None):
                return unlock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def transfer_clinical_data(self, source_episode=None, target_episode=None):
                """
                Moves every element currently linked to the current encounter
                and the source_episode onto target_episode.

                @param source_episode The episode the elements are currently linked to.
                @type target_episode A cEpisode intance.
                @param target_episode The episode the elements will be relinked to.
                @type target_episode A cEpisode intance.
                """
                if source_episode['pk_episode'] == target_episode['pk_episode']:
                        return True

                SQL = """
                        UPDATE clin.clin_root_item
                        SET fk_episode = %(trg)s
                        WHERE
                                fk_encounter = %(enc)s AND
                                fk_episode = %(src)s
                        """
                args = {
                        'trg': target_episode['pk_episode'],
                        'enc': self.pk_obj,
                        'src': source_episode['pk_episode']
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def transfer_all_data_to_another_encounter(self, pk_target_encounter=None):
                if pk_target_encounter == self.pk_obj:
                        return True

                SQL = "SELECT clin.transfer_all_encounter_data(%(src)s, %(trg)s)"
                args = {
                        'src': self.pk_obj,
                        'trg': pk_target_encounter
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                return True

        #--------------------------------------------------------
        def same_payload(self, another_object=None):

                relevant_fields = [
                        'pk_org_unit',
                        'pk_type',
                        'pk_patient',
                        'reason_for_encounter',
                        'assessment_of_encounter'
                ]
                for field in relevant_fields:
                        if self._payload[field] != another_object[field]:
                                _log.debug('mismatch on [%s]: "%s" vs. "%s"', field, self._payload[field], another_object[field])
                                return False

                relevant_fields = [
                        'started',
                        'last_affirmed',
                ]
                for field in relevant_fields:
                        if self._payload[field] is None:
                                if another_object[field] is None:
                                        continue
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                        if another_object[field] is None:
                                return False

                        # compares at seconds granularity
                        if self._payload[field].strftime('%Y-%m-%d %H:%M:%S') != another_object[field].strftime('%Y-%m-%d %H:%M:%S'):
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                # compare codes
                # 1) RFE
                if another_object['pk_generic_codes_rfe'] is None:
                        if self._payload['pk_generic_codes_rfe'] is not None:
                                return False
                if another_object['pk_generic_codes_rfe'] is not None:
                        if self._payload['pk_generic_codes_rfe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_rfe'] is None)
                                and
                        (self._payload['pk_generic_codes_rfe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_rfe']) != set(self._payload['pk_generic_codes_rfe']):
                                return False
                # 2) AOE
                if another_object['pk_generic_codes_aoe'] is None:
                        if self._payload['pk_generic_codes_aoe'] is not None:
                                return False
                if another_object['pk_generic_codes_aoe'] is not None:
                        if self._payload['pk_generic_codes_aoe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_aoe'] is None)
                                and
                        (self._payload['pk_generic_codes_aoe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_aoe']) != set(self._payload['pk_generic_codes_aoe']):
                                return False

                return True

        #--------------------------------------------------------
        def has_clinical_data(self):
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.v_pat_items WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                                UNION ALL
                        SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                )"""
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_narrative(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM clin.v_pat_items WHERE pk_patient=%(pat)s and pk_encounter=%(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL,     'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_soap_narrative(self, soap_cats=None):
                """soap_cats: <space> = admin category"""

                if soap_cats is None:
                        soap_cats = 'soap '
                else:
                        soap_cats = soap_cats.casefold()

                cats = []
                for cat in soap_cats:
                        if cat in 'soapu':
                                cats.append(cat)
                                continue
                        if cat == ' ':
                                cats.append(None)
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.clin_narrative
                        WHERE
                                fk_encounter = %(enc)s
                                        AND
                                soap_cat = ANY(%(cats)s)
                        LIMIT 1
                )"""
                args = {'enc': self._payload['pk_encounter'], 'cats': cats}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_documents(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries (queries = [{'sql': SQL,'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def get_latest_soap(self, soap_cat=None, episode=None):

                if soap_cat is not None:
                        soap_cat = soap_cat.casefold()
                if episode is None:
                        epi_part = 'fk_episode is null'
                else:
                        epi_part = 'fk_episode = %(epi)s'
                SQL = """
                        SELECT narrative
                        FROM clin.clin_narrative
                        where
                                fk_encounter = %%(enc)s
                                        and
                                soap_cat = %%(cat)s
                                        and
                                %s
                        order by clin_when desc
                        limit 1""" % epi_part
                args = {'enc': self.pk_obj, 'cat': soap_cat, 'epi': episode}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                if len(rows) == 0:
                        return None

                return rows[0][0]
        #--------------------------------------------------------
        def get_episodes(self, exclude=None):
                """exclude: list of episode PKs to exclude"""
                args = {'enc': self.pk_obj}
                extra_where_parts = ''
                if exclude is not None:
                        extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                        args['excluded_pks'] = exclude
                SQL = """
                        SELECT * FROM clin.v_pat_episodes
                        WHERE pk_episode IN (
                                        SELECT DISTINCT fk_episode
                                        FROM clin.clin_root_item
                                        WHERE fk_encounter = %%(enc)s

                                                UNION

                                        SELECT DISTINCT fk_episode
                                        FROM blobs.doc_med
                                        WHERE fk_encounter = %%(enc)s
                        ) %s""" % extra_where_parts
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                from Gnumed.business.gmEpisode import cEpisode
                return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

        episodes = property(get_episodes)

        #--------------------------------------------------------
        def add_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                elif field == 'aoe':
                        cmd = "INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def remove_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                elif field == 'aoe':
                        cmd = "DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        # data formatting
        #--------------------------------------------------------
        def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None):

                lines = []
                for soap_cat in gmSoapDefs.soap_cats_str2list(soap_cats):
                        soap_cat_narratives = emr.get_clin_narrative (
                                episodes = episodes,
                                issues = issues,
                                encounters = [self._payload['pk_encounter']],
                                soap_cats = [soap_cat]
                        )
                        if soap_cat_narratives is None:
                                continue
                        if len(soap_cat_narratives) == 0:
                                continue

                        lines.append('%s%s %s %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[soap_cat],
                                gmTools.u_box_horiz_single * 5
                        ))
                        for soap_entry in soap_cat_narratives:
                                txt = gmTools.wrap (
                                        text = soap_entry['narrative'],
                                        width = 75,
                                        initial_indent = '',
                                        subsequent_indent = (' ' * left_margin)
                                )
                                lines.append(txt)
                                txt = '%s%s %.8s, %s %s' % (
                                        ' ' * 40,
                                        gmTools.u_box_horiz_light_heavy,
                                        soap_entry['modified_by'],
                                        soap_entry['date'].strftime('%Y-%m-%d %H:%M'),
                                        gmTools.u_box_horiz_heavy_light
                                )
                                lines.append(txt)
                                lines.append('')

                return lines

        #--------------------------------------------------------
        def format_latex(self, date_format=None, soap_cats=None, soap_order=None):

                nothing2format = (
                        (self._payload['reason_for_encounter'] is None)
                                and
                        (self._payload['assessment_of_encounter'] is None)
                                and
                        (self.has_soap_narrative(soap_cats = 'soapu') is False)
                )
                if nothing2format:
                        return ''

                if date_format is None:
                        date_format = '%A, %b %d %Y'

                tex =  '% -------------------------------------------------------------\n'
                tex += '% much recommended: \\usepackage(tabu)\n'
                tex += '% much recommended: \\usepackage(longtable)\n'
                tex += '% best wrapped in: "\\begin{longtabu} to \\textwidth {lX[,L]}"\n'
                tex += '% -------------------------------------------------------------\n'
                tex += '\\hline \n'
                tex += '\\multicolumn{2}{l}{%s: %s ({\\footnotesize %s - %s})} \\tabularnewline \n' % (
                        gmTools.tex_escape_string(self._payload['l10n_type']),
                        gmTools.tex_escape_string(self._payload['started'].strftime(date_format)),
                        gmTools.tex_escape_string(self._payload['started'].strftime('%H:%M')),
                        gmTools.tex_escape_string(self._payload['last_affirmed'].strftime('%H:%M'))
                )
                tex += '\\hline \n'

                if self._payload['reason_for_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('RFE')),
                                gmTools.tex_escape_string(self._payload['reason_for_encounter'])
                        )
                if self._payload['assessment_of_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('AOE')),
                                gmTools.tex_escape_string(self._payload['assessment_of_encounter'])
                        )

                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                for epi in self.get_episodes():
                        soaps = epi.get_narrative(soap_cats = soap_cats, encounters = [self.pk_obj], order_by = soap_order)
                        if len(soaps) == 0:
                                continue
                        tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('Problem')),
                                gmTools.tex_escape_string(epi['description']),
                                gmTools.tex_escape_string (
                                        gmTools.coalesce (
                                                value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification']),
                                                return_instead = '',
                                                template4value = ' {\\footnotesize [%s]}',
                                                none_equivalents = [None, '']
                                        )
                                )
                        )
                        if epi['pk_health_issue'] is not None:
                                tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(_('Health issue')),
                                        gmTools.tex_escape_string(epi['health_issue']),
                                        gmTools.tex_escape_string (
                                                gmTools.coalesce (
                                                        value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification_issue']),
                                                        return_instead = '',
                                                        template4value = ' {\\footnotesize [%s]}',
                                                        none_equivalents = [None, '']
                                                )
                                        )
                                )
                        for soap in soaps:
                                tex += '{\\small %s} & {\\small %s} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(gmSoapDefs.soap_cat2l10n[soap['soap_cat']]),
                                        gmTools.tex_escape_string(soap['narrative'], replace_eol = True)
                                )
                        tex += ' & \\tabularnewline \n'

                return tex

        #--------------------------------------------------------
        def __format_header_fancy(self, left_margin=0):
                lines = []

                lines.append('%s%s: %s - %s (@%s)%s [#%s]' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['source_time_zone'],
                        gmTools.coalesce (
                                self._payload['assessment_of_encounter'],
                                '',
                                ' %s%%s%s' % (gmTools.u_left_double_angle_quote, gmTools.u_right_double_angle_quote)
                        ),
                        self._payload['pk_encounter']
                ))

                lines.append(_('  your time: %s - %s  (@%s = %s%s)\n') % (
                        self._payload['started'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed'].strftime('%H:%M'),
                        gmDateTime.current_local_timezone_name,
                        gmTools.bool2subst (
                                gmDateTime.dst_currently_in_effect,
                                gmDateTime.py_dst_timezone_name,
                                gmDateTime.py_timezone_name
                        ),
                        gmTools.bool2subst(gmDateTime.dst_currently_in_effect, ' - ' + _('daylight savings time in effect'), '')
                ))

                if self._payload['praxis_branch'] is not None:
                        lines.append(_('Location: %s (%s)') % (self._payload['praxis_branch'], self._payload['praxis']))

                if self._payload['reason_for_encounter'] is not None:
                        lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')

                if self._payload['assessment_of_encounter'] is not None:
                        lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes
                return lines

        #--------------------------------------------------------
        def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False):
                lines = []

                if fancy_header:
                        return self.__format_header_fancy(left_margin = left_margin)

                now = gmDateTime.pydt_now_here()
                if now.strftime('%Y-%m-%d') == self._payload['started_original_tz'].strftime('%Y-%m-%d'):
                        start = '%s %s' % (
                                _('today'),
                                self._payload['started_original_tz'].strftime('%H:%M')
                        )
                else:
                        start = self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M')
                lines.append('%s%s: %s - %s%s%s' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        start,
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        gmTools.coalesce(self._payload['assessment_of_encounter'], '', ' \u00BB%s\u00AB'),
                        gmTools.coalesce(self._payload['praxis_branch'], '', ' @%s')
                ))
                if with_rfe_aoe:
                        if self._payload['reason_for_encounter'] is not None:
                                lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        if self._payload['assessment_of_encounter'] is not None:
                                lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        if len(codes) > 0:
                                lines.append('')
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes

                return lines

        #--------------------------------------------------------
        def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True):

                if patient is not None:
                        emr = patient.emr

                lines = []
                if episodes is None:
                        episodes = [ e['pk_episode'] for e in self.episodes ]

                from Gnumed.business.gmEpisode import cEpisode
                for pk in episodes:
                        epi = cEpisode(aPK_obj = pk)
                        lines.append(_('\nEpisode %s%s%s%s:') % (
                                gmTools.u_left_double_angle_quote,
                                epi['description'],
                                gmTools.u_right_double_angle_quote,
                                gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                        ))

                        # soap
                        if with_soap:
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                lines.extend(self.format_soap (
                                        episodes = [pk],
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

                        # test results
                        if with_tests:
                                tests = emr.get_test_results_by_date (
                                        episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))

                                for t in tests:
                                        lines.append(t.format())

                                del tests

                        # vaccinations
                        if with_vaccinations:
                                vaccs = emr.get_vaccinations (
                                        episodes = [pk],
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # family history
                        if with_family_history:
                                fhx = emr.get_family_history(episodes = [pk])
                                if len(fhx) > 0:
                                        lines.append('')
                                        lines.append(_('Family History: %s') % len(fhx))
                                for f in fhx:
                                        lines.append(f.format (
                                                left_margin = (left_margin + 1),
                                                include_episode = False,
                                                include_comment = True
                                        ))
                                del fhx

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                return lines

        #--------------------------------------------------------
        def format_maximum_information(self, patient=None):
                if patient is None:
                        from Gnumed.business.gmPerson import gmCurrentPatient, cPerson
                        if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                patient = gmCurrentPatient()
                        else:
                                patient = cPerson(self._payload['pk_patient'])

                return self.format (
                        patient = patient,
                        fancy_header = True,
                        with_rfe_aoe = True,
                        with_soap = True,
                        with_docs = True,
                        with_tests = False,
                        with_vaccinations = True,
                        with_co_encountlet_hints = True,
                        with_family_history = True,
                        by_episode = False,
                        return_list = True
                )

        #--------------------------------------------------------
        def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False):
                """Format an encounter.

                Args:
                        episode: which episodes, touched upon by this encounter, to include information for
                        with_co_encountlet_hints:
                                - whether to include which *other* episodes were discussed during this encounter
                                - (only makes sense if episodes is not None, since that would preclude information)
                """
                lines = self.format_header (
                        fancy_header = fancy_header,
                        left_margin = left_margin,
                        with_rfe_aoe = with_rfe_aoe
                )

                if patient is None:
                        _log.debug('no patient, cannot load patient related data')
                        with_soap = False
                        with_tests = False
                        with_vaccinations = False
                        with_docs = False

                if by_episode:
                        lines.extend(self.format_by_episode (
                                episodes = episodes,
                                issues = issues,
                                left_margin = left_margin,
                                patient = patient,
                                with_soap = with_soap,
                                with_tests = with_tests,
                                with_docs = with_docs,
                                with_vaccinations = with_vaccinations,
                                with_family_history = with_family_history
                        ))
                else:
                        if with_soap:
                                lines.append('')
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                emr = patient.emr
                                lines.extend(self.format_soap (
                                        episodes = episodes,
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

        #               # family history
        #               if with_family_history:
        #                       if episodes is not None:
        #                               fhx = emr.get_family_history(episodes = episodes)
        #                               if len(fhx) > 0:
        #                                       lines.append(u'')
        #                                       lines.append(_('Family History: %s') % len(fhx))
        #                               for f in fhx:
        #                                       lines.append(f.format (
        #                                               left_margin = (left_margin + 1),
        #                                               include_episode = False,
        #                                               include_comment = True
        #                                       ))
        #                               del fhx

                        # test results
                        if with_tests:
                                emr = patient.emr
                                tests = emr.get_test_results_by_date (
                                        episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))
                                for t in tests:
                                        lines.append(t.format())
                                del tests

                        # vaccinations
                        if with_vaccinations:
                                emr = patient.emr
                                vaccs = emr.get_vaccinations (
                                        episodes = episodes,
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                        # co-encountlets
                        if with_co_encountlet_hints:
                                if episodes is not None:
                                        other_epis = self.get_episodes(exclude = episodes)
                                        if len(other_epis) > 0:
                                                lines.append('')
                                                lines.append(_('%s other episodes touched upon during this encounter:') % len(other_epis))
                                        for epi in other_epis:
                                                lines.append(' %s%s%s%s' % (
                                                        gmTools.u_left_double_angle_quote,
                                                        epi['description'],
                                                        gmTools.u_right_double_angle_quote,
                                                        gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                                                ))

                if return_list:
                        return lines

                eol_w_margin = '\n%s' % (' ' * left_margin)
                return '%s\n' % eol_w_margin.join(lines)

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_generic_codes_rfe(self):
                if len(self._payload['pk_generic_codes_rfe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_rfe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_rfe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_rfe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_rfe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_rfe = property(_get_generic_codes_rfe, _set_generic_codes_rfe)
        #--------------------------------------------------------
        def _get_generic_codes_aoe(self):
                if len(self._payload['pk_generic_codes_aoe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_aoe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_aoe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_aoe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_aoe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_aoe = property(_get_generic_codes_aoe, _set_generic_codes_aoe)
        #--------------------------------------------------------
        def _get_praxis_branch(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmPraxis.get_praxis_branch_by_org_unit(pk_org_unit = self._payload['pk_org_unit'])

        praxis_branch = property(_get_praxis_branch)
        #--------------------------------------------------------
        def _get_org_unit(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmOrganization.cOrgUnit(aPK_obj = self._payload['pk_org_unit'])

        org_unit = property(_get_org_unit)
        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM clin.encounter
                        WHERE pk = %(pk_encounter)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM audit.log_encounter
                        WHERE pk = %(pk_encounter)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_encounter': self._payload['pk_encounter']}
                title = _('Encounter: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['l10n_type'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

Ancestors

Instance variables

var episodes

exclude: list of episode PKs to exclude

Expand source code
def get_episodes(self, exclude=None):
        """exclude: list of episode PKs to exclude"""
        args = {'enc': self.pk_obj}
        extra_where_parts = ''
        if exclude is not None:
                extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                args['excluded_pks'] = exclude
        SQL = """
                SELECT * FROM clin.v_pat_episodes
                WHERE pk_episode IN (
                                SELECT DISTINCT fk_episode
                                FROM clin.clin_root_item
                                WHERE fk_encounter = %%(enc)s

                                        UNION

                                SELECT DISTINCT fk_episode
                                FROM blobs.doc_med
                                WHERE fk_encounter = %%(enc)s
                ) %s""" % extra_where_parts
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        from Gnumed.business.gmEpisode import cEpisode
        return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]
var formatted_revision_history
Expand source code
def _get_formatted_revision_history(self):
        cmd = """SELECT
                        '<N/A>'::TEXT as audit__action_applied,
                        NULL AS audit__action_when,
                        '<N/A>'::TEXT AS audit__action_by,
                        pk_audit,
                        row_version,
                        modified_when,
                        modified_by,
                        pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                FROM clin.encounter
                WHERE pk = %(pk_encounter)s
        UNION ALL (
                SELECT
                        audit_action as audit__action_applied,
                        audit_when as audit__action_when,
                        audit_by as audit__action_by,
                        pk_audit,
                        orig_version as row_version,
                        orig_when as modified_when,
                        orig_by as modified_by,
                        pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                FROM audit.log_encounter
                WHERE pk = %(pk_encounter)s
        )
        ORDER BY row_version DESC
        """
        args = {'pk_encounter': self._payload['pk_encounter']}
        title = _('Encounter: %s%s%s') % (
                gmTools.u_left_double_angle_quote,
                self._payload['l10n_type'],
                gmTools.u_right_double_angle_quote
        )
        return '\n'.join(self._get_revision_history(cmd, args, title))
var generic_codes_aoe
Expand source code
def _get_generic_codes_aoe(self):
        if len(self._payload['pk_generic_codes_aoe']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes_aoe']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
var generic_codes_rfe
Expand source code
def _get_generic_codes_rfe(self):
        if len(self._payload['pk_generic_codes_rfe']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes_rfe']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
var org_unit
Expand source code
def _get_org_unit(self):
        if self._payload['pk_org_unit'] is None:
                return None
        return gmOrganization.cOrgUnit(aPK_obj = self._payload['pk_org_unit'])
var praxis_branch
Expand source code
def _get_praxis_branch(self):
        if self._payload['pk_org_unit'] is None:
                return None
        return gmPraxis.get_praxis_branch_by_org_unit(pk_org_unit = self._payload['pk_org_unit'])

Methods

def add_code(self, pk_code=None, field=None)

must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

Expand source code
def add_code(self, pk_code=None, field=None):
        """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        if field == 'rfe':
                cmd = "INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
        elif field == 'aoe':
                cmd = "INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
        else:
                raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
        args = {
                'item': self._payload['pk_encounter'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True
def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False)

Format an encounter.

Args

episode
which episodes, touched upon by this encounter, to include information for

with_co_encountlet_hints: - whether to include which other episodes were discussed during this encounter - (only makes sense if episodes is not None, since that would preclude information)

Expand source code
def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False):
        """Format an encounter.

        Args:
                episode: which episodes, touched upon by this encounter, to include information for
                with_co_encountlet_hints:
                        - whether to include which *other* episodes were discussed during this encounter
                        - (only makes sense if episodes is not None, since that would preclude information)
        """
        lines = self.format_header (
                fancy_header = fancy_header,
                left_margin = left_margin,
                with_rfe_aoe = with_rfe_aoe
        )

        if patient is None:
                _log.debug('no patient, cannot load patient related data')
                with_soap = False
                with_tests = False
                with_vaccinations = False
                with_docs = False

        if by_episode:
                lines.extend(self.format_by_episode (
                        episodes = episodes,
                        issues = issues,
                        left_margin = left_margin,
                        patient = patient,
                        with_soap = with_soap,
                        with_tests = with_tests,
                        with_docs = with_docs,
                        with_vaccinations = with_vaccinations,
                        with_family_history = with_family_history
                ))
        else:
                if with_soap:
                        lines.append('')
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_encounter'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        emr = patient.emr
                        lines.extend(self.format_soap (
                                episodes = episodes,
                                left_margin = left_margin,
                                soap_cats = None,               # meaning: all
                                emr = emr,
                                issues = issues
                        ))

#               # family history
#               if with_family_history:
#                       if episodes is not None:
#                               fhx = emr.get_family_history(episodes = episodes)
#                               if len(fhx) > 0:
#                                       lines.append(u'')
#                                       lines.append(_('Family History: %s') % len(fhx))
#                               for f in fhx:
#                                       lines.append(f.format (
#                                               left_margin = (left_margin + 1),
#                                               include_episode = False,
#                                               include_comment = True
#                                       ))
#                               del fhx

                # test results
                if with_tests:
                        emr = patient.emr
                        tests = emr.get_test_results_by_date (
                                episodes = episodes,
                                encounter = self._payload['pk_encounter']
                        )
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))
                        for t in tests:
                                lines.append(t.format())
                        del tests

                # vaccinations
                if with_vaccinations:
                        emr = patient.emr
                        vaccs = emr.get_vaccinations (
                                episodes = episodes,
                                encounters = [ self._payload['pk_encounter'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                # documents
                if with_docs:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = episodes,
                                encounter = self._payload['pk_encounter']
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents:'))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

                # co-encountlets
                if with_co_encountlet_hints:
                        if episodes is not None:
                                other_epis = self.get_episodes(exclude = episodes)
                                if len(other_epis) > 0:
                                        lines.append('')
                                        lines.append(_('%s other episodes touched upon during this encounter:') % len(other_epis))
                                for epi in other_epis:
                                        lines.append(' %s%s%s%s' % (
                                                gmTools.u_left_double_angle_quote,
                                                epi['description'],
                                                gmTools.u_right_double_angle_quote,
                                                gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                                        ))

        if return_list:
                return lines

        eol_w_margin = '\n%s' % (' ' * left_margin)
        return '%s\n' % eol_w_margin.join(lines)
def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True)
Expand source code
def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True):

        if patient is not None:
                emr = patient.emr

        lines = []
        if episodes is None:
                episodes = [ e['pk_episode'] for e in self.episodes ]

        from Gnumed.business.gmEpisode import cEpisode
        for pk in episodes:
                epi = cEpisode(aPK_obj = pk)
                lines.append(_('\nEpisode %s%s%s%s:') % (
                        gmTools.u_left_double_angle_quote,
                        epi['description'],
                        gmTools.u_right_double_angle_quote,
                        gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                ))

                # soap
                if with_soap:
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_encounter'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        lines.extend(self.format_soap (
                                episodes = [pk],
                                left_margin = left_margin,
                                soap_cats = None,               # meaning: all
                                emr = emr,
                                issues = issues
                        ))

                # test results
                if with_tests:
                        tests = emr.get_test_results_by_date (
                                episodes = [pk],
                                encounter = self._payload['pk_encounter']
                        )
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))

                        for t in tests:
                                lines.append(t.format())

                        del tests

                # vaccinations
                if with_vaccinations:
                        vaccs = emr.get_vaccinations (
                                episodes = [pk],
                                encounters = [ self._payload['pk_encounter'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                # family history
                if with_family_history:
                        fhx = emr.get_family_history(episodes = [pk])
                        if len(fhx) > 0:
                                lines.append('')
                                lines.append(_('Family History: %s') % len(fhx))
                        for f in fhx:
                                lines.append(f.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_comment = True
                                ))
                        del fhx

                # documents
                if with_docs:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = [pk],
                                encounter = self._payload['pk_encounter']
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents:'))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

        return lines
def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False)
Expand source code
def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False):
        lines = []

        if fancy_header:
                return self.__format_header_fancy(left_margin = left_margin)

        now = gmDateTime.pydt_now_here()
        if now.strftime('%Y-%m-%d') == self._payload['started_original_tz'].strftime('%Y-%m-%d'):
                start = '%s %s' % (
                        _('today'),
                        self._payload['started_original_tz'].strftime('%H:%M')
                )
        else:
                start = self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M')
        lines.append('%s%s: %s - %s%s%s' % (
                ' ' * left_margin,
                self._payload['l10n_type'],
                start,
                self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                gmTools.coalesce(self._payload['assessment_of_encounter'], '', ' \u00BB%s\u00AB'),
                gmTools.coalesce(self._payload['praxis_branch'], '', ' @%s')
        ))
        if with_rfe_aoe:
                if self._payload['reason_for_encounter'] is not None:
                        lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                codes = self.generic_codes_rfe
                for c in codes:
                        lines.append(' %s: %s (%s - %s)' % (
                                c['code'],
                                c['term'],
                                c['name_short'],
                                c['version']
                        ))
                if len(codes) > 0:
                        lines.append('')
                if self._payload['assessment_of_encounter'] is not None:
                        lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                codes = self.generic_codes_aoe
                if len(codes) > 0:
                        lines.append('')
                for c in codes:
                        lines.append(' %s: %s (%s - %s)' % (
                                c['code'],
                                c['term'],
                                c['name_short'],
                                c['version']
                        ))
                if len(codes) > 0:
                        lines.append('')
                del codes

        return lines
def format_latex(self, date_format=None, soap_cats=None, soap_order=None)
Expand source code
def format_latex(self, date_format=None, soap_cats=None, soap_order=None):

        nothing2format = (
                (self._payload['reason_for_encounter'] is None)
                        and
                (self._payload['assessment_of_encounter'] is None)
                        and
                (self.has_soap_narrative(soap_cats = 'soapu') is False)
        )
        if nothing2format:
                return ''

        if date_format is None:
                date_format = '%A, %b %d %Y'

        tex =  '% -------------------------------------------------------------\n'
        tex += '% much recommended: \\usepackage(tabu)\n'
        tex += '% much recommended: \\usepackage(longtable)\n'
        tex += '% best wrapped in: "\\begin{longtabu} to \\textwidth {lX[,L]}"\n'
        tex += '% -------------------------------------------------------------\n'
        tex += '\\hline \n'
        tex += '\\multicolumn{2}{l}{%s: %s ({\\footnotesize %s - %s})} \\tabularnewline \n' % (
                gmTools.tex_escape_string(self._payload['l10n_type']),
                gmTools.tex_escape_string(self._payload['started'].strftime(date_format)),
                gmTools.tex_escape_string(self._payload['started'].strftime('%H:%M')),
                gmTools.tex_escape_string(self._payload['last_affirmed'].strftime('%H:%M'))
        )
        tex += '\\hline \n'

        if self._payload['reason_for_encounter'] is not None:
                tex += '%s & %s \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('RFE')),
                        gmTools.tex_escape_string(self._payload['reason_for_encounter'])
                )
        if self._payload['assessment_of_encounter'] is not None:
                tex += '%s & %s \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('AOE')),
                        gmTools.tex_escape_string(self._payload['assessment_of_encounter'])
                )

        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        for epi in self.get_episodes():
                soaps = epi.get_narrative(soap_cats = soap_cats, encounters = [self.pk_obj], order_by = soap_order)
                if len(soaps) == 0:
                        continue
                tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('Problem')),
                        gmTools.tex_escape_string(epi['description']),
                        gmTools.tex_escape_string (
                                gmTools.coalesce (
                                        value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification']),
                                        return_instead = '',
                                        template4value = ' {\\footnotesize [%s]}',
                                        none_equivalents = [None, '']
                                )
                        )
                )
                if epi['pk_health_issue'] is not None:
                        tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('Health issue')),
                                gmTools.tex_escape_string(epi['health_issue']),
                                gmTools.tex_escape_string (
                                        gmTools.coalesce (
                                                value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification_issue']),
                                                return_instead = '',
                                                template4value = ' {\\footnotesize [%s]}',
                                                none_equivalents = [None, '']
                                        )
                                )
                        )
                for soap in soaps:
                        tex += '{\\small %s} & {\\small %s} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(gmSoapDefs.soap_cat2l10n[soap['soap_cat']]),
                                gmTools.tex_escape_string(soap['narrative'], replace_eol = True)
                        )
                tex += ' & \\tabularnewline \n'

        return tex
def format_maximum_information(self, patient=None)
Expand source code
def format_maximum_information(self, patient=None):
        if patient is None:
                from Gnumed.business.gmPerson import gmCurrentPatient, cPerson
                if self._payload['pk_patient'] == gmCurrentPatient().ID:
                        patient = gmCurrentPatient()
                else:
                        patient = cPerson(self._payload['pk_patient'])

        return self.format (
                patient = patient,
                fancy_header = True,
                with_rfe_aoe = True,
                with_soap = True,
                with_docs = True,
                with_tests = False,
                with_vaccinations = True,
                with_co_encountlet_hints = True,
                with_family_history = True,
                by_episode = False,
                return_list = True
        )
def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None)
Expand source code
def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None):

        lines = []
        for soap_cat in gmSoapDefs.soap_cats_str2list(soap_cats):
                soap_cat_narratives = emr.get_clin_narrative (
                        episodes = episodes,
                        issues = issues,
                        encounters = [self._payload['pk_encounter']],
                        soap_cats = [soap_cat]
                )
                if soap_cat_narratives is None:
                        continue
                if len(soap_cat_narratives) == 0:
                        continue

                lines.append('%s%s %s %s' % (
                        gmTools.u_box_top_left_arc,
                        gmTools.u_box_horiz_single,
                        gmSoapDefs.soap_cat2l10n_str[soap_cat],
                        gmTools.u_box_horiz_single * 5
                ))
                for soap_entry in soap_cat_narratives:
                        txt = gmTools.wrap (
                                text = soap_entry['narrative'],
                                width = 75,
                                initial_indent = '',
                                subsequent_indent = (' ' * left_margin)
                        )
                        lines.append(txt)
                        txt = '%s%s %.8s, %s %s' % (
                                ' ' * 40,
                                gmTools.u_box_horiz_light_heavy,
                                soap_entry['modified_by'],
                                soap_entry['date'].strftime('%Y-%m-%d %H:%M'),
                                gmTools.u_box_horiz_heavy_light
                        )
                        lines.append(txt)
                        lines.append('')

        return lines
def get_episodes(self, exclude=None)

exclude: list of episode PKs to exclude

Expand source code
def get_episodes(self, exclude=None):
        """exclude: list of episode PKs to exclude"""
        args = {'enc': self.pk_obj}
        extra_where_parts = ''
        if exclude is not None:
                extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                args['excluded_pks'] = exclude
        SQL = """
                SELECT * FROM clin.v_pat_episodes
                WHERE pk_episode IN (
                                SELECT DISTINCT fk_episode
                                FROM clin.clin_root_item
                                WHERE fk_encounter = %%(enc)s

                                        UNION

                                SELECT DISTINCT fk_episode
                                FROM blobs.doc_med
                                WHERE fk_encounter = %%(enc)s
                ) %s""" % extra_where_parts
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        from Gnumed.business.gmEpisode import cEpisode
        return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]
def get_latest_soap(self, soap_cat=None, episode=None)
Expand source code
def get_latest_soap(self, soap_cat=None, episode=None):

        if soap_cat is not None:
                soap_cat = soap_cat.casefold()
        if episode is None:
                epi_part = 'fk_episode is null'
        else:
                epi_part = 'fk_episode = %(epi)s'
        SQL = """
                SELECT narrative
                FROM clin.clin_narrative
                where
                        fk_encounter = %%(enc)s
                                and
                        soap_cat = %%(cat)s
                                and
                        %s
                order by clin_when desc
                limit 1""" % epi_part
        args = {'enc': self.pk_obj, 'cat': soap_cat, 'epi': episode}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        if len(rows) == 0:
                return None

        return rows[0][0]
def has_clinical_data(self)
Expand source code
def has_clinical_data(self):
        SQL = """SELECT EXISTS (
                SELECT 1 FROM clin.v_pat_items WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                        UNION ALL
                SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
        )"""
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows[0][0]
def has_documents(self)
Expand source code
def has_documents(self):
        SQL = "SELECT EXISTS (SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s)"
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries (queries = [{'sql': SQL,'args': args}])
        return rows[0][0]
def has_narrative(self)
Expand source code
def has_narrative(self):
        SQL = "SELECT EXISTS (SELECT 1 FROM clin.v_pat_items WHERE pk_patient=%(pat)s and pk_encounter=%(enc)s)"
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL,     'args': args}])
        return rows[0][0]
def has_soap_narrative(self, soap_cats=None)

soap_cats: = admin category

Expand source code
def has_soap_narrative(self, soap_cats=None):
        """soap_cats: <space> = admin category"""

        if soap_cats is None:
                soap_cats = 'soap '
        else:
                soap_cats = soap_cats.casefold()

        cats = []
        for cat in soap_cats:
                if cat in 'soapu':
                        cats.append(cat)
                        continue
                if cat == ' ':
                        cats.append(None)
        SQL = """SELECT EXISTS (
                SELECT 1 FROM clin.clin_narrative
                WHERE
                        fk_encounter = %(enc)s
                                AND
                        soap_cat = ANY(%(cats)s)
                LIMIT 1
        )"""
        args = {'enc': self._payload['pk_encounter'], 'cats': cats}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows[0][0]
def lock(self, exclusive=False, link_obj=None)
Expand source code
def lock(self, exclusive=False, link_obj=None):
        return lock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)
def remove_code(self, pk_code=None, field=None)

must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

Expand source code
def remove_code(self, pk_code=None, field=None):
        """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        if field == 'rfe':
                cmd = "DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        elif field == 'aoe':
                cmd = "DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        else:
                raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
        args = {
                'item': self._payload['pk_encounter'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True
def set_active(self)

Set the encounter as the active one.

"Setting active" means making sure the encounter row has the youngest "last_affirmed" timestamp of all encounter rows for this patient.

Expand source code
def set_active(self):
        """Set the encounter as the active one.

        "Setting active" means making sure the encounter
        row has the youngest "last_affirmed" timestamp of
        all encounter rows for this patient.
        """
        self['last_affirmed'] = gmDateTime.pydt_now_here()
        self.save()
def transfer_all_data_to_another_encounter(self, pk_target_encounter=None)
Expand source code
def transfer_all_data_to_another_encounter(self, pk_target_encounter=None):
        if pk_target_encounter == self.pk_obj:
                return True

        SQL = "SELECT clin.transfer_all_encounter_data(%(src)s, %(trg)s)"
        args = {
                'src': self.pk_obj,
                'trg': pk_target_encounter
        }
        gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        return True
def transfer_clinical_data(self, source_episode=None, target_episode=None)

Moves every element currently linked to the current encounter and the source_episode onto target_episode.

@param source_episode The episode the elements are currently linked to. @type target_episode A cEpisode intance. @param target_episode The episode the elements will be relinked to. @type target_episode A cEpisode intance.

Expand source code
def transfer_clinical_data(self, source_episode=None, target_episode=None):
        """
        Moves every element currently linked to the current encounter
        and the source_episode onto target_episode.

        @param source_episode The episode the elements are currently linked to.
        @type target_episode A cEpisode intance.
        @param target_episode The episode the elements will be relinked to.
        @type target_episode A cEpisode intance.
        """
        if source_episode['pk_episode'] == target_episode['pk_episode']:
                return True

        SQL = """
                UPDATE clin.clin_root_item
                SET fk_episode = %(trg)s
                WHERE
                        fk_encounter = %(enc)s AND
                        fk_episode = %(src)s
                """
        args = {
                'trg': target_episode['pk_episode'],
                'enc': self.pk_obj,
                'src': source_episode['pk_episode']
        }
        gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        self.refetch_payload()
        return True
def unlock(self, exclusive=False, link_obj=None)
Expand source code
def unlock(self, exclusive=False, link_obj=None):
        return unlock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

Inherited members