Module Gnumed.business.gmProblem

GNUmed problem related business object.

license: GPL v2 or later

Expand source code
# -*- coding: utf-8 -*-
"""GNUmed problem related business object.

license: GPL v2 or later
"""
#============================================================
__author__ = "<karsten.hilbert@gmx.net>"

import sys
import logging


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmI18N
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmExceptions

from Gnumed.business import gmCoding
from Gnumed.business import gmDocuments
from Gnumed.business import gmEpisode
from Gnumed.business import gmHealthIssue

_log = logging.getLogger('gm.emr')

#============================================================
# problem API
#============================================================
class cProblem(gmBusinessDBObject.cBusinessDBObject):
        """Represents one problem.

        problems are the aggregation of
                .clinically_relevant=True issues and
                .is_open=True episodes
        """
        _cmd_fetch_payload = ''                                 # will get programmatically defined in __init__
        _cmds_store_payload:list = ["select 1"]
        _updatable_fields:list = []

        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, try_potential_problems:bool=False):
                """Initialize.

                aPK_obj must contain the keys
                        pk_patient
                        pk_episode
                        pk_health_issue
                """
                if aPK_obj is None:
                        raise gmExceptions.ConstructorError('cannot instatiate cProblem for PK: [%s]' % (aPK_obj))

                # As problems are rows from a view of different emr struct items,
                # the PK can't be a single field and, as some of the values of the
                # composed PK may be None, they must be queried using 'is null',
                # so we must programmatically construct the SQL query
                where_parts = []
                pk = {}
                for col_name in aPK_obj:
                        val = aPK_obj[col_name]
                        if val is None:
                                where_parts.append('%s IS NULL' % col_name)
                        else:
                                where_parts.append('%s = %%(%s)s' % (col_name, col_name))
                                pk[col_name] = val

                # try to instantiate from true problem view
                cProblem._cmd_fetch_payload = """
                        SELECT *, False as is_potential_problem
                        FROM clin.v_problem_list
                        WHERE %s""" % ' AND '.join(where_parts)

                try:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj = pk)
                        return

                except gmExceptions.ConstructorError:
                        _log.exception('actual problem not found, trying "potential" problems')
                        if try_potential_problems is False:
                                raise

                # try to instantiate from potential-problems view
                cProblem._cmd_fetch_payload = """
                        SELECT *, True as is_potential_problem
                        FROM clin.v_potential_problem_list
                        WHERE %s""" % ' AND '.join(where_parts)
                gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk)

        #--------------------------------------------------------
        @classmethod
        def from_episode(cls, episode:'gmEpisode.cEpisode', allow_closed:bool=False) -> 'cProblem':
                """Initialize problem from episode"""
                return cls (
                        aPK_obj = {
                                'pk_patient': episode['pk_patient'],
                                'pk_health_issue': episode['pk_health_issue'],
                                'pk_episode': episode['pk_episode']
                        },
                        try_potential_problems = allow_closed
                )

        #--------------------------------------------------------
        @classmethod
        def from_health_issue(cls, health_issue, allow_irrelevant:bool=False) -> 'cProblem':
                """Initialize problem from health issue."""
                return cls (
                        aPK_obj = {
                                'pk_patient': health_issue['pk_patient'],
                                'pk_health_issue': health_issue['pk_health_issue'],
                                'pk_episode': None
                        },
                        try_potential_problems = allow_irrelevant
                )

        #--------------------------------------------------------
        @classmethod
        def from_issue_or_episode(cls, issue_or_episode, allow_all:bool=False) -> 'cProblem':
                if isinstance(issue_or_episode, cProblem):
                        return issue_or_episode

                pk_obj = {
                        'pk_patient': issue_or_episode['pk_patient'],
                        'pk_health_issue': issue_or_episode['pk_health_issue']
                }
                try:
                        pk_obj['pk_episode'] = issue_or_episode['pk_episode']
                except KeyError:
                        pk_obj['pk_episode'] = None
                return cls(aPK_obj = pk_obj, try_potential_problems = allow_all)

        #--------------------------------------------------------
        def __get_as_episode(self):
                """
                Retrieve the cEpisode instance equivalent to this problem.
                The problem's type attribute must be 'episode'
                """
                if self._payload['type'] != 'episode':
                        _log.error('cannot convert problem [%s] of type [%s] to episode' % (self._payload['problem'], self._payload['type']))
                        return None

                return gmEpisode.cEpisode(aPK_obj = self._payload['pk_episode'])

        as_episode = property(__get_as_episode)

        #--------------------------------------------------------
        def __get_as_health_issue(self):
                """
                Retrieve the cHealthIssue instance equivalent to this problem.
                The problem's type attribute must be 'issue'
                """
                if self._payload['type'] != 'issue':
                        _log.error('cannot convert problem [%s] of type [%s] to health issue' % (self._payload['problem'], self._payload['type']))
                        return None

                return gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue'])

        as_health_issue = property(__get_as_health_issue)

        #--------------------------------------------------------
        def get_visual_progress_notes(self, encounter_id:int=None):
                if self._payload['type'] == 'issue':
                        latest = gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue']).latest_episode
                        if latest is None:
                                return []

                        pk_episode = latest['pk_episode']
                else:
                        pk_episode = self._payload['pk_episode']
                doc_folder = gmDocuments.cDocumentFolder(aPKey = self._payload['pk_patient'])
                return doc_folder.get_visual_progress_notes(episodes = [pk_episode])

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def get_diagnostic_certainty_description(self):
                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

        diagnostic_certainty_description = property(get_diagnostic_certainty_description)

        #--------------------------------------------------------
        def _get_generic_codes(self):
                if self._payload['type'] == 'issue':
                        cmd = """
                                SELECT * FROM clin.v_linked_codes WHERE
                                        item_table = 'clin.lnk_code2h_issue'::regclass
                                                AND
                                        pk_item = %(item)s
                        """
                        args = {'item': self._payload['pk_health_issue']}
                else:
                        cmd = """
                                SELECT * FROM clin.v_linked_codes WHERE
                                        item_table = 'clin.lnk_code2episode'::regclass
                                                AND
                                        pk_item = %(item)s
                        """
                        args = {'item': self._payload['pk_episode']}

                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        generic_codes = property(_get_generic_codes)

