Module Gnumed.business.gmEncounter

GNUmed clinical encounter handling.

license: GPL v2 or later

Functions

def create_encounter(fk_patient=None, enc_type=None)
Expand source code
def create_encounter(fk_patient=None, enc_type=None):
        """Creates a new encounter for a patient.

        fk_patient - patient PK
        enc_type - type of encounter
        """
        if enc_type is None:
                enc_type = 'in surgery'
        # insert new encounter
        queries = []
        try:
                enc_type = int(enc_type)
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_type, fk_location)
                        VALUES (%(pat)s, %(typ)s, %(prax)s) RETURNING pk"""
        except ValueError:
                enc_type = enc_type
                cmd = """
                        INSERT INTO clin.encounter (fk_patient, fk_location, fk_type)
                        VALUES (
                                %(pat)s,
                                %(prax)s,
                                coalesce (
                                        (SELECT pk FROM clin.encounter_type WHERE description = %(typ)s),
                                        -- pick the first available
                                        (SELECT pk FROM clin.encounter_type limit 1)
                                )
                        ) RETURNING pk"""
        praxis = gmPraxis.gmCurrentPraxisBranch()
        args = {'pat': fk_patient, 'typ': enc_type, 'prax': praxis['pk_org_unit']}
        queries.append({'sql': cmd, 'args': args})
        rows = gmPG2.run_rw_queries(queries = queries, return_data = True)
        return cEncounter(aPK_obj = rows[0]['pk'])

Creates a new encounter for a patient.

fk_patient - patient PK enc_type - type of encounter

def create_encounter_type(description=None, l10n_description=None)
Expand source code
def create_encounter_type(description=None, l10n_description=None):
        """This will attempt to create a NEW encounter type."""

        # need a system name, so derive one if necessary
        if description is None:
                description = l10n_description

        args = {
                'desc': description,
                'l10n_desc': l10n_description
        }

        _log.debug('creating encounter type: %s, %s', description, l10n_description)

        # does it exist already ?
        cmd = "SELECT description, _(description) FROM clin.encounter_type WHERE description = %(desc)s"
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

        # yes
        if len(rows) > 0:
                # both system and l10n name are the same so all is well
                if (rows[0][0] == description) and (rows[0][1] == l10n_description):
                        _log.info('encounter type [%s] already exists with the proper translation')
                        return {'description': description, 'l10n_description': l10n_description}

                # or maybe there just wasn't a translation to
                # the current language for this type yet ?
                cmd = "SELECT EXISTS (SELECT 1 FROM i18n.translations WHERE orig = %(desc)s and lang = i18n.get_curr_lang())"
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])

                # there was, so fail
                if rows[0][0]:
                        _log.error('encounter type [%s] already exists but with another translation')
                        return None

                # else set it
                cmd = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
                rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return {'description': description, 'l10n_description': l10n_description}

        # no
        queries = [
                {'sql': "INSERT INTO clin.encounter_type (description) VALUES (%(desc)s)", 'args': args},
                {'sql': "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)", 'args': args}
        ]
        rows = gmPG2.run_rw_queries(queries = queries)

        return {'description': description, 'l10n_description': l10n_description}

This will attempt to create a NEW encounter type.

def delete_encounter(pk_encounter)
Expand source code
def delete_encounter(pk_encounter):
        """Deletes an encounter by PK.

        - attempts to obtain an exclusive lock which should
          fail if the encounter is the active encounter in
          this or any other client
        - catches DB exceptions which should mostly be related
          to clinical data already having been attached to
          the encounter thus making deletion fail
        """
        conn = gmPG2.get_connection(readonly = False)
        if not lock_encounter(pk_encounter, exclusive = True, link_obj = conn):
                _log.debug('cannot lock encounter [%s] for deletion, it seems in use', pk_encounter)
                return False

        SQL = "DELETE FROM clin.encounter WHERE pk = %(enc)s"
        args = {'enc': pk_encounter}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        except gmPG2.PG_ERROR_EXCEPTION as exc:
                _log.exception('cannot delete encounter [%s]', pk_encounter)
                gmPG2.log_pg_exception_details(exc)
                unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
                return False

        unlock_encounter(pk_encounter, exclusive = True, link_obj = conn)
        return True

Deletes an encounter by PK.

  • attempts to obtain an exclusive lock which should fail if the encounter is the active encounter in this or any other client
  • catches DB exceptions which should mostly be related to clinical data already having been attached to the encounter thus making deletion fail
def delete_encounter_type(description: str = None)
Expand source code
def delete_encounter_type(description:str=None):
        deleted = False
        SQL = "DELETE FROM clin.encounter_type WHERE description = %(desc)s"
        args = {'desc': description}
        try:
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                deleted = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.FOREIGN_KEY_VIOLATION:
                        raise

        return deleted
def get_encounter_type(description: str = None)
Expand source code
def get_encounter_type(description:str=None):
        SQL = 'SELECT * FROM clin.encounter_type WHERE description = %(desc)s'
        args = {'desc': description}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows
def get_encounter_types()
Expand source code
def get_encounter_types():
        SQL = """
                SELECT
                        _(description) AS l10n_description,
                        description
                FROM
                        clin.encounter_type
                ORDER BY
                        l10n_description
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        return rows
def get_most_commonly_used_encounter_type()
Expand source code
def get_most_commonly_used_encounter_type():
        SQL = """
                SELECT
                        COUNT(*) AS type_count,
                        fk_type
                FROM clin.encounter
                GROUP BY fk_type
                ORDER BY type_count DESC
                LIMIT 1
        """
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL}])
        if len(rows) == 0:
                return None

        return rows[0]['fk_type']
def lock_encounter(pk_encounter, exclusive=False, link_obj=None)
Expand source code
def lock_encounter(pk_encounter, exclusive=False, link_obj=None):
        """Used to protect against deletion of active encounter FROM another client."""
        return gmPG2.lock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)

Used to protect against deletion of active encounter FROM another client.

def unlock_encounter(pk_encounter, exclusive=False, link_obj=None)
Expand source code
def unlock_encounter(pk_encounter, exclusive=False, link_obj=None):
        unlocked = gmPG2.unlock_row(link_obj = link_obj, table = 'clin.encounter', pk = pk_encounter, exclusive = exclusive)
        if not unlocked:
                _log.warning('cannot unlock encounter [#%s]', pk_encounter)
                call_stack = inspect.stack()
                call_stack.reverse()
                for idx in range(1, len(call_stack)):
                        caller = call_stack[idx]
                        _log.error('%s[%s] @ [%s] in [%s]', ' ' * idx, caller[3], caller[2], caller[1])
                del call_stack
        return unlocked
def update_encounter_type(description=None, l10n_description=None)
Expand source code
def update_encounter_type(description=None, l10n_description=None):
        SQL = "SELECT i18n.upd_tx(%(desc)s, %(l10n_desc)s)"
        args = {'desc': description, 'l10n_desc': l10n_description}
        rows = gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}], return_data = True)
        success = rows[0][0]
        if not success:
                _log.warning('updating encounter type [%s] to [%s] failed', description, l10n_description)
        return {'description': description, 'l10n_description': l10n_description}

