Module Gnumed.business.gmPerformedProcedure
GNUmed performed-procedure business object.
license: GPL v2 or later
Expand source code
# -*- coding: utf-8 -*-
"""GNUmed performed-procedure business object.
license: GPL v2 or later
"""
#============================================================
__author__ = "Carlos Moro <cfmoro1976@yahoo.es>, <karsten.hilbert@gmx.net>"
import sys
import logging
# setup translation
if __name__ == '__main__':
sys.path.insert(0, '../../')
_ = lambda x:x
else:
try:
_
except NameError:
from Gnumed.pycommon import gmI18N
gmI18N.activate_locale()
gmI18N.install_domain()
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmI18N
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.business import gmHospitalStay
from Gnumed.business import gmCoding
from Gnumed.business import gmOrganization
from Gnumed.business import gmDocuments
_log = logging.getLogger('gm.emr')
#============================================================
_SQL_get_procedures = "select * from clin.v_procedures where %s"
class cPerformedProcedure(gmBusinessDBObject.cBusinessDBObject):
_cmd_fetch_payload = _SQL_get_procedures % "pk_procedure = %s"
_cmds_store_payload = [
"""UPDATE clin.procedure SET
soap_cat = 'p',
clin_when = %(clin_when)s,
clin_end = %(clin_end)s,
is_ongoing = %(is_ongoing)s,
narrative = gm.nullify_empty_string(%(performed_procedure)s),
fk_hospital_stay = %(pk_hospital_stay)s,
fk_org_unit = (CASE
WHEN %(pk_hospital_stay)s IS NULL THEN %(pk_org_unit)s
ELSE NULL
END)::integer,
fk_episode = %(pk_episode)s,
fk_encounter = %(pk_encounter)s,
fk_doc = %(pk_doc)s,
comment = gm.nullify_empty_string(%(comment)s)
WHERE
pk = %(pk_procedure)s AND
xmin = %(xmin_procedure)s
RETURNING xmin as xmin_procedure"""
]
_updatable_fields = [
'clin_when',
'clin_end',
'is_ongoing',
'performed_procedure',
'pk_hospital_stay',
'pk_org_unit',
'pk_episode',
'pk_encounter',
'pk_doc',
'comment'
]
#-------------------------------------------------------
def __setitem__(self, attribute, value):
if (attribute == 'pk_hospital_stay') and (value is not None):
gmBusinessDBObject.cBusinessDBObject.__setitem__(self, 'pk_org_unit', None)
if (attribute == 'pk_org_unit') and (value is not None):
gmBusinessDBObject.cBusinessDBObject.__setitem__(self, 'pk_hospital_stay', None)
gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value)
#--------------------------------------------------------
def format_maximum_information(self, left_margin=0, patient=None):
return self.format (
left_margin = left_margin,
include_episode = True,
include_codes = True,
include_address = True,
include_comm = True,
include_doc = True
).split('\n')
#-------------------------------------------------------
def format(self, left_margin=0, include_episode=True, include_codes=False, include_address=False, include_comm=False, include_doc=False):
if self._payload['is_ongoing']:
end = _(' (ongoing)')
else:
end = self._payload['clin_end']
if end is None:
end = ''
else:
end = ' - %s' % end.strftime('%Y %b %d')
line = '%s%s%s: %s%s [%s @ %s]' % (
(' ' * left_margin),
self._payload['clin_when'].strftime('%Y %b %d'),
end,
self._payload['performed_procedure'],
gmTools.bool2str(include_episode, ' (%s)' % self._payload['episode'], ''),
self._payload['unit'],
self._payload['organization']
)
line += gmTools.coalesce(self._payload['comment'], '', '\n' + (' ' * left_margin) + _('Comment: ') + '%s')
if include_comm:
for channel in self.org_unit.comm_channels:
line += ('\n%(comm_type)s: %(url)s' % channel)
if include_address:
adr = self.org_unit.address
if adr is not None:
line += '\n'
line += '\n'.join(adr.format(single_line = False, show_type = False))
line += '\n'
if include_doc:
doc = self.doc
if doc is not None:
line += '\n'
line += _('Document') + ': ' + doc.format(single_line = True)
line += '\n'
if include_codes:
codes = self.generic_codes
if len(codes) > 0:
line += '\n'
for c in codes:
line += '%s %s: %s (%s - %s)\n' % (
(' ' * left_margin),
c['code'],
c['term'],
c['name_short'],
c['version']
)
del codes
return line
#--------------------------------------------------------
def add_code(self, pk_code=None):
"""<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
cmd = "INSERT INTO clin.lnk_code2procedure (fk_item, fk_generic_code) values (%(issue)s, %(code)s)"
args = {
'issue': self._payload['pk_procedure'],
'code': pk_code
}
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return True
#--------------------------------------------------------
def remove_code(self, pk_code=None):
"""<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
cmd = "DELETE FROM clin.lnk_code2procedure WHERE fk_item = %(issue)s AND fk_generic_code = %(code)s"
args = {
'issue': self._payload['pk_procedure'],
'code': pk_code
}
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return True
#--------------------------------------------------------
# properties
#--------------------------------------------------------
def _get_stay(self):
if self._payload['pk_hospital_stay'] is None:
return None
return gmHospitalStay.cHospitalStay(aPK_obj = self._payload['pk_hospital_stay'])
hospital_stay = property(_get_stay)
#--------------------------------------------------------
def _get_org_unit(self):
return gmOrganization.cOrgUnit(self._payload['pk_org_unit'])
org_unit = property(_get_org_unit)
#--------------------------------------------------------
def _get_doc(self):
if self._payload['pk_doc'] is None:
return None
return gmDocuments.cDocument(aPK_obj = self._payload['pk_doc'])
doc = property(_get_doc)
#--------------------------------------------------------
def _get_generic_codes(self):
if len(self._payload['pk_generic_codes']) == 0:
return []
cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)'
args = {'pks': self._payload['pk_generic_codes']}
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
def _set_generic_codes(self, pk_codes):
queries = []
# remove all codes
if len(self._payload['pk_generic_codes']) > 0:
queries.append ({
'sql': 'DELETE FROM clin.lnk_code2procedure WHERE fk_item = %(proc)s AND fk_generic_code = ANY(%(codes)s)',
'args': {
'proc': self._payload['pk_procedure'],
'codes': self._payload['pk_generic_codes']
}
})
# add new codes
for pk_code in pk_codes:
queries.append ({
'sql': 'INSERT INTO clin.lnk_code2procedure (fk_item, fk_generic_code) VALUES (%(proc)s, %(pk_code)s)',
'args': {
'proc': self._payload['pk_procedure'],
'pk_code': pk_code
}
})
if len(queries) == 0:
return
# run it all in one transaction
gmPG2.run_rw_queries(queries = queries)
return
generic_codes = property(_get_generic_codes, _set_generic_codes)
#-----------------------------------------------------------
def get_performed_procedures(patient=None, return_pks=False):
queries = [{
'sql': 'SELECT * FROM clin.v_procedures WHERE pk_patient = %(pat)s ORDER BY clin_when',
'args': {'pat': patient}
}]
rows = gmPG2.run_ro_queries(queries = queries)
if return_pks:
return [ r['pk_procedure'] for r in rows ]
return [ cPerformedProcedure(row = {'data': r, 'pk_field': 'pk_procedure'}) for r in rows ]
#-----------------------------------------------------------
def get_procedures4document(pk_document=None, return_pks=False):
args = {'pk_doc': pk_document}
cmd = _SQL_get_procedures % 'pk_doc = %(pk_doc)s'
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
if return_pks:
return [ r['pk_procedure'] for r in rows ]
return [ cPerformedProcedure(row = {'data': r, 'pk_field': 'pk_procedure'}) for r in rows ]
#-----------------------------------------------------------
def get_latest_performed_procedure(patient=None):
queries = [{
'sql': 'select * FROM clin.v_procedures WHERE pk_patient = %(pat)s ORDER BY clin_when DESC LIMIT 1',
'args': {'pat': patient}
}]
rows = gmPG2.run_ro_queries(queries = queries)
if len(rows) == 0:
return None
return cPerformedProcedure(row = {'data': rows[0], 'pk_field': 'pk_procedure'})
#-----------------------------------------------------------
def create_performed_procedure(encounter=None, episode=None, location=None, hospital_stay=None, procedure=None):
queries = [{
'sql': """
INSERT INTO clin.procedure (
fk_encounter,
fk_episode,
soap_cat,
fk_org_unit,
fk_hospital_stay,
narrative
) VALUES (
%(enc)s,
%(epi)s,
'p',
%(loc)s,
%(stay)s,
gm.nullify_empty_string(%(proc)s)
)
RETURNING pk""",
'args': {'enc': encounter, 'epi': episode, 'loc': location, 'stay': hospital_stay, 'proc': procedure}
}]
rows = gmPG2.run_rw_queries(queries = queries, return_data = True)
return cPerformedProcedure(aPK_obj = rows[0][0])
#-----------------------------------------------------------
def delete_performed_procedure(procedure=None):
cmd = 'delete from clin.procedure where pk = %(pk)s'
args = {'pk': procedure}
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return True
#============================================================
# main - unit testing
#------------------------------------------------------------
if __name__ == '__main__':
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
del _
from Gnumed.pycommon import gmI18N
gmI18N.activate_locale()
gmI18N.install_domain('gnumed')
gmI18N.activate_locale()
gmI18N.install_domain('gnumed')
#--------------------------------------------------------
# define tests
#--------------------------------------------------------
def test_performed_procedure():
procs = get_performed_procedures(patient = 12)
for proc in procs:
print(proc.format(left_margin = 2))
#--------------------------------------------------------
gmPG2.request_login_params(setup_pool = True)
test_performed_procedure()
Functions
def create_performed_procedure(encounter=None, episode=None, location=None, hospital_stay=None, procedure=None)
-
Expand source code
def create_performed_procedure(encounter=None, episode=None, location=None, hospital_stay=None, procedure=None): queries = [{ 'sql': """ INSERT INTO clin.procedure ( fk_encounter, fk_episode, soap_cat, fk_org_unit, fk_hospital_stay, narrative ) VALUES ( %(enc)s, %(epi)s, 'p', %(loc)s, %(stay)s, gm.nullify_empty_string(%(proc)s) ) RETURNING pk""", 'args': {'enc': encounter, 'epi': episode, 'loc': location, 'stay': hospital_stay, 'proc': procedure} }] rows = gmPG2.run_rw_queries(queries = queries, return_data = True) return cPerformedProcedure(aPK_obj = rows[0][0])
def delete_performed_procedure(procedure=None)
-
Expand source code
def delete_performed_procedure(procedure=None): cmd = 'delete from clin.procedure where pk = %(pk)s' args = {'pk': procedure} gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
def get_latest_performed_procedure(patient=None)
-
Expand source code
def get_latest_performed_procedure(patient=None): queries = [{ 'sql': 'select * FROM clin.v_procedures WHERE pk_patient = %(pat)s ORDER BY clin_when DESC LIMIT 1', 'args': {'pat': patient} }] rows = gmPG2.run_ro_queries(queries = queries) if len(rows) == 0: return None return cPerformedProcedure(row = {'data': rows[0], 'pk_field': 'pk_procedure'})
def get_performed_procedures(patient=None, return_pks=False)
-
Expand source code
def get_performed_procedures(patient=None, return_pks=False): queries = [{ 'sql': 'SELECT * FROM clin.v_procedures WHERE pk_patient = %(pat)s ORDER BY clin_when', 'args': {'pat': patient} }] rows = gmPG2.run_ro_queries(queries = queries) if return_pks: return [ r['pk_procedure'] for r in rows ] return [ cPerformedProcedure(row = {'data': r, 'pk_field': 'pk_procedure'}) for r in rows ]
def get_procedures4document(pk_document=None, return_pks=False)
-
Expand source code
def get_procedures4document(pk_document=None, return_pks=False): args = {'pk_doc': pk_document} cmd = _SQL_get_procedures % 'pk_doc = %(pk_doc)s' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if return_pks: return [ r['pk_procedure'] for r in rows ] return [ cPerformedProcedure(row = {'data': r, 'pk_field': 'pk_procedure'}) for r in rows ]
Classes
class cPerformedProcedure (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents business objects in the database.
Rules
- instances ARE ASSUMED TO EXIST in the database
- PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
- Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
- does NOT verify FK target existence
- does NOT create new entries in the database
- does NOT lazy-fetch fields on access
Class scope SQL commands and variables:
_cmd_fetch_payload:
- must return exactly one row
- WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
- must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables
_cmds_store_payload:
- one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
- the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
- the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
- when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)
_updatable_fields:
- a list of fields available for update via object['field']
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cPerformedProcedure(gmBusinessDBObject.cBusinessDBObject): _cmd_fetch_payload = _SQL_get_procedures % "pk_procedure = %s" _cmds_store_payload = [ """UPDATE clin.procedure SET soap_cat = 'p', clin_when = %(clin_when)s, clin_end = %(clin_end)s, is_ongoing = %(is_ongoing)s, narrative = gm.nullify_empty_string(%(performed_procedure)s), fk_hospital_stay = %(pk_hospital_stay)s, fk_org_unit = (CASE WHEN %(pk_hospital_stay)s IS NULL THEN %(pk_org_unit)s ELSE NULL END)::integer, fk_episode = %(pk_episode)s, fk_encounter = %(pk_encounter)s, fk_doc = %(pk_doc)s, comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk_procedure)s AND xmin = %(xmin_procedure)s RETURNING xmin as xmin_procedure""" ] _updatable_fields = [ 'clin_when', 'clin_end', 'is_ongoing', 'performed_procedure', 'pk_hospital_stay', 'pk_org_unit', 'pk_episode', 'pk_encounter', 'pk_doc', 'comment' ] #------------------------------------------------------- def __setitem__(self, attribute, value): if (attribute == 'pk_hospital_stay') and (value is not None): gmBusinessDBObject.cBusinessDBObject.__setitem__(self, 'pk_org_unit', None) if (attribute == 'pk_org_unit') and (value is not None): gmBusinessDBObject.cBusinessDBObject.__setitem__(self, 'pk_hospital_stay', None) gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value) #-------------------------------------------------------- def format_maximum_information(self, left_margin=0, patient=None): return self.format ( left_margin = left_margin, include_episode = True, include_codes = True, include_address = True, include_comm = True, include_doc = True ).split('\n') #------------------------------------------------------- def format(self, left_margin=0, include_episode=True, include_codes=False, include_address=False, include_comm=False, include_doc=False): if self._payload['is_ongoing']: end = _(' (ongoing)') else: end = self._payload['clin_end'] if end is None: end = '' else: end = ' - %s' % end.strftime('%Y %b %d') line = '%s%s%s: %s%s [%s @ %s]' % ( (' ' * left_margin), self._payload['clin_when'].strftime('%Y %b %d'), end, self._payload['performed_procedure'], gmTools.bool2str(include_episode, ' (%s)' % self._payload['episode'], ''), self._payload['unit'], self._payload['organization'] ) line += gmTools.coalesce(self._payload['comment'], '', '\n' + (' ' * left_margin) + _('Comment: ') + '%s') if include_comm: for channel in self.org_unit.comm_channels: line += ('\n%(comm_type)s: %(url)s' % channel) if include_address: adr = self.org_unit.address if adr is not None: line += '\n' line += '\n'.join(adr.format(single_line = False, show_type = False)) line += '\n' if include_doc: doc = self.doc if doc is not None: line += '\n' line += _('Document') + ': ' + doc.format(single_line = True) line += '\n' if include_codes: codes = self.generic_codes if len(codes) > 0: line += '\n' for c in codes: line += '%s %s: %s (%s - %s)\n' % ( (' ' * left_margin), c['code'], c['term'], c['name_short'], c['version'] ) del codes return line #-------------------------------------------------------- def add_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "INSERT INTO clin.lnk_code2procedure (fk_item, fk_generic_code) values (%(issue)s, %(code)s)" args = { 'issue': self._payload['pk_procedure'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True #-------------------------------------------------------- def remove_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "DELETE FROM clin.lnk_code2procedure WHERE fk_item = %(issue)s AND fk_generic_code = %(code)s" args = { 'issue': self._payload['pk_procedure'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_stay(self): if self._payload['pk_hospital_stay'] is None: return None return gmHospitalStay.cHospitalStay(aPK_obj = self._payload['pk_hospital_stay']) hospital_stay = property(_get_stay) #-------------------------------------------------------- def _get_org_unit(self): return gmOrganization.cOrgUnit(self._payload['pk_org_unit']) org_unit = property(_get_org_unit) #-------------------------------------------------------- def _get_doc(self): if self._payload['pk_doc'] is None: return None return gmDocuments.cDocument(aPK_obj = self._payload['pk_doc']) doc = property(_get_doc) #-------------------------------------------------------- def _get_generic_codes(self): if len(self._payload['pk_generic_codes']) == 0: return [] cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)' args = {'pks': self._payload['pk_generic_codes']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ] def _set_generic_codes(self, pk_codes): queries = [] # remove all codes if len(self._payload['pk_generic_codes']) > 0: queries.append ({ 'sql': 'DELETE FROM clin.lnk_code2procedure WHERE fk_item = %(proc)s AND fk_generic_code = ANY(%(codes)s)', 'args': { 'proc': self._payload['pk_procedure'], 'codes': self._payload['pk_generic_codes'] } }) # add new codes for pk_code in pk_codes: queries.append ({ 'sql': 'INSERT INTO clin.lnk_code2procedure (fk_item, fk_generic_code) VALUES (%(proc)s, %(pk_code)s)', 'args': { 'proc': self._payload['pk_procedure'], 'pk_code': pk_code } }) if len(queries) == 0: return # run it all in one transaction gmPG2.run_rw_queries(queries = queries) return generic_codes = property(_get_generic_codes, _set_generic_codes)
Ancestors
Instance variables
var doc
-
Expand source code
def _get_doc(self): if self._payload['pk_doc'] is None: return None return gmDocuments.cDocument(aPK_obj = self._payload['pk_doc'])
var generic_codes
-
Expand source code
def _get_generic_codes(self): if len(self._payload['pk_generic_codes']) == 0: return [] cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)' args = {'pks': self._payload['pk_generic_codes']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
var hospital_stay
-
Expand source code
def _get_stay(self): if self._payload['pk_hospital_stay'] is None: return None return gmHospitalStay.cHospitalStay(aPK_obj = self._payload['pk_hospital_stay'])
var org_unit
-
Expand source code
def _get_org_unit(self): return gmOrganization.cOrgUnit(self._payload['pk_org_unit'])
Methods
def add_code(self, pk_code=None)
-
must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code) Expand source code
def add_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "INSERT INTO clin.lnk_code2procedure (fk_item, fk_generic_code) values (%(issue)s, %(code)s)" args = { 'issue': self._payload['pk_procedure'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
def format_maximum_information(self, left_margin=0, patient=None)
-
Expand source code
def format_maximum_information(self, left_margin=0, patient=None): return self.format ( left_margin = left_margin, include_episode = True, include_codes = True, include_address = True, include_comm = True, include_doc = True ).split('\n')
def remove_code(self, pk_code=None)
-
must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code) Expand source code
def remove_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "DELETE FROM clin.lnk_code2procedure WHERE fk_item = %(issue)s AND fk_generic_code = %(code)s" args = { 'issue': self._payload['pk_procedure'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
Inherited members