#============================================================
# main - unit testing
#------------------------------------------------------------
if __name__ == '__main__':

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        gmI18N.activate_locale()
        gmI18N.install_domain('gnumed')

        #--------------------------------------------------------
        # define tests
        #--------------------------------------------------------
        def test_problem():
                print("\nProblem test")
                print("------------")
                prob = cProblem(aPK_obj={'pk_patient': 12, 'pk_health_issue': 1, 'pk_episode': None})
                print(prob)
                fields = prob.get_fields()
                for field in fields:
                        print(field, ':', prob[field])
                print('\nupdatable:', prob.get_updatable_fields())
                epi = prob.as_episode
                print('\nas episode:')
                if epi is not None:
                        for field in epi.get_fields():
                                print('   .%s : %s' % (field, epi[field]))

        #--------------------------------------------------------
        gmPG2.request_login_params(setup_pool = True)

        test_problem()

Classes

class cProblem (aPK_obj=None, try_potential_problems: bool = False)

Represents one problem.

problems are the aggregation of .clinically_relevant=True issues and .is_open=True episodes

Initialize.

aPK_obj must contain the keys pk_patient pk_episode pk_health_issue

Expand source code
class cProblem(gmBusinessDBObject.cBusinessDBObject):
        """Represents one problem.

        problems are the aggregation of
                .clinically_relevant=True issues and
                .is_open=True episodes
        """
        _cmd_fetch_payload = ''                                 # will get programmatically defined in __init__
        _cmds_store_payload:list = ["select 1"]
        _updatable_fields:list = []

        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, try_potential_problems:bool=False):
                """Initialize.

                aPK_obj must contain the keys
                        pk_patient
                        pk_episode
                        pk_health_issue
                """
                if aPK_obj is None:
                        raise gmExceptions.ConstructorError('cannot instatiate cProblem for PK: [%s]' % (aPK_obj))

                # As problems are rows from a view of different emr struct items,
                # the PK can't be a single field and, as some of the values of the
                # composed PK may be None, they must be queried using 'is null',
                # so we must programmatically construct the SQL query
                where_parts = []
                pk = {}
                for col_name in aPK_obj:
                        val = aPK_obj[col_name]
                        if val is None:
                                where_parts.append('%s IS NULL' % col_name)
                        else:
                                where_parts.append('%s = %%(%s)s' % (col_name, col_name))
                                pk[col_name] = val

                # try to instantiate from true problem view
                cProblem._cmd_fetch_payload = """
                        SELECT *, False as is_potential_problem
                        FROM clin.v_problem_list
                        WHERE %s""" % ' AND '.join(where_parts)

                try:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj = pk)
                        return

                except gmExceptions.ConstructorError:
                        _log.exception('actual problem not found, trying "potential" problems')
                        if try_potential_problems is False:
                                raise

                # try to instantiate from potential-problems view
                cProblem._cmd_fetch_payload = """
                        SELECT *, True as is_potential_problem
                        FROM clin.v_potential_problem_list
                        WHERE %s""" % ' AND '.join(where_parts)
                gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk)

        #--------------------------------------------------------
        @classmethod
        def from_episode(cls, episode:'gmEpisode.cEpisode', allow_closed:bool=False) -> 'cProblem':
                """Initialize problem from episode"""
                return cls (
                        aPK_obj = {
                                'pk_patient': episode['pk_patient'],
                                'pk_health_issue': episode['pk_health_issue'],
                                'pk_episode': episode['pk_episode']
                        },
                        try_potential_problems = allow_closed
                )

        #--------------------------------------------------------
        @classmethod
        def from_health_issue(cls, health_issue, allow_irrelevant:bool=False) -> 'cProblem':
                """Initialize problem from health issue."""
                return cls (
                        aPK_obj = {
                                'pk_patient': health_issue['pk_patient'],
                                'pk_health_issue': health_issue['pk_health_issue'],
                                'pk_episode': None
                        },
                        try_potential_problems = allow_irrelevant
                )

        #--------------------------------------------------------
        @classmethod
        def from_issue_or_episode(cls, issue_or_episode, allow_all:bool=False) -> 'cProblem':
                if isinstance(issue_or_episode, cProblem):
                        return issue_or_episode

                pk_obj = {
                        'pk_patient': issue_or_episode['pk_patient'],
                        'pk_health_issue': issue_or_episode['pk_health_issue']
                }
                try:
                        pk_obj['pk_episode'] = issue_or_episode['pk_episode']
                except KeyError:
                        pk_obj['pk_episode'] = None
                return cls(aPK_obj = pk_obj, try_potential_problems = allow_all)

        #--------------------------------------------------------
        def __get_as_episode(self):
                """
                Retrieve the cEpisode instance equivalent to this problem.
                The problem's type attribute must be 'episode'
                """
                if self._payload['type'] != 'episode':
                        _log.error('cannot convert problem [%s] of type [%s] to episode' % (self._payload['problem'], self._payload['type']))
                        return None

                return gmEpisode.cEpisode(aPK_obj = self._payload['pk_episode'])

        as_episode = property(__get_as_episode)

        #--------------------------------------------------------
        def __get_as_health_issue(self):
                """
                Retrieve the cHealthIssue instance equivalent to this problem.
                The problem's type attribute must be 'issue'
                """
                if self._payload['type'] != 'issue':
                        _log.error('cannot convert problem [%s] of type [%s] to health issue' % (self._payload['problem'], self._payload['type']))
                        return None

                return gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue'])

        as_health_issue = property(__get_as_health_issue)

        #--------------------------------------------------------
        def get_visual_progress_notes(self, encounter_id:int=None):
                if self._payload['type'] == 'issue':
                        latest = gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue']).latest_episode
                        if latest is None:
                                return []

                        pk_episode = latest['pk_episode']
                else:
                        pk_episode = self._payload['pk_episode']
                doc_folder = gmDocuments.cDocumentFolder(aPKey = self._payload['pk_patient'])
                return doc_folder.get_visual_progress_notes(episodes = [pk_episode])

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def get_diagnostic_certainty_description(self):
                from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
                return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])

        diagnostic_certainty_description = property(get_diagnostic_certainty_description)

        #--------------------------------------------------------
        def _get_generic_codes(self):
                if self._payload['type'] == 'issue':
                        cmd = """
                                SELECT * FROM clin.v_linked_codes WHERE
                                        item_table = 'clin.lnk_code2h_issue'::regclass
                                                AND
                                        pk_item = %(item)s
                        """
                        args = {'item': self._payload['pk_health_issue']}
                else:
                        cmd = """
                                SELECT * FROM clin.v_linked_codes WHERE
                                        item_table = 'clin.lnk_code2episode'::regclass
                                                AND
                                        pk_item = %(item)s
                        """
                        args = {'item': self._payload['pk_episode']}

                rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
                return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

        generic_codes = property(_get_generic_codes)