Classes

class cEncounter (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
Expand source code
class cEncounter(gmBusinessDBObject.cBusinessDBObject):
        """Represents one encounter."""

        _cmd_fetch_payload = SQL_get_encounters % 'pk_encounter = %s'
        _cmds_store_payload = [
                """UPDATE clin.encounter SET
                                started = %(started)s,
                                last_affirmed = %(last_affirmed)s,
                                fk_location = %(pk_org_unit)s,
                                fk_type = %(pk_type)s,
                                reason_for_encounter = gm.nullify_empty_string(%(reason_for_encounter)s),
                                assessment_of_encounter = gm.nullify_empty_string(%(assessment_of_encounter)s)
                        WHERE
                                pk = %(pk_encounter)s AND
                                xmin = %(xmin_encounter)s
                        """,
                # need to return all fields so we can survive in-place upgrades
                "SELECT * FROM clin.v_pat_encounters WHERE pk_encounter = %(pk_encounter)s"
        ]
        _updatable_fields = [
                'started',
                'last_affirmed',
                'pk_org_unit',
                'pk_type',
                'reason_for_encounter',
                'assessment_of_encounter'
        ]
        #--------------------------------------------------------
        def set_active(self):
                """Set the encounter as the active one.

                "Setting active" means making sure the encounter
                row has the youngest "last_affirmed" timestamp of
                all encounter rows for this patient.
                """
                self['last_affirmed'] = gmDateTime.pydt_now_here()
                self.save()

        #--------------------------------------------------------
        def lock(self, exclusive=False, link_obj=None):
                return lock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def unlock(self, exclusive=False, link_obj=None):
                return unlock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

        #--------------------------------------------------------
        def transfer_clinical_data(self, source_episode=None, target_episode=None):
                """
                Moves every element currently linked to the current encounter
                and the source_episode onto target_episode.

                @param source_episode The episode the elements are currently linked to.
                @type target_episode A cEpisode intance.
                @param target_episode The episode the elements will be relinked to.
                @type target_episode A cEpisode intance.
                """
                if source_episode['pk_episode'] == target_episode['pk_episode']:
                        return True

                SQL = """
                        UPDATE clin.clin_root_item
                        SET fk_episode = %(trg)s
                        WHERE
                                fk_encounter = %(enc)s AND
                                fk_episode = %(src)s
                        """
                args = {
                        'trg': target_episode['pk_episode'],
                        'enc': self.pk_obj,
                        'src': source_episode['pk_episode']
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def transfer_all_data_to_another_encounter(self, pk_target_encounter=None):
                if pk_target_encounter == self.pk_obj:
                        return True

                SQL = "SELECT clin.transfer_all_encounter_data(%(src)s, %(trg)s)"
                args = {
                        'src': self.pk_obj,
                        'trg': pk_target_encounter
                }
                gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
                return True

        #--------------------------------------------------------
        def same_payload(self, another_object=None):

                relevant_fields = [
                        'pk_org_unit',
                        'pk_type',
                        'pk_patient',
                        'reason_for_encounter',
                        'assessment_of_encounter'
                ]
                for field in relevant_fields:
                        if self._payload[field] != another_object[field]:
                                _log.debug('mismatch on [%s]: "%s" vs. "%s"', field, self._payload[field], another_object[field])
                                return False

                relevant_fields = [
                        'started',
                        'last_affirmed',
                ]
                for field in relevant_fields:
                        if self._payload[field] is None:
                                if another_object[field] is None:
                                        continue
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                        if another_object[field] is None:
                                return False

                        # compares at seconds granularity
                        if self._payload[field].strftime('%Y-%m-%d %H:%M:%S') != another_object[field].strftime('%Y-%m-%d %H:%M:%S'):
                                _log.debug('mismatch on [%s]: here="%s", other="%s"', field, self._payload[field], another_object[field])
                                return False

                # compare codes
                # 1) RFE
                if another_object['pk_generic_codes_rfe'] is None:
                        if self._payload['pk_generic_codes_rfe'] is not None:
                                return False
                if another_object['pk_generic_codes_rfe'] is not None:
                        if self._payload['pk_generic_codes_rfe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_rfe'] is None)
                                and
                        (self._payload['pk_generic_codes_rfe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_rfe']) != set(self._payload['pk_generic_codes_rfe']):
                                return False
                # 2) AOE
                if another_object['pk_generic_codes_aoe'] is None:
                        if self._payload['pk_generic_codes_aoe'] is not None:
                                return False
                if another_object['pk_generic_codes_aoe'] is not None:
                        if self._payload['pk_generic_codes_aoe'] is None:
                                return False
                if (
                        (another_object['pk_generic_codes_aoe'] is None)
                                and
                        (self._payload['pk_generic_codes_aoe'] is None)
                ) is False:
                        if set(another_object['pk_generic_codes_aoe']) != set(self._payload['pk_generic_codes_aoe']):
                                return False

                return True

        #--------------------------------------------------------
        def has_clinical_data(self):
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.v_pat_items WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                                UNION ALL
                        SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                )"""
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_narrative(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM clin.v_pat_items WHERE pk_patient=%(pat)s and pk_encounter=%(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL,     'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_soap_narrative(self, soap_cats=None):
                """soap_cats: <space> = admin category"""

                if soap_cats is None:
                        soap_cats = 'soap '
                else:
                        soap_cats = soap_cats.casefold()

                cats = []
                for cat in soap_cats:
                        if cat in 'soapu':
                                cats.append(cat)
                                continue
                        if cat == ' ':
                                cats.append(None)
                SQL = """SELECT EXISTS (
                        SELECT 1 FROM clin.clin_narrative
                        WHERE
                                fk_encounter = %(enc)s
                                        AND
                                soap_cat = ANY(%(cats)s)
                        LIMIT 1
                )"""
                args = {'enc': self._payload['pk_encounter'], 'cats': cats}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def has_documents(self):
                SQL = "SELECT EXISTS (SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s)"
                args = {
                        'pat': self._payload['pk_patient'],
                        'enc': self.pk_obj
                }
                rows = gmPG2.run_ro_queries (queries = [{'sql': SQL,'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def get_latest_soap(self, soap_cat=None, episode=None):

                if soap_cat is not None:
                        soap_cat = soap_cat.casefold()
                if episode is None:
                        epi_part = 'fk_episode is null'
                else:
                        epi_part = 'fk_episode = %(epi)s'
                SQL = """
                        SELECT narrative
                        FROM clin.clin_narrative
                        where
                                fk_encounter = %%(enc)s
                                        and
                                soap_cat = %%(cat)s
                                        and
                                %s
                        order by clin_when desc
                        limit 1""" % epi_part
                args = {'enc': self.pk_obj, 'cat': soap_cat, 'epi': episode}
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                if len(rows) == 0:
                        return None

                return rows[0][0]
        #--------------------------------------------------------
        def get_episodes(self, exclude=None):
                """exclude: list of episode PKs to exclude"""
                args = {'enc': self.pk_obj}
                extra_where_parts = ''
                if exclude is not None:
                        extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                        args['excluded_pks'] = exclude
                SQL = """
                        SELECT * FROM clin.v_pat_episodes
                        WHERE pk_episode IN (
                                        SELECT DISTINCT fk_episode
                                        FROM clin.clin_root_item
                                        WHERE fk_encounter = %%(enc)s

                                                UNION

                                        SELECT DISTINCT fk_episode
                                        FROM blobs.doc_med
                                        WHERE fk_encounter = %%(enc)s
                        ) %s""" % extra_where_parts
                rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
                from Gnumed.business.gmEpisode import cEpisode
                return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

        episodes = property(get_episodes)

        #--------------------------------------------------------
        def add_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                elif field == 'aoe':
                        cmd = "INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        def remove_code(self, pk_code=None, field=None):
                """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
                if field == 'rfe':
                        cmd = "DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                elif field == 'aoe':
                        cmd = "DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
                else:
                        raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
                args = {
                        'item': self._payload['pk_encounter'],
                        'code': pk_code
                }
                gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
                return True

        #--------------------------------------------------------
        # data formatting
        #--------------------------------------------------------
        def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None):

                lines = []
                for soap_cat in gmSoapDefs.soap_cats_str2list(soap_cats):
                        soap_cat_narratives = emr.get_clin_narrative (
                                episodes = episodes,
                                issues = issues,
                                encounters = [self._payload['pk_encounter']],
                                soap_cats = [soap_cat]
                        )
                        if soap_cat_narratives is None:
                                continue
                        if len(soap_cat_narratives) == 0:
                                continue

                        lines.append('%s%s %s %s' % (
                                gmTools.u_box_top_left_arc,
                                gmTools.u_box_horiz_single,
                                gmSoapDefs.soap_cat2l10n_str[soap_cat],
                                gmTools.u_box_horiz_single * 5
                        ))
                        for soap_entry in soap_cat_narratives:
                                txt = gmTools.wrap (
                                        text = soap_entry['narrative'],
                                        width = 75,
                                        initial_indent = '',
                                        subsequent_indent = (' ' * left_margin)
                                )
                                lines.append(txt)
                                txt = '%s%s %.8s, %s %s' % (
                                        ' ' * 40,
                                        gmTools.u_box_horiz_light_heavy,
                                        soap_entry['modified_by'],
                                        soap_entry['date'].strftime('%Y-%m-%d %H:%M'),
                                        gmTools.u_box_horiz_heavy_light
                                )
                                lines.append(txt)
                                lines.append('')

                return lines

        #--------------------------------------------------------
        def format_latex(self, date_format=None, soap_cats=None, soap_order=None):

                nothing2format = (
                        (self._payload['reason_for_encounter'] is None)
                                and
                        (self._payload['assessment_of_encounter'] is None)
                                and
                        (self.has_soap_narrative(soap_cats = 'soapu') is False)
                )
                if nothing2format:
                        return ''

                if date_format is None:
                        date_format = '%A, %b %d %Y'

                tex =  '% -------------------------------------------------------------\n'
                tex += '% much recommended: \\usepackage(tabu)\n'
                tex += '% much recommended: \\usepackage(longtable)\n'
                tex += '% best wrapped in: "\\begin{longtabu} to \\textwidth {lX[,L]}"\n'
                tex += '% -------------------------------------------------------------\n'
                tex += '\\hline \n'
                tex += '\\multicolumn{2}{l}{%s: %s ({\\footnotesize %s - %s})} \\tabularnewline \n' % (
                        gmTools.tex_escape_string(self._payload['l10n_type']),
                        gmTools.tex_escape_string(self._payload['started'].strftime(date_format)),
                        gmTools.tex_escape_string(self._payload['started'].strftime('%H:%M')),
                        gmTools.tex_escape_string(self._payload['last_affirmed'].strftime('%H:%M'))
                )
                tex += '\\hline \n'

                if self._payload['reason_for_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('RFE')),
                                gmTools.tex_escape_string(self._payload['reason_for_encounter'])
                        )
                if self._payload['assessment_of_encounter'] is not None:
                        tex += '%s & %s \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('AOE')),
                                gmTools.tex_escape_string(self._payload['assessment_of_encounter'])
                        )

                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                for epi in self.get_episodes():
                        soaps = epi.get_narrative(soap_cats = soap_cats, encounters = [self.pk_obj], order_by = soap_order)
                        if len(soaps) == 0:
                                continue
                        tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('Problem')),
                                gmTools.tex_escape_string(epi['description']),
                                gmTools.tex_escape_string (
                                        gmTools.coalesce (
                                                value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification']),
                                                return_instead = '',
                                                template4value = ' {\\footnotesize [%s]}',
                                                none_equivalents = [None, '']
                                        )
                                )
                        )
                        if epi['pk_health_issue'] is not None:
                                tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(_('Health issue')),
                                        gmTools.tex_escape_string(epi['health_issue']),
                                        gmTools.tex_escape_string (
                                                gmTools.coalesce (
                                                        value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification_issue']),
                                                        return_instead = '',
                                                        template4value = ' {\\footnotesize [%s]}',
                                                        none_equivalents = [None, '']
                                                )
                                        )
                                )
                        for soap in soaps:
                                tex += '{\\small %s} & {\\small %s} \\tabularnewline \n' % (
                                        gmTools.tex_escape_string(gmSoapDefs.soap_cat2l10n[soap['soap_cat']]),
                                        gmTools.tex_escape_string(soap['narrative'], replace_eol = True)
                                )
                        tex += ' & \\tabularnewline \n'

                return tex

        #--------------------------------------------------------
        def __format_header_fancy(self, left_margin=0):
                lines = []

                lines.append('%s%s: %s - %s (@%s)%s [#%s]' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        self._payload['source_time_zone'],
                        gmTools.coalesce (
                                self._payload['assessment_of_encounter'],
                                '',
                                ' %s%%s%s' % (gmTools.u_left_double_angle_quote, gmTools.u_right_double_angle_quote)
                        ),
                        self._payload['pk_encounter']
                ))

                lines.append(_('  your time: %s - %s  (@%s = %s%s)\n') % (
                        self._payload['started'].strftime('%Y-%m-%d %H:%M'),
                        self._payload['last_affirmed'].strftime('%H:%M'),
                        gmDateTime.current_local_timezone_name,
                        gmTools.bool2subst (
                                gmDateTime.dst_currently_in_effect,
                                gmDateTime.py_dst_timezone_name,
                                gmDateTime.py_timezone_name
                        ),
                        gmTools.bool2subst(gmDateTime.dst_currently_in_effect, ' - ' + _('daylight savings time in effect'), '')
                ))

                if self._payload['praxis_branch'] is not None:
                        lines.append(_('Location: %s (%s)') % (self._payload['praxis_branch'], self._payload['praxis']))

                if self._payload['reason_for_encounter'] is not None:
                        lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')

                if self._payload['assessment_of_encounter'] is not None:
                        lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes
                return lines

        #--------------------------------------------------------
        def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False):
                lines = []

                if fancy_header:
                        return self.__format_header_fancy(left_margin = left_margin)

                now = gmDateTime.pydt_now_here()
                if now.strftime('%Y-%m-%d') == self._payload['started_original_tz'].strftime('%Y-%m-%d'):
                        start = '%s %s' % (
                                _('today'),
                                self._payload['started_original_tz'].strftime('%H:%M')
                        )
                else:
                        start = self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M')
                lines.append('%s%s: %s - %s%s%s' % (
                        ' ' * left_margin,
                        self._payload['l10n_type'],
                        start,
                        self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                        gmTools.coalesce(self._payload['assessment_of_encounter'], '', ' \u00BB%s\u00AB'),
                        gmTools.coalesce(self._payload['praxis_branch'], '', ' @%s')
                ))
                if with_rfe_aoe:
                        if self._payload['reason_for_encounter'] is not None:
                                lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                        codes = self.generic_codes_rfe
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        if self._payload['assessment_of_encounter'] is not None:
                                lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                        codes = self.generic_codes_aoe
                        if len(codes) > 0:
                                lines.append('')
                        for c in codes:
                                lines.append(' %s: %s (%s - %s)' % (
                                        c['code'],
                                        c['term'],
                                        c['name_short'],
                                        c['version']
                                ))
                        if len(codes) > 0:
                                lines.append('')
                        del codes

                return lines

        #--------------------------------------------------------
        def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True):

                if patient is not None:
                        emr = patient.emr

                lines = []
                if episodes is None:
                        episodes = [ e['pk_episode'] for e in self.episodes ]

                from Gnumed.business.gmEpisode import cEpisode
                for pk in episodes:
                        epi = cEpisode(aPK_obj = pk)
                        lines.append(_('\nEpisode %s%s%s%s:') % (
                                gmTools.u_left_double_angle_quote,
                                epi['description'],
                                gmTools.u_right_double_angle_quote,
                                gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                        ))

                        # soap
                        if with_soap:
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                lines.extend(self.format_soap (
                                        episodes = [pk],
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

                        # test results
                        if with_tests:
                                tests = emr.get_test_results_by_date (
                                        episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))

                                for t in tests:
                                        lines.append(t.format())

                                del tests

                        # vaccinations
                        if with_vaccinations:
                                vaccs = emr.get_vaccinations (
                                        episodes = [pk],
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # family history
                        if with_family_history:
                                fhx = emr.get_family_history(episodes = [pk])
                                if len(fhx) > 0:
                                        lines.append('')
                                        lines.append(_('Family History: %s') % len(fhx))
                                for f in fhx:
                                        lines.append(f.format (
                                                left_margin = (left_margin + 1),
                                                include_episode = False,
                                                include_comment = True
                                        ))
                                del fhx

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = [pk],
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                return lines

        #--------------------------------------------------------
        def format_maximum_information(self, patient=None):
                if patient is None:
                        from Gnumed.business.gmPerson import gmCurrentPatient, cPerson
                        if self._payload['pk_patient'] == gmCurrentPatient().ID:
                                patient = gmCurrentPatient()
                        else:
                                patient = cPerson(self._payload['pk_patient'])

                return self.format (
                        patient = patient,
                        fancy_header = True,
                        with_rfe_aoe = True,
                        with_soap = True,
                        with_docs = True,
                        with_tests = False,
                        with_vaccinations = True,
                        with_co_encountlet_hints = True,
                        with_family_history = True,
                        by_episode = False,
                        return_list = True
                )

        #--------------------------------------------------------
        def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False):
                """Format an encounter.

                Args:
                        episode: which episodes, touched upon by this encounter, to include information for
                        with_co_encountlet_hints:
                                - whether to include which *other* episodes were discussed during this encounter
                                - (only makes sense if episodes is not None, since that would preclude information)
                """
                lines = self.format_header (
                        fancy_header = fancy_header,
                        left_margin = left_margin,
                        with_rfe_aoe = with_rfe_aoe
                )

                if patient is None:
                        _log.debug('no patient, cannot load patient related data')
                        with_soap = False
                        with_tests = False
                        with_vaccinations = False
                        with_docs = False

                if by_episode:
                        lines.extend(self.format_by_episode (
                                episodes = episodes,
                                issues = issues,
                                left_margin = left_margin,
                                patient = patient,
                                with_soap = with_soap,
                                with_tests = with_tests,
                                with_docs = with_docs,
                                with_vaccinations = with_vaccinations,
                                with_family_history = with_family_history
                        ))
                else:
                        if with_soap:
                                lines.append('')
                                if patient.ID != self._payload['pk_patient']:
                                        msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                                patient.ID,
                                                self._payload['pk_encounter'],
                                                self._payload['pk_patient']
                                        )
                                        raise ValueError(msg)
                                emr = patient.emr
                                lines.extend(self.format_soap (
                                        episodes = episodes,
                                        left_margin = left_margin,
                                        soap_cats = None,               # meaning: all
                                        emr = emr,
                                        issues = issues
                                ))

        #               # family history
        #               if with_family_history:
        #                       if episodes is not None:
        #                               fhx = emr.get_family_history(episodes = episodes)
        #                               if len(fhx) > 0:
        #                                       lines.append(u'')
        #                                       lines.append(_('Family History: %s') % len(fhx))
        #                               for f in fhx:
        #                                       lines.append(f.format (
        #                                               left_margin = (left_margin + 1),
        #                                               include_episode = False,
        #                                               include_comment = True
        #                                       ))
        #                               del fhx

                        # test results
                        if with_tests:
                                emr = patient.emr
                                tests = emr.get_test_results_by_date (
                                        episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(tests) > 0:
                                        lines.append('')
                                        lines.append(_('Measurements and Results:'))
                                for t in tests:
                                        lines.append(t.format())
                                del tests

                        # vaccinations
                        if with_vaccinations:
                                emr = patient.emr
                                vaccs = emr.get_vaccinations (
                                        episodes = episodes,
                                        encounters = [ self._payload['pk_encounter'] ],
                                        order_by = 'date_given DESC, vaccine'
                                )
                                if len(vaccs) > 0:
                                        lines.append('')
                                        lines.append(_('Vaccinations:'))
                                for vacc in vaccs:
                                        lines.extend(vacc.format (
                                                with_indications = True,
                                                with_comment = True,
                                                with_reaction = True,
                                                date_format = '%Y-%m-%d'
                                        ))
                                del vaccs

                        # documents
                        if with_docs:
                                doc_folder = patient.get_document_folder()
                                docs = doc_folder.get_documents (
                                        pk_episodes = episodes,
                                        encounter = self._payload['pk_encounter']
                                )
                                if len(docs) > 0:
                                        lines.append('')
                                        lines.append(_('Documents:'))
                                for d in docs:
                                        lines.append(' ' + d.format(single_line = True))
                                del docs

                        # co-encountlets
                        if with_co_encountlet_hints:
                                if episodes is not None:
                                        other_epis = self.get_episodes(exclude = episodes)
                                        if len(other_epis) > 0:
                                                lines.append('')
                                                lines.append(_('%s other episodes touched upon during this encounter:') % len(other_epis))
                                        for epi in other_epis:
                                                lines.append(' %s%s%s%s' % (
                                                        gmTools.u_left_double_angle_quote,
                                                        epi['description'],
                                                        gmTools.u_right_double_angle_quote,
                                                        gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                                                ))

                if return_list:
                        return lines

                eol_w_margin = '\n%s' % (' ' * left_margin)
                return '%s\n' % eol_w_margin.join(lines)

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_generic_codes_rfe(self):
                if len(self._payload['pk_generic_codes_rfe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_rfe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_rfe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_rfe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_rfe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_rfe = property(_get_generic_codes_rfe, _set_generic_codes_rfe)
        #--------------------------------------------------------
        def _get_generic_codes_aoe(self):
                if len(self._payload['pk_generic_codes_aoe']) == 0:
                        return []

                cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
                args = {'pks': self._payload['pk_generic_codes_aoe']}
                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        def _set_generic_codes_aoe(self, pk_codes):
                queries = []
                # remove all codes
                if len(self._payload['pk_generic_codes_aoe']) > 0:
                        queries.append ({
                                'sql': 'DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(enc)s AND fk_generic_code = ANY(%(codes)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'codes': self._payload['pk_generic_codes_aoe']
                                }
                        })
                # add new codes
                for pk_code in pk_codes:
                        queries.append ({
                                'sql': 'INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(enc)s, %(pk_code)s)',
                                'args': {
                                        'enc': self._payload['pk_encounter'],
                                        'pk_code': pk_code
                                }
                        })
                if len(queries) == 0:
                        return
                # run it all in one transaction
                gmPG2.run_rw_queries(queries = queries)
                self.refetch_payload()
                return

        generic_codes_aoe = property(_get_generic_codes_aoe, _set_generic_codes_aoe)
        #--------------------------------------------------------
        def _get_praxis_branch(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmPraxis.get_praxis_branch_by_org_unit(pk_org_unit = self._payload['pk_org_unit'])

        praxis_branch = property(_get_praxis_branch)
        #--------------------------------------------------------
        def _get_org_unit(self):
                if self._payload['pk_org_unit'] is None:
                        return None
                return gmOrganization.cOrgUnit(aPK_obj = self._payload['pk_org_unit'])

        org_unit = property(_get_org_unit)
        #--------------------------------------------------------
        def _get_formatted_revision_history(self):
                cmd = """SELECT
                                '<N/A>'::TEXT as audit__action_applied,
                                NULL AS audit__action_when,
                                '<N/A>'::TEXT AS audit__action_by,
                                pk_audit,
                                row_version,
                                modified_when,
                                modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM clin.encounter
                        WHERE pk = %(pk_encounter)s
                UNION ALL (
                        SELECT
                                audit_action as audit__action_applied,
                                audit_when as audit__action_when,
                                audit_by as audit__action_by,
                                pk_audit,
                                orig_version as row_version,
                                orig_when as modified_when,
                                orig_by as modified_by,
                                pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                        FROM audit.log_encounter
                        WHERE pk = %(pk_encounter)s
                )
                ORDER BY row_version DESC
                """
                args = {'pk_encounter': self._payload['pk_encounter']}
                title = _('Encounter: %s%s%s') % (
                        gmTools.u_left_double_angle_quote,
                        self._payload['l10n_type'],
                        gmTools.u_right_double_angle_quote
                )
                return '\n'.join(self._get_revision_history(cmd, args, title))

        formatted_revision_history = property(_get_formatted_revision_history)

Represents one encounter.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    

Ancestors

Instance variables

prop episodes
Expand source code
def get_episodes(self, exclude=None):
        """exclude: list of episode PKs to exclude"""
        args = {'enc': self.pk_obj}
        extra_where_parts = ''
        if exclude is not None:
                extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                args['excluded_pks'] = exclude
        SQL = """
                SELECT * FROM clin.v_pat_episodes
                WHERE pk_episode IN (
                                SELECT DISTINCT fk_episode
                                FROM clin.clin_root_item
                                WHERE fk_encounter = %%(enc)s

                                        UNION

                                SELECT DISTINCT fk_episode
                                FROM blobs.doc_med
                                WHERE fk_encounter = %%(enc)s
                ) %s""" % extra_where_parts
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        from Gnumed.business.gmEpisode import cEpisode
        return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

exclude: list of episode PKs to exclude

prop formatted_revision_history
Expand source code
def _get_formatted_revision_history(self):
        cmd = """SELECT
                        '<N/A>'::TEXT as audit__action_applied,
                        NULL AS audit__action_when,
                        '<N/A>'::TEXT AS audit__action_by,
                        pk_audit,
                        row_version,
                        modified_when,
                        modified_by,
                        pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                FROM clin.encounter
                WHERE pk = %(pk_encounter)s
        UNION ALL (
                SELECT
                        audit_action as audit__action_applied,
                        audit_when as audit__action_when,
                        audit_by as audit__action_by,
                        pk_audit,
                        orig_version as row_version,
                        orig_when as modified_when,
                        orig_by as modified_by,
                        pk, fk_patient, fk_type, fk_location, source_time_zone, reason_for_encounter, assessment_of_encounter, started, last_affirmed
                FROM audit.log_encounter
                WHERE pk = %(pk_encounter)s
        )
        ORDER BY row_version DESC
        """
        args = {'pk_encounter': self._payload['pk_encounter']}
        title = _('Encounter: %s%s%s') % (
                gmTools.u_left_double_angle_quote,
                self._payload['l10n_type'],
                gmTools.u_right_double_angle_quote
        )
        return '\n'.join(self._get_revision_history(cmd, args, title))
prop generic_codes_aoe
Expand source code
def _get_generic_codes_aoe(self):
        if len(self._payload['pk_generic_codes_aoe']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes_aoe']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
prop generic_codes_rfe
Expand source code
def _get_generic_codes_rfe(self):
        if len(self._payload['pk_generic_codes_rfe']) == 0:
                return []

        cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
        args = {'pks': self._payload['pk_generic_codes_rfe']}
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
prop org_unit
Expand source code
def _get_org_unit(self):
        if self._payload['pk_org_unit'] is None:
                return None
        return gmOrganization.cOrgUnit(aPK_obj = self._payload['pk_org_unit'])
prop praxis_branch
Expand source code
def _get_praxis_branch(self):
        if self._payload['pk_org_unit'] is None:
                return None
        return gmPraxis.get_praxis_branch_by_org_unit(pk_org_unit = self._payload['pk_org_unit'])

Methods

def add_code(self, pk_code=None, field=None)
Expand source code
def add_code(self, pk_code=None, field=None):
        """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        if field == 'rfe':
                cmd = "INSERT INTO clin.lnk_code2rfe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
        elif field == 'aoe':
                cmd = "INSERT INTO clin.lnk_code2aoe (fk_item, fk_generic_code) VALUES (%(item)s, %(code)s)"
        else:
                raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
        args = {
                'item': self._payload['pk_encounter'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True

must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

def format(self,
episodes=None,
with_soap=False,
left_margin=0,
patient=None,
issues=None,
with_docs=True,
with_tests=True,
fancy_header=True,
with_vaccinations=True,
with_co_encountlet_hints=False,
with_rfe_aoe=False,
with_family_history=True,
by_episode=False,
return_list=False)
Expand source code
def format(self, episodes=None, with_soap=False, left_margin=0, patient=None, issues=None, with_docs=True, with_tests=True, fancy_header=True, with_vaccinations=True, with_co_encountlet_hints=False, with_rfe_aoe=False, with_family_history=True, by_episode=False, return_list=False):
        """Format an encounter.

        Args:
                episode: which episodes, touched upon by this encounter, to include information for
                with_co_encountlet_hints:
                        - whether to include which *other* episodes were discussed during this encounter
                        - (only makes sense if episodes is not None, since that would preclude information)
        """
        lines = self.format_header (
                fancy_header = fancy_header,
                left_margin = left_margin,
                with_rfe_aoe = with_rfe_aoe
        )

        if patient is None:
                _log.debug('no patient, cannot load patient related data')
                with_soap = False
                with_tests = False
                with_vaccinations = False
                with_docs = False

        if by_episode:
                lines.extend(self.format_by_episode (
                        episodes = episodes,
                        issues = issues,
                        left_margin = left_margin,
                        patient = patient,
                        with_soap = with_soap,
                        with_tests = with_tests,
                        with_docs = with_docs,
                        with_vaccinations = with_vaccinations,
                        with_family_history = with_family_history
                ))
        else:
                if with_soap:
                        lines.append('')
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_encounter'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        emr = patient.emr
                        lines.extend(self.format_soap (
                                episodes = episodes,
                                left_margin = left_margin,
                                soap_cats = None,               # meaning: all
                                emr = emr,
                                issues = issues
                        ))

#               # family history
#               if with_family_history:
#                       if episodes is not None:
#                               fhx = emr.get_family_history(episodes = episodes)
#                               if len(fhx) > 0:
#                                       lines.append(u'')
#                                       lines.append(_('Family History: %s') % len(fhx))
#                               for f in fhx:
#                                       lines.append(f.format (
#                                               left_margin = (left_margin + 1),
#                                               include_episode = False,
#                                               include_comment = True
#                                       ))
#                               del fhx

                # test results
                if with_tests:
                        emr = patient.emr
                        tests = emr.get_test_results_by_date (
                                episodes = episodes,
                                encounter = self._payload['pk_encounter']
                        )
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))
                        for t in tests:
                                lines.append(t.format())
                        del tests

                # vaccinations
                if with_vaccinations:
                        emr = patient.emr
                        vaccs = emr.get_vaccinations (
                                episodes = episodes,
                                encounters = [ self._payload['pk_encounter'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                # documents
                if with_docs:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = episodes,
                                encounter = self._payload['pk_encounter']
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents:'))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

                # co-encountlets
                if with_co_encountlet_hints:
                        if episodes is not None:
                                other_epis = self.get_episodes(exclude = episodes)
                                if len(other_epis) > 0:
                                        lines.append('')
                                        lines.append(_('%s other episodes touched upon during this encounter:') % len(other_epis))
                                for epi in other_epis:
                                        lines.append(' %s%s%s%s' % (
                                                gmTools.u_left_double_angle_quote,
                                                epi['description'],
                                                gmTools.u_right_double_angle_quote,
                                                gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                                        ))

        if return_list:
                return lines

        eol_w_margin = '\n%s' % (' ' * left_margin)
        return '%s\n' % eol_w_margin.join(lines)

Format an encounter.

Args

episode
which episodes, touched upon by this encounter, to include information for

with_co_encountlet_hints: - whether to include which other episodes were discussed during this encounter - (only makes sense if episodes is not None, since that would preclude information)

def format_by_episode(self,
episodes=None,
issues=None,
left_margin=0,
patient=None,
with_soap=False,
with_tests=True,
with_docs=True,
with_vaccinations=True,
with_family_history=True)
Expand source code
def format_by_episode(self, episodes=None, issues=None, left_margin=0, patient=None, with_soap=False, with_tests=True, with_docs=True, with_vaccinations=True, with_family_history=True):

        if patient is not None:
                emr = patient.emr

        lines = []
        if episodes is None:
                episodes = [ e['pk_episode'] for e in self.episodes ]

        from Gnumed.business.gmEpisode import cEpisode
        for pk in episodes:
                epi = cEpisode(aPK_obj = pk)
                lines.append(_('\nEpisode %s%s%s%s:') % (
                        gmTools.u_left_double_angle_quote,
                        epi['description'],
                        gmTools.u_right_double_angle_quote,
                        gmTools.coalesce(epi['health_issue'], '', ' (%s)')
                ))

                # soap
                if with_soap:
                        if patient.ID != self._payload['pk_patient']:
                                msg = '<patient>.ID = %s but encounter %s belongs to patient %s' % (
                                        patient.ID,
                                        self._payload['pk_encounter'],
                                        self._payload['pk_patient']
                                )
                                raise ValueError(msg)
                        lines.extend(self.format_soap (
                                episodes = [pk],
                                left_margin = left_margin,
                                soap_cats = None,               # meaning: all
                                emr = emr,
                                issues = issues
                        ))

                # test results
                if with_tests:
                        tests = emr.get_test_results_by_date (
                                episodes = [pk],
                                encounter = self._payload['pk_encounter']
                        )
                        if len(tests) > 0:
                                lines.append('')
                                lines.append(_('Measurements and Results:'))

                        for t in tests:
                                lines.append(t.format())

                        del tests

                # vaccinations
                if with_vaccinations:
                        vaccs = emr.get_vaccinations (
                                episodes = [pk],
                                encounters = [ self._payload['pk_encounter'] ],
                                order_by = 'date_given DESC, vaccine'
                        )
                        if len(vaccs) > 0:
                                lines.append('')
                                lines.append(_('Vaccinations:'))
                        for vacc in vaccs:
                                lines.extend(vacc.format (
                                        with_indications = True,
                                        with_comment = True,
                                        with_reaction = True,
                                        date_format = '%Y-%m-%d'
                                ))
                        del vaccs

                # family history
                if with_family_history:
                        fhx = emr.get_family_history(episodes = [pk])
                        if len(fhx) > 0:
                                lines.append('')
                                lines.append(_('Family History: %s') % len(fhx))
                        for f in fhx:
                                lines.append(f.format (
                                        left_margin = (left_margin + 1),
                                        include_episode = False,
                                        include_comment = True
                                ))
                        del fhx

                # documents
                if with_docs:
                        doc_folder = patient.get_document_folder()
                        docs = doc_folder.get_documents (
                                pk_episodes = [pk],
                                encounter = self._payload['pk_encounter']
                        )
                        if len(docs) > 0:
                                lines.append('')
                                lines.append(_('Documents:'))
                        for d in docs:
                                lines.append(' ' + d.format(single_line = True))
                        del docs

        return lines
def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False)
Expand source code
def format_header(self, fancy_header=True, left_margin=0, with_rfe_aoe=False):
        lines = []

        if fancy_header:
                return self.__format_header_fancy(left_margin = left_margin)

        now = gmDateTime.pydt_now_here()
        if now.strftime('%Y-%m-%d') == self._payload['started_original_tz'].strftime('%Y-%m-%d'):
                start = '%s %s' % (
                        _('today'),
                        self._payload['started_original_tz'].strftime('%H:%M')
                )
        else:
                start = self._payload['started_original_tz'].strftime('%Y-%m-%d %H:%M')
        lines.append('%s%s: %s - %s%s%s' % (
                ' ' * left_margin,
                self._payload['l10n_type'],
                start,
                self._payload['last_affirmed_original_tz'].strftime('%H:%M'),
                gmTools.coalesce(self._payload['assessment_of_encounter'], '', ' \u00BB%s\u00AB'),
                gmTools.coalesce(self._payload['praxis_branch'], '', ' @%s')
        ))
        if with_rfe_aoe:
                if self._payload['reason_for_encounter'] is not None:
                        lines.append('%s: %s' % (_('RFE'), self._payload['reason_for_encounter']))
                codes = self.generic_codes_rfe
                for c in codes:
                        lines.append(' %s: %s (%s - %s)' % (
                                c['code'],
                                c['term'],
                                c['name_short'],
                                c['version']
                        ))
                if len(codes) > 0:
                        lines.append('')
                if self._payload['assessment_of_encounter'] is not None:
                        lines.append('%s: %s' % (_('AOE'), self._payload['assessment_of_encounter']))
                codes = self.generic_codes_aoe
                if len(codes) > 0:
                        lines.append('')
                for c in codes:
                        lines.append(' %s: %s (%s - %s)' % (
                                c['code'],
                                c['term'],
                                c['name_short'],
                                c['version']
                        ))
                if len(codes) > 0:
                        lines.append('')
                del codes

        return lines
def format_latex(self, date_format=None, soap_cats=None, soap_order=None)
Expand source code
def format_latex(self, date_format=None, soap_cats=None, soap_order=None):

        nothing2format = (
                (self._payload['reason_for_encounter'] is None)
                        and
                (self._payload['assessment_of_encounter'] is None)
                        and
                (self.has_soap_narrative(soap_cats = 'soapu') is False)
        )
        if nothing2format:
                return ''

        if date_format is None:
                date_format = '%A, %b %d %Y'

        tex =  '% -------------------------------------------------------------\n'
        tex += '% much recommended: \\usepackage(tabu)\n'
        tex += '% much recommended: \\usepackage(longtable)\n'
        tex += '% best wrapped in: "\\begin{longtabu} to \\textwidth {lX[,L]}"\n'
        tex += '% -------------------------------------------------------------\n'
        tex += '\\hline \n'
        tex += '\\multicolumn{2}{l}{%s: %s ({\\footnotesize %s - %s})} \\tabularnewline \n' % (
                gmTools.tex_escape_string(self._payload['l10n_type']),
                gmTools.tex_escape_string(self._payload['started'].strftime(date_format)),
                gmTools.tex_escape_string(self._payload['started'].strftime('%H:%M')),
                gmTools.tex_escape_string(self._payload['last_affirmed'].strftime('%H:%M'))
        )
        tex += '\\hline \n'

        if self._payload['reason_for_encounter'] is not None:
                tex += '%s & %s \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('RFE')),
                        gmTools.tex_escape_string(self._payload['reason_for_encounter'])
                )
        if self._payload['assessment_of_encounter'] is not None:
                tex += '%s & %s \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('AOE')),
                        gmTools.tex_escape_string(self._payload['assessment_of_encounter'])
                )

        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        for epi in self.get_episodes():
                soaps = epi.get_narrative(soap_cats = soap_cats, encounters = [self.pk_obj], order_by = soap_order)
                if len(soaps) == 0:
                        continue
                tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                        gmTools.tex_escape_string(_('Problem')),
                        gmTools.tex_escape_string(epi['description']),
                        gmTools.tex_escape_string (
                                gmTools.coalesce (
                                        value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification']),
                                        return_instead = '',
                                        template4value = ' {\\footnotesize [%s]}',
                                        none_equivalents = [None, '']
                                )
                        )
                )
                if epi['pk_health_issue'] is not None:
                        tex += '\\multicolumn{2}{l}{\\emph{%s: %s%s}} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(_('Health issue')),
                                gmTools.tex_escape_string(epi['health_issue']),
                                gmTools.tex_escape_string (
                                        gmTools.coalesce (
                                                value2test = diagnostic_certainty_classification2str(epi['diagnostic_certainty_classification_issue']),
                                                return_instead = '',
                                                template4value = ' {\\footnotesize [%s]}',
                                                none_equivalents = [None, '']
                                        )
                                )
                        )
                for soap in soaps:
                        tex += '{\\small %s} & {\\small %s} \\tabularnewline \n' % (
                                gmTools.tex_escape_string(gmSoapDefs.soap_cat2l10n[soap['soap_cat']]),
                                gmTools.tex_escape_string(soap['narrative'], replace_eol = True)
                        )
                tex += ' & \\tabularnewline \n'

        return tex
def format_maximum_information(self, patient=None)
Expand source code
def format_maximum_information(self, patient=None):
        if patient is None:
                from Gnumed.business.gmPerson import gmCurrentPatient, cPerson
                if self._payload['pk_patient'] == gmCurrentPatient().ID:
                        patient = gmCurrentPatient()
                else:
                        patient = cPerson(self._payload['pk_patient'])

        return self.format (
                patient = patient,
                fancy_header = True,
                with_rfe_aoe = True,
                with_soap = True,
                with_docs = True,
                with_tests = False,
                with_vaccinations = True,
                with_co_encountlet_hints = True,
                with_family_history = True,
                by_episode = False,
                return_list = True
        )
def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None)
Expand source code
def format_soap(self, episodes=None, left_margin=0, soap_cats='soapu', emr=None, issues=None):

        lines = []
        for soap_cat in gmSoapDefs.soap_cats_str2list(soap_cats):
                soap_cat_narratives = emr.get_clin_narrative (
                        episodes = episodes,
                        issues = issues,
                        encounters = [self._payload['pk_encounter']],
                        soap_cats = [soap_cat]
                )
                if soap_cat_narratives is None:
                        continue
                if len(soap_cat_narratives) == 0:
                        continue

                lines.append('%s%s %s %s' % (
                        gmTools.u_box_top_left_arc,
                        gmTools.u_box_horiz_single,
                        gmSoapDefs.soap_cat2l10n_str[soap_cat],
                        gmTools.u_box_horiz_single * 5
                ))
                for soap_entry in soap_cat_narratives:
                        txt = gmTools.wrap (
                                text = soap_entry['narrative'],
                                width = 75,
                                initial_indent = '',
                                subsequent_indent = (' ' * left_margin)
                        )
                        lines.append(txt)
                        txt = '%s%s %.8s, %s %s' % (
                                ' ' * 40,
                                gmTools.u_box_horiz_light_heavy,
                                soap_entry['modified_by'],
                                soap_entry['date'].strftime('%Y-%m-%d %H:%M'),
                                gmTools.u_box_horiz_heavy_light
                        )
                        lines.append(txt)
                        lines.append('')

        return lines
def get_episodes(self, exclude=None)
Expand source code
def get_episodes(self, exclude=None):
        """exclude: list of episode PKs to exclude"""
        args = {'enc': self.pk_obj}
        extra_where_parts = ''
        if exclude is not None:
                extra_where_parts = 'AND pk_episode <> ALL(%(excluded_pks)s)'
                args['excluded_pks'] = exclude
        SQL = """
                SELECT * FROM clin.v_pat_episodes
                WHERE pk_episode IN (
                                SELECT DISTINCT fk_episode
                                FROM clin.clin_root_item
                                WHERE fk_encounter = %%(enc)s

                                        UNION

                                SELECT DISTINCT fk_episode
                                FROM blobs.doc_med
                                WHERE fk_encounter = %%(enc)s
                ) %s""" % extra_where_parts
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        from Gnumed.business.gmEpisode import cEpisode
        return [ cEpisode(row = {'data': r, 'pk_field': 'pk_episode'})  for r in rows ]

exclude: list of episode PKs to exclude

def get_latest_soap(self, soap_cat=None, episode=None)
Expand source code
def get_latest_soap(self, soap_cat=None, episode=None):

        if soap_cat is not None:
                soap_cat = soap_cat.casefold()
        if episode is None:
                epi_part = 'fk_episode is null'
        else:
                epi_part = 'fk_episode = %(epi)s'
        SQL = """
                SELECT narrative
                FROM clin.clin_narrative
                where
                        fk_encounter = %%(enc)s
                                and
                        soap_cat = %%(cat)s
                                and
                        %s
                order by clin_when desc
                limit 1""" % epi_part
        args = {'enc': self.pk_obj, 'cat': soap_cat, 'epi': episode}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        if len(rows) == 0:
                return None

        return rows[0][0]
def has_clinical_data(self)
Expand source code
def has_clinical_data(self):
        SQL = """SELECT EXISTS (
                SELECT 1 FROM clin.v_pat_items WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
                        UNION ALL
                SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s
        )"""
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows[0][0]
def has_documents(self)
Expand source code
def has_documents(self):
        SQL = "SELECT EXISTS (SELECT 1 FROM blobs.v_doc_med WHERE pk_patient = %(pat)s and pk_encounter = %(enc)s)"
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries (queries = [{'sql': SQL,'args': args}])
        return rows[0][0]
def has_narrative(self)
Expand source code
def has_narrative(self):
        SQL = "SELECT EXISTS (SELECT 1 FROM clin.v_pat_items WHERE pk_patient=%(pat)s and pk_encounter=%(enc)s)"
        args = {
                'pat': self._payload['pk_patient'],
                'enc': self.pk_obj
        }
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL,     'args': args}])
        return rows[0][0]
def has_soap_narrative(self, soap_cats=None)
Expand source code
def has_soap_narrative(self, soap_cats=None):
        """soap_cats: <space> = admin category"""

        if soap_cats is None:
                soap_cats = 'soap '
        else:
                soap_cats = soap_cats.casefold()

        cats = []
        for cat in soap_cats:
                if cat in 'soapu':
                        cats.append(cat)
                        continue
                if cat == ' ':
                        cats.append(None)
        SQL = """SELECT EXISTS (
                SELECT 1 FROM clin.clin_narrative
                WHERE
                        fk_encounter = %(enc)s
                                AND
                        soap_cat = ANY(%(cats)s)
                LIMIT 1
        )"""
        args = {'enc': self._payload['pk_encounter'], 'cats': cats}
        rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
        return rows[0][0]

soap_cats: = admin category

def lock(self, exclusive=False, link_obj=None)
Expand source code
def lock(self, exclusive=False, link_obj=None):
        return lock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)
def remove_code(self, pk_code=None, field=None)
Expand source code
def remove_code(self, pk_code=None, field=None):
        """<pk_code> must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
        if field == 'rfe':
                cmd = "DELETE FROM clin.lnk_code2rfe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        elif field == 'aoe':
                cmd = "DELETE FROM clin.lnk_code2aoe WHERE fk_item = %(item)s AND fk_generic_code = %(code)s"
        else:
                raise ValueError('<field> must be one of "rfe" or "aoe", not "%s"', field)
        args = {
                'item': self._payload['pk_encounter'],
                'code': pk_code
        }
        gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
        return True

must be a value FROM ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)

def set_active(self)
Expand source code
def set_active(self):
        """Set the encounter as the active one.

        "Setting active" means making sure the encounter
        row has the youngest "last_affirmed" timestamp of
        all encounter rows for this patient.
        """
        self['last_affirmed'] = gmDateTime.pydt_now_here()
        self.save()

Set the encounter as the active one.

"Setting active" means making sure the encounter row has the youngest "last_affirmed" timestamp of all encounter rows for this patient.

def transfer_all_data_to_another_encounter(self, pk_target_encounter=None)
Expand source code
def transfer_all_data_to_another_encounter(self, pk_target_encounter=None):
        if pk_target_encounter == self.pk_obj:
                return True

        SQL = "SELECT clin.transfer_all_encounter_data(%(src)s, %(trg)s)"
        args = {
                'src': self.pk_obj,
                'trg': pk_target_encounter
        }
        gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        return True
def transfer_clinical_data(self, source_episode=None, target_episode=None)
Expand source code
def transfer_clinical_data(self, source_episode=None, target_episode=None):
        """
        Moves every element currently linked to the current encounter
        and the source_episode onto target_episode.

        @param source_episode The episode the elements are currently linked to.
        @type target_episode A cEpisode intance.
        @param target_episode The episode the elements will be relinked to.
        @type target_episode A cEpisode intance.
        """
        if source_episode['pk_episode'] == target_episode['pk_episode']:
                return True

        SQL = """
                UPDATE clin.clin_root_item
                SET fk_episode = %(trg)s
                WHERE
                        fk_encounter = %(enc)s AND
                        fk_episode = %(src)s
                """
        args = {
                'trg': target_episode['pk_episode'],
                'enc': self.pk_obj,
                'src': source_episode['pk_episode']
        }
        gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
        self.refetch_payload()
        return True

Moves every element currently linked to the current encounter and the source_episode onto target_episode.

@param source_episode The episode the elements are currently linked to. @type target_episode A cEpisode intance. @param target_episode The episode the elements will be relinked to. @type target_episode A cEpisode intance.

def unlock(self, exclusive=False, link_obj=None)
Expand source code
def unlock(self, exclusive=False, link_obj=None):
        return unlock_encounter(self.pk_obj, exclusive = exclusive, link_obj = link_obj)

Inherited members