Ancestors

Static methods

def from_episode(episode: gmEpisode.cEpisode, allow_closed: bool = False) ‑> cProblem

Initialize problem from episode

Expand source code
@classmethod
def from_episode(cls, episode:'gmEpisode.cEpisode', allow_closed:bool=False) -> 'cProblem':
        """Initialize problem from episode"""
        return cls (
                aPK_obj = {
                        'pk_patient': episode['pk_patient'],
                        'pk_health_issue': episode['pk_health_issue'],
                        'pk_episode': episode['pk_episode']
                },
                try_potential_problems = allow_closed
        )
def from_health_issue(health_issue, allow_irrelevant: bool = False) ‑> cProblem

Initialize problem from health issue.

Expand source code
@classmethod
def from_health_issue(cls, health_issue, allow_irrelevant:bool=False) -> 'cProblem':
        """Initialize problem from health issue."""
        return cls (
                aPK_obj = {
                        'pk_patient': health_issue['pk_patient'],
                        'pk_health_issue': health_issue['pk_health_issue'],
                        'pk_episode': None
                },
                try_potential_problems = allow_irrelevant
        )
def from_issue_or_episode(issue_or_episode, allow_all: bool = False) ‑> cProblem
Expand source code
@classmethod
def from_issue_or_episode(cls, issue_or_episode, allow_all:bool=False) -> 'cProblem':
        if isinstance(issue_or_episode, cProblem):
                return issue_or_episode

        pk_obj = {
                'pk_patient': issue_or_episode['pk_patient'],
                'pk_health_issue': issue_or_episode['pk_health_issue']
        }
        try:
                pk_obj['pk_episode'] = issue_or_episode['pk_episode']
        except KeyError:
                pk_obj['pk_episode'] = None
        return cls(aPK_obj = pk_obj, try_potential_problems = allow_all)

Instance variables

var as_episode

Retrieve the cEpisode instance equivalent to this problem. The problem's type attribute must be 'episode'

Expand source code
def __get_as_episode(self):
        """
        Retrieve the cEpisode instance equivalent to this problem.
        The problem's type attribute must be 'episode'
        """
        if self._payload['type'] != 'episode':
                _log.error('cannot convert problem [%s] of type [%s] to episode' % (self._payload['problem'], self._payload['type']))
                return None

        return gmEpisode.cEpisode(aPK_obj = self._payload['pk_episode'])
var as_health_issue

Retrieve the cHealthIssue instance equivalent to this problem. The problem's type attribute must be 'issue'

Expand source code
def __get_as_health_issue(self):
        """
        Retrieve the cHealthIssue instance equivalent to this problem.
        The problem's type attribute must be 'issue'
        """
        if self._payload['type'] != 'issue':
                _log.error('cannot convert problem [%s] of type [%s] to health issue' % (self._payload['problem'], self._payload['type']))
                return None

        return gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue'])
var diagnostic_certainty_description
Expand source code
def get_diagnostic_certainty_description(self):
        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])
var generic_codes
Expand source code
def _get_generic_codes(self):
        if self._payload['type'] == 'issue':
                cmd = """
                        SELECT * FROM clin.v_linked_codes WHERE
                                item_table = 'clin.lnk_code2h_issue'::regclass
                                        AND
                                pk_item = %(item)s
                """
                args = {'item': self._payload['pk_health_issue']}
        else:
                cmd = """
                        SELECT * FROM clin.v_linked_codes WHERE
                                item_table = 'clin.lnk_code2episode'::regclass
                                        AND
                                pk_item = %(item)s
                """
                args = {'item': self._payload['pk_episode']}

        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
        return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]

Methods

def get_diagnostic_certainty_description(self)
Expand source code
def get_diagnostic_certainty_description(self):
        from Gnumed.business.gmHealthIssue import diagnostic_certainty_classification2str
        return diagnostic_certainty_classification2str(self._payload['diagnostic_certainty_classification'])
def get_visual_progress_notes(self, encounter_id: int = None)
Expand source code
def get_visual_progress_notes(self, encounter_id:int=None):
        if self._payload['type'] == 'issue':
                latest = gmHealthIssue.cHealthIssue(aPK_obj = self._payload['pk_health_issue']).latest_episode
                if latest is None:
                        return []

                pk_episode = latest['pk_episode']
        else:
                pk_episode = self._payload['pk_episode']
        doc_folder = gmDocuments.cDocumentFolder(aPKey = self._payload['pk_patient'])
        return doc_folder.get_visual_progress_notes(episodes = [pk_episode])

Inherited members