Module Gnumed.business.gmPathLab
GNUmed measurements related business objects.
Functions
def calculate_bmi(mass=None, height=None, age=None)
-
Expand source code
def calculate_bmi(mass=None, height=None, age=None): """Calculate BMI. mass: kg height: cm age: not yet used returns: (True/False, data) True: data = (bmi, lower_normal, upper_normal) False: data = error message """ converted, mass = gmTools.input2decimal(mass) if not converted: return False, 'mass: cannot convert <%s> to Decimal' % mass converted, height = gmTools.input2decimal(height) if not converted: return False, 'height: cannot convert <%s> to Decimal' % height approx_surface = (height / decimal.Decimal(100))**2 bmi = mass / approx_surface print(mass, height, '->', approx_surface, '->', bmi) lower_normal_mass = 20.0 * approx_surface upper_normal_mass = 25.0 * approx_surface return True, (bmi, lower_normal_mass, upper_normal_mass)
Calculate BMI.
mass: kg height: cm age: not yet used
returns: (True/False, data) True: data = (bmi, lower_normal, upper_normal) False: data = error message
def create_lab_request(lab=None, req_id=None, pat_id=None, encounter_id=None, episode_id=None)
-
Expand source code
def create_lab_request(lab=None, req_id=None, pat_id=None, encounter_id=None, episode_id=None): """Create or get lab request. returns tuple (status, value): (True, lab request instance) (False, error message) (None, housekeeping_todo primary key) """ pass # req = None # aPK_obj = { # 'lab': lab, # 'req_id': req_id # } # try: # req = cLabRequest (aPK_obj) # except gmExceptions.NoSuchClinItemError as msg: # _log.info('%s: will try to create lab request' % str(msg)) # except gmExceptions.ConstructorError as msg: # _log.exception(str(msg), sys.exc_info(), verbose=0) # return (False, msg) # found
Create or get lab request.
returns tuple (status, value): (True, lab request instance) (False, error message) (None, housekeeping_todo primary key)
def create_measurement_type(lab=None, abbrev=None, unit=None, name=None, link_obj=None)
-
Expand source code
def create_measurement_type(lab=None, abbrev=None, unit=None, name=None, link_obj=None): """Create or get test type.""" ttype = find_measurement_type(lab = lab, abbrev = abbrev, name = name, link_obj = link_obj) # found ? if ttype is not None: return ttype _log.debug('creating test type [%s:%s:%s:%s]', lab, abbrev, name, unit) # not found, so create it # if unit is None: # _log.error('need <unit> to create test type: %s:%s:%s:%s' % (lab, abbrev, name, unit)) # raise ValueError('need <unit> to create test type') # make query cols = [] val_snippets = [] vals = {} # lab if lab is None: lab = create_test_org(link_obj = link_obj)['pk_test_org'] cols.append('fk_test_org') try: vals['lab'] = int(lab) val_snippets.append('%(lab)s') except Exception: vals['lab'] = lab val_snippets.append('(SELECT pk_test_org FROM clin.v_test_orgs WHERE unit = %(lab)s)') # code cols.append('abbrev') val_snippets.append('%(abbrev)s') vals['abbrev'] = abbrev # unit if unit is not None: cols.append('reference_unit') val_snippets.append('%(unit)s') vals['unit'] = unit # name if name is not None: cols.append('name') val_snippets.append('%(name)s') vals['name'] = name col_clause = ', '.join(cols) val_clause = ', '.join(val_snippets) queries = [ {'sql': 'insert into clin.test_type (%s) values (%s)' % (col_clause, val_clause), 'args': vals}, {'sql': "select * from clin.v_test_types where pk_test_type = currval(pg_get_serial_sequence('clin.test_type', 'pk'))"} ] rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True) ttype = cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': rows[0]}) return ttype
Create or get test type.
def create_meta_type(name=None, abbreviation=None, return_existing=False)
-
Expand source code
def create_meta_type(name=None, abbreviation=None, return_existing=False): cmd = """ INSERT INTO clin.meta_test_type (name, abbrev) SELECT %(name)s, %(abbr)s WHERE NOT EXISTS ( SELECT 1 FROM clin.meta_test_type WHERE name = %(name)s AND abbrev = %(abbr)s ) RETURNING *, xmin """ args = { 'name': name.strip(), 'abbr': abbreviation.strip() } rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) if len(rows) == 0: if not return_existing: return None cmd = "SELECT *, xmin FROM clin.meta_test_type WHERE name = %(name)s and %(abbr)s" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return cMetaTestType(row = {'pk_field': 'pk', 'data': rows[0]})
def create_test_org(name=None, comment=None, pk_org_unit=None, link_obj=None)
-
Expand source code
def create_test_org(name=None, comment=None, pk_org_unit=None, link_obj=None): _log.debug('creating test org [%s:%s:%s]', name, comment, pk_org_unit) if name is None: name = 'unassigned lab' # get org unit if pk_org_unit is None: org = gmOrganization.create_org ( link_obj = link_obj, organization = name, category = 'Laboratory' ) org_unit = gmOrganization.create_org_unit ( link_obj = link_obj, pk_organization = org['pk_org'], unit = name ) pk_org_unit = org_unit['pk_org_unit'] # test org exists ? args = {'pk_unit': pk_org_unit} cmd = 'SELECT pk_test_org FROM clin.v_test_orgs WHERE pk_org_unit = %(pk_unit)s' rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: cmd = 'INSERT INTO clin.test_org (fk_org_unit) VALUES (%(pk_unit)s) RETURNING pk' rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}], return_data = True) test_org = cTestOrg(link_obj = link_obj, aPK_obj = rows[0][0]) if comment is not None: comment = comment.strip() test_org['comment'] = comment test_org.save(conn = link_obj) return test_org
def create_test_panel(description=None)
-
Expand source code
def create_test_panel(description=None): args = {'desc': description.strip()} cmd = """ INSERT INTO clin.test_panel (description) VALUES (gm.nullify_empty_string(%(desc)s)) RETURNING pk """ rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) return cTestPanel(aPK_obj = rows[0]['pk'])
def create_test_result(encounter=None,
episode=None,
type=None,
intended_reviewer=None,
val_num=None,
val_alpha=None,
unit=None,
link_obj=None)-
Expand source code
def create_test_result(encounter=None, episode=None, type=None, intended_reviewer=None, val_num=None, val_alpha=None, unit=None, link_obj=None): cmd1 = """ INSERT INTO clin.test_result ( fk_encounter, fk_episode, fk_type, fk_intended_reviewer, val_num, val_alpha, val_unit ) VALUES ( %(enc)s, %(epi)s, %(type)s, %(rev)s, %(v_num)s, %(v_alpha)s, %(unit)s ) """ cmd2 = "SELECT * from clin.v_test_results WHERE pk_test_result = currval(pg_get_serial_sequence('clin.test_result', 'pk'))" args = { 'enc': encounter, 'epi': episode, 'type': type, 'rev': intended_reviewer, 'v_num': val_num, 'v_alpha': val_alpha, 'unit': unit } rows = gmPG2.run_rw_queries ( link_obj = link_obj, queries = [ {'sql': cmd1, 'args': args}, {'sql': cmd2} ], return_data = True ) tr = cTestResult(row = { 'pk_field': 'pk_test_result', 'data': rows[0] }) return tr
def delete_measurement_type(measurement_type=None)
-
Expand source code
def delete_measurement_type(measurement_type=None): cmd = 'delete from clin.test_type where pk = %(pk)s' args = {'pk': measurement_type} gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
def delete_meta_type(meta_type=None)
-
Expand source code
def delete_meta_type(meta_type=None): cmd = """ DELETE FROM clin.meta_test_type WHERE pk = %(pk)s AND NOT EXISTS ( SELECT 1 FROM clin.test_type WHERE fk_meta_test_type = %(pk)s )""" args = {'pk': meta_type} gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
def delete_test_org(test_org=None)
-
Expand source code
def delete_test_org(test_org=None): args = {'pk': test_org} cmd = """ DELETE FROM clin.test_org WHERE pk = %(pk)s AND NOT EXISTS (SELECT 1 FROM clin.lab_request WHERE fk_test_org = %(pk)s LIMIT 1) AND NOT EXISTS (SELECT 1 FROM clin.test_type WHERE fk_test_org = %(pk)s LIMIT 1) """ gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
def delete_test_panel(pk=None)
-
Expand source code
def delete_test_panel(pk=None): args = {'pk': pk} cmd = "DELETE FROM clin.test_panel WHERE pk = %(pk)s" gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
def delete_test_result(result=None)
-
Expand source code
def delete_test_result(result=None): try: pk = int(result) except (TypeError, AttributeError): pk = result['pk_test_result'] cmd = 'DELETE FROM clin.test_result WHERE pk = %(pk)s' gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'pk': pk}}])
def export_results_for_gnuplot(results=None, filename=None, show_year=True, patient=None)
-
Expand source code
def export_results_for_gnuplot(results=None, filename=None, show_year=True, patient=None): sandbox_dir = os.path.join(gmTools.gmPaths().tmp_dir, 'gplot') if not gmTools.mkdir(sandbox_dir): sandbox_dir = gmTools.mk_sandbox_dir(prefix = 'gm2gpl-') _log.debug('sandbox directory: [%s]', sandbox_dir) if filename is None: filename = gmTools.get_unique_filename(prefix = 'gm2gpl-', suffix = '.dat', tmp_dir = sandbox_dir) # sort results into groups by test type results_grouped_by_test_type = {} for r in results: try: results_grouped_by_test_type[r['unified_name']].append(r) except KeyError: results_grouped_by_test_type[r['unified_name']] = [r] conf_name = '%s.conf' % filename gplot_conf = open(conf_name, mode = 'wt', encoding = 'utf8') gplot_conf.write('# settings for stacked multiplot layouts:\n') sub_title = _('plotted %s (GNUmed v%s)') % ( gmDateTime.pydt_now_here().strftime('%Y %b %d %H:%M'), _cfg.get(option = 'client_version') ) if patient is None: plot_title = sub_title else: plot_title = '%s - %s\\n{/*0.8 %s}' % ( patient.get_description_gender(with_nickname = False).strip(), patient.get_formatted_dob(format = '%Y %b %d', none_string = _('unknown DOB'), honor_estimation = True), sub_title ) gplot_conf.write('multiplot_title = "%s"\n' % plot_title) gplot_conf.write('multiplot_no_of_tests = %s # number of index blocks (resp. test types)\n' % len(results_grouped_by_test_type)) gplot_conf.write('array multiplot_y_labels[multiplot_no_of_tests] # list for ylabels suitable for stacked multiplots\n') gplot_conf.write('\n') gplot_conf.write('# settings for individual plots, stacked or not:\n') gplot_data = open(filename, mode = 'wt', encoding = 'utf8') gplot_data.write(GPLOT_DATAFILE_HEADER % filename) test_type_groups = list(results_grouped_by_test_type) for test_type_idx in range(len(test_type_groups)): test_type = test_type_groups[test_type_idx] if len(results_grouped_by_test_type[test_type]) == 0: continue first_result = results_grouped_by_test_type[test_type][0] if test_type_idx == 0: gplot_conf.write('set title "%s" enhanced\n' % plot_title) gplot_conf.write('\n') gplot_conf.write('set ylabel "%s"\n' % first_result['unified_name']) elif test_type_idx == 1: gplot_conf.write('set y2label "%s"\n' % first_result['unified_name']) gplot_conf.write('multiplot_y_labels[%s] = "%s (%s)"\n' % (test_type_idx + 1, first_result['unified_name'], first_result['unified_abbrev'])) title = '%s (%s)' % ( first_result['unified_abbrev'], first_result['unified_name'] ) gplot_data.write('\n\n"%s" "%s"\n' % (title, title)) prev_date = None prev_year = None for result in sorted(results_grouped_by_test_type[test_type], key=lambda result:result['clin_when']): curr_date = result['clin_when'].strftime('%Y-%m-%d') if curr_date == prev_date: gplot_data.write('# %s\n\n' % _('blank line inserted to allow for discontinued-line style drawing of same-day values:')) if show_year: if result['clin_when'].year == prev_year: when_template = '%b %d %H:%M' else: when_template = '%b %d %H:%M (%Y)' prev_year = result['clin_when'].year else: when_template = '%b %d' val = result['val_num'] if val is None: val = result.estimate_numeric_value_from_alpha if val is None: continue # skip distinctly non-numericable values gplot_data.write ('%s %s "%s" %s %s %s %s %s %s "%s"\n' % ( result['clin_when'].strftime('%Y-%m-%d_%H:%M'), val, gmTools.coalesce(result['val_unit'], '"<?>"'), gmTools.coalesce(result['unified_target_min'], '"<?>"'), gmTools.coalesce(result['unified_target_max'], '"<?>"'), gmTools.coalesce(result['val_normal_min'], '"<?>"'), gmTools.coalesce(result['val_normal_max'], '"<?>"'), gmTools.coalesce(result['val_target_min'], '"<?>"'), gmTools.coalesce(result['val_target_max'], '"<?>"'), result['clin_when'].strftime(when_template) )) prev_date = curr_date gplot_data.close() gplot_conf.close() return filename
def find_measurement_type(lab=None, abbrev=None, name=None, link_obj=None)
-
Expand source code
def find_measurement_type(lab=None, abbrev=None, name=None, link_obj=None): if (abbrev is None) and (name is None): raise ValueError('must have <abbrev> and/or <name> set') where_snippets = [] if lab is None: where_snippets.append('pk_test_org IS NULL') else: try: int(lab) where_snippets.append('pk_test_org = %(lab)s') except (TypeError, ValueError): where_snippets.append('pk_test_org = (SELECT pk_test_org FROM clin.v_test_orgs WHERE unit = %(lab)s)') if abbrev is not None: where_snippets.append('abbrev = %(abbrev)s') if name is not None: where_snippets.append('name = %(name)s') where_clause = ' and '.join(where_snippets) cmd = "select * from clin.v_test_types where %s" % where_clause args = {'lab': lab, 'abbrev': abbrev, 'name': name} rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None tt = cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': rows[0]}) return tt
def format_test_results(results=None, output_format='latex')
-
Expand source code
def format_test_results(results=None, output_format='latex'): _log.debug('formatting test results into [%s]', output_format) if output_format == 'latex': return __format_test_results_latex(results = results) msg = _('unknown test results output format [%s]') % output_format _log.error(msg) return msg
def generate_failsafe_test_results_entries(pk_patient: int = None,
test_results: list[cTestResult] = None,
max_width: int = 80) ‑> list[str]-
Expand source code
def generate_failsafe_test_results_entries(pk_patient:int=None, test_results:list[cTestResult]=None, max_width:int=80) -> list[str]: if not test_results: if pk_patient: test_results = get_test_results(pk_patient = 12, order_by = 'unified_abbrev, clin_when DESC') if not test_results: return [] prev_abbrev = None lines = [] for tr in test_results: abbrev = tr['unified_abbrev'] if abbrev != prev_abbrev: lines.append('') lines.append('%s - %s' % (abbrev, tr['unified_name'])) prev_abbrev = abbrev line = ' %s: %s %s%s%s' % ( tr['clin_when'].strftime('%Y %b %d %H:%m'), tr['unified_val'], tr['val_unit'], gmTools.bool2subst(tr.is_considered_abnormal, ' !', '', ''), gmTools.coalesce(tr.formatted_range, '', ' (%s)') ) lines.append(gmTools.shorten_text(line, max_width)) if tr['note_test_org']: cmt = _(' Lab: %s') % tr['note_test_org'].replace('\n', ' // ') lines.append(gmTools.shorten_text(cmt, max_width)) if tr['comment']: cmt = _(' Praxis: %s') % tr['comment'].replace('\n', ' // ') lines.append(gmTools.shorten_text(cmt, max_width)) return lines
def get_measurement_types(order_by=None, loincs=None, return_pks=False)
-
Expand source code
def get_measurement_types(order_by=None, loincs=None, return_pks=False): args = {} where_parts = [] if loincs is not None: if len(loincs) > 0: where_parts.append('loinc = ANY(%(loincs)s)') args['loincs'] = loincs if len(where_parts) == 0: where_parts.append('TRUE') WHERE_clause = ' AND '.join(where_parts) cmd = (_SQL_get_test_types % WHERE_clause) + gmTools.coalesce(order_by, '', ' ORDER BY %s') rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if return_pks: return [ r['pk_test_type'] for r in rows ] return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r}) for r in rows ]
def get_meta_test_types(return_pks=False)
-
Expand source code
def get_meta_test_types(return_pks=False): cmd = 'SELECT *, xmin FROM clin.meta_test_type' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) if return_pks: return [ r['pk'] for r in rows ] return [ cMetaTestType(row = {'pk_field': 'pk', 'data': r}) for r in rows ]
def get_most_recent_result_for_test_types(pk_test_types=None,
pk_patient=None,
return_pks=False,
consider_meta_type=False,
order_by=None)-
Expand source code
def get_most_recent_result_for_test_types(pk_test_types=None, pk_patient=None, return_pks=False, consider_meta_type=False, order_by=None): """Return the one most recent result for *each* of a list of test types. If <pk_test_types> is not given, most recent results for *each* test type the patient has any results for is returned. """ where_parts = ['pk_patient = %(pat)s'] args = {'pat': pk_patient} if pk_test_types is not None: where_parts.append('pk_test_type = ANY(%(ttyps)s)') args['ttyps'] = pk_test_types order_by = 'ORDER BY clin_when DESC' if order_by is None else 'ORDER BY %s' % order_by if consider_meta_type: cmd = 'SELECT * FROM ((%s) UNION ALL (%s)) AS result_union %s' % ( _SQL_most_recent_result_for_test_types_without_meta_type % ' AND '.join(where_parts), _SQL_most_recent_result_for_test_types_by_meta_type % ' AND '.join(where_parts), order_by ) else: cmd = _SQL_most_recent_result_for_test_types % ( ' AND '.join(where_parts), order_by ) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if return_pks: return [ r['pk_test_result'] for r in rows ] return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
Return the one most recent result for each of a list of test types.
If
is not given, most recent results for each test type the patient has any results for is returned. def get_most_recent_results_for_panel(pk_patient=None, pk_panel=None, order_by=None, group_by_meta_type=False)
-
Expand source code
def get_most_recent_results_for_panel(pk_patient=None, pk_panel=None, order_by=None, group_by_meta_type=False): if order_by is None: order_by = '' else: order_by = 'ORDER BY %s' % order_by args = { 'pat': pk_patient, 'pk_pnl': pk_panel } if group_by_meta_type: # return most recent results in panel grouped by # meta test type if any, non-grouped results are # returned ungrouped :-) cmd = """ SELECT c_vtr.* FROM ( -- max(clin_when) per test_type-in-panel for patient SELECT pk_meta_test_type, MAX(clin_when) AS max_clin_when FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type IS DISTINCT FROM NULL AND pk_test_type IN ( (SELECT c_vtt4tp.pk_test_type FROM clin.v_test_types4test_panel c_vtt4tp WHERE c_vtt4tp.pk_test_panel = %(pk_pnl)s) ) GROUP BY pk_meta_test_type ) AS latest_results INNER JOIN clin.v_test_results c_vtr ON c_vtr.pk_meta_test_type = latest_results.pk_meta_test_type AND c_vtr.clin_when = latest_results.max_clin_when UNION ALL SELECT c_vtr.* FROM ( -- max(clin_when) per test_type-in-panel for patient SELECT pk_test_type, MAX(clin_when) AS max_clin_when FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type IS NULL AND pk_test_type IN ( (SELECT c_vtt4tp.pk_test_type FROM clin.v_test_types4test_panel c_vtt4tp WHERE c_vtt4tp.pk_test_panel = %(pk_pnl)s) ) GROUP BY pk_test_type ) AS latest_results INNER JOIN clin.v_test_results c_vtr ON c_vtr.pk_test_type = latest_results.pk_test_type AND c_vtr.clin_when = latest_results.max_clin_when """ else: # return most recent results in panel regardless of whether # distinct test types in this panel are grouped under the # same meta test type cmd = """ SELECT c_vtr.* FROM ( -- max(clin_when) per test_type-in-panel for patient SELECT pk_test_type, MAX(clin_when) AS max_clin_when FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_test_type IN ( (SELECT c_vtt4tp.pk_test_type FROM clin.v_test_types4test_panel c_vtt4tp WHERE c_vtt4tp.pk_test_panel = %(pk_pnl)s) ) GROUP BY pk_test_type ) AS latest_results -- this INNER join makes certain we do not expand -- the row selection beyond the patient's rows -- which we constrained to inside the SELECT -- producing "latest_results" INNER JOIN clin.v_test_results c_vtr ON c_vtr.pk_test_type = latest_results.pk_test_type AND c_vtr.clin_when = latest_results.max_clin_when """ cmd += order_by rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
def get_most_recent_results_for_patient(no_of_results=1, patient=None)
-
Expand source code
def get_most_recent_results_for_patient(no_of_results=1, patient=None): """Get N most recent results for a given patient.""" if no_of_results < 1: raise ValueError('<no_of_results> must be > 0') args = {'pat': patient} cmd = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %%(pat)s ORDER BY clin_when DESC LIMIT %s""" % no_of_results rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
Get N most recent results for a given patient.
def get_most_recent_results_for_test_type(test_type=None, max_no_of_results=1, patient=None)
-
Expand source code
def get_most_recent_results_for_test_type(test_type=None, max_no_of_results=1, patient=None): """Get N most recent results for *one* defined test type.""" assert (test_type is not None), '<test_type> must not be None' assert (max_no_of_results > 0), '<max_no_of_results> must be > 0' args = {'pat': patient, 'ttyp': test_type} where_parts = ['pk_patient = %(pat)s'] where_parts.append('pk_test_type = %(ttyp)s') # ?consider: pk_meta_test_type = %(pkmtt)s / self._payload['pk_meta_test_type'] cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY clin_when DESC LIMIT %s""" % ( ' AND '.join(where_parts), max_no_of_results ) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
Get N most recent results for one defined test type.
def get_most_recent_results_in_loinc_group(loincs=None,
max_no_of_results=1,
patient=None,
max_age=None,
consider_indirect_matches=False)-
Expand source code
def get_most_recent_results_in_loinc_group(loincs=None, max_no_of_results=1, patient=None, max_age=None, consider_indirect_matches=False): """Get N most recent results *among* a list of tests selected by LOINC. 1) get test types with LOINC (or meta type LOINC) in the group of <loincs> 2) from these get the test results for <patient> within the given <max_age> 3) from these return "the N=<max_no_of_results> most recent ones" or "None" <loinc> must be a list or tuple or set, NOT a single string <max_age> must be a string holding a PG interval or else a pydt interval """ assert (max_no_of_results > 0), '<max_no_of_results> must be >0' args = {'pat': patient, 'loincs': loincs} if max_age is None: max_age_cond = '' else: max_age_cond = 'AND clin_when > (now() - %(max_age)s::interval)' args['max_age'] = max_age cmd = """ SELECT * FROM ( SELECT DISTINCT ON (pk_test_result) * FROM clin.v_test_results WHERE pk_patient = %%(pat)s AND unified_loinc = ANY(%%(loincs)s) %s ) AS distinct_results ORDER BY clin_when DESC LIMIT %s""" % ( max_age_cond, max_no_of_results ) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ] if not consider_indirect_matches: return [] cmd = """ -- get the test results SELECT * FROM clin.v_test_results c_vtr WHERE -- for this <patient> pk_patient = %%(pat)s AND -- not having *any* LOINC (if the result did have a LOINC and had not been caught by the by-LOINC search it does not apply) unified_loinc IS NULL AND -- with these meta test types (= results for the explicit equivalance group) c_vtr.pk_meta_test_type IN ( -- get the meta test types for those types SELECT pk_meta_test_type FROM clin.v_test_types WHERE pk_meta_test_type IS NOT NULL AND (-- retrieve test types which have .LOINC in <loincs> (loinc IN %%(loincs)s) OR (loinc_meta IN %%(loincs)s) ) AND -- but no result for <patient> pk_test_type NOT IN ( select pk_test_type from clin.v_test_results WHERE pk_patient = %%(pat)s ) %s ) -- return the N most resent result ORDER BY clin_when DESC LIMIT %s """ % ( max_age_cond, max_no_of_results ) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
Get N most recent results among a list of tests selected by LOINC.
1) get test types with LOINC (or meta type LOINC) in the group of
2) from these get the test results for within the given 3) from these return "the N= most recent ones" or "None" must be a list or tuple or set, NOT a single string must be a string holding a PG interval or else a pydt interval def get_next_request_ID(lab=None, incrementor_func=None)
-
Expand source code
def get_next_request_ID(lab=None, incrementor_func=None): """Get logically next request ID for given lab. - incrementor_func: - if not supplied the next ID is guessed - if supplied it is applied to the most recently used ID """ if type(lab) == int: lab_snippet = 'vlr.fk_test_org=%s' else: lab_snippet = 'vlr.lab_name=%s' lab = str(lab) cmd = """ select request_id from lab_request lr0 where lr0.clin_when = ( select max(vlr.sampled_when) from v_lab_requests vlr where %s )""" % lab_snippet rows = gmPG2.run_ro_queries(cmd, None, lab) if rows is None: _log.warning('error getting most recently used request ID for lab [%s]' % lab) return '' if len(rows) == 0: return '' most_recent = rows[0][0] # apply supplied incrementor if incrementor_func is not None: try: next = incrementor_func(most_recent) except TypeError: _log.error('cannot call incrementor function [%s]' % str(incrementor_func)) return most_recent return next # try to be smart ourselves for pos in range(len(most_recent)): header = most_recent[:pos] trailer = most_recent[pos:] try: return '%s%s' % (header, str(int(trailer) + 1)) except ValueError: header = most_recent[:-1] trailer = most_recent[-1:] return '%s%s' % (header, chr(ord(trailer) + 1))
Get logically next request ID for given lab.
- incrementor_func:
- if not supplied the next ID is guessed
- if supplied it is applied to the most recently used ID
def get_oldest_result(test_type=None, loinc=None, patient=None)
-
Expand source code
def get_oldest_result(test_type=None, loinc=None, patient=None): if None not in [test_type, loinc]: raise ValueError('either <test_type> or <loinc> must be None') args = { 'pat': patient, 'ttyp': test_type, 'loinc': loinc } where_parts = ['pk_patient = %(pat)s'] if test_type is not None: where_parts.append('pk_test_type = %(ttyp)s') # consider: pk_meta_test_type = %(pkmtt)s / self._payload['pk_meta_test_type'] elif loinc is not None: where_parts.append('((loinc_tt = ANY(%(loinc)s)) OR (loinc_meta = ANY(%(loinc)s)))') args['loinc'] = loinc cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY clin_when LIMIT 1""" % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]})
def get_pending_requests(limit=250)
-
Expand source code
def get_pending_requests(limit=250): lim = limit + 1 cmd = "select pk from lab_request where is_pending is true limit %s" % lim rows = gmPG2.run_ro_queries(queries = cmd) if rows is None: _log.error('error retrieving pending lab requests') return (None, None) if len(rows) == 0: return (False, []) # more than LIMIT rows ? if len(rows) == lim: too_many = True # but deliver only LIMIT rows so that our assumption holds true... rows = rows[:limit] else: too_many = False requests = [] for row in rows: try: requests.append(cLabRequest(aPK_obj=row[0])) except gmExceptions.ConstructorError: _log.exception('skipping pending lab request [%s]' % row[0], sys.exc_info(), verbose=0) return (too_many, requests)
def get_result_at_timestamp(timestamp=None, test_type=None, loinc=None, tolerance_interval=None, patient=None)
-
Expand source code
def get_result_at_timestamp(timestamp=None, test_type=None, loinc=None, tolerance_interval=None, patient=None): if None not in [test_type, loinc]: raise ValueError('either <test_type> or <loinc> must be None') args = { 'pat': patient, 'ttyp': test_type, 'loinc': loinc, 'ts': timestamp, 'intv': tolerance_interval } where_parts = ['pk_patient = %(pat)s'] if test_type is not None: where_parts.append('pk_test_type = %(ttyp)s') # consider: pk_meta_test_type = %(pkmtt)s / self._payload['pk_meta_test_type'] elif loinc is not None: where_parts.append('((loinc_tt = ANY(%(loinc)s)) OR (loinc_meta = ANY(%(loinc)s)))') args['loinc'] = loinc if tolerance_interval is None: where_parts.append('clin_when = %(ts)s') else: where_parts.append('clin_when between (%(ts)s - %(intv)s::interval) AND (%(ts)s + %(intv)s::interval)') cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY abs(extract(epoch from age(clin_when, %%(ts)s))) LIMIT 1""" % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]})
def get_results_for_day(timestamp=None, patient=None, order_by=None)
-
Expand source code
def get_results_for_day(timestamp=None, patient=None, order_by=None): args = { 'pat': patient, 'ts': timestamp } where_parts = [ 'pk_patient = %(pat)s', "date_trunc('day'::text, clin_when) = date_trunc('day'::text, %(ts)s)" ] cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY val_grouping, abbrev_tt, clin_when DESC """ % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
def get_results_for_episode(pk_episode=None)
-
Expand source code
def get_results_for_episode(pk_episode=None): args = {'pk_epi': pk_episode} where_parts = ['pk_episode = %(pk_epi)s'] cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY val_grouping, abbrev_tt, clin_when DESC """ % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
def get_results_for_issue(pk_health_issue=None, order_by=None)
-
Expand source code
def get_results_for_issue(pk_health_issue=None, order_by=None): args = {'pk_issue': pk_health_issue} where_parts = ['pk_health_issue = %(pk_issue)s'] cmd = """ SELECT * FROM clin.v_test_results WHERE %s ORDER BY val_grouping, abbrev_tt, clin_when DESC """ % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]
def get_test_orgs(order_by='unit', return_pks=False)
-
Expand source code
def get_test_orgs(order_by='unit', return_pks=False): cmd = 'SELECT * FROM clin.v_test_orgs ORDER BY %s' % order_by rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) if return_pks: return [ r['pk_test_org'] for r in rows ] return [ cTestOrg(row = {'pk_field': 'pk_test_org', 'data': r}) for r in rows ]
def get_test_panels(order_by=None, loincs=None, return_pks=False)
-
Expand source code
def get_test_panels(order_by=None, loincs=None, return_pks=False): where_args = {} if loincs is None: where_parts = ['true'] else: where_parts = ['loincs @> %(loincs)s'] where_args['loincs'] = list(loincs) if order_by is None: order_by = u'' else: order_by = ' ORDER BY %s' % order_by cmd = (_SQL_get_test_panels % ' AND '.join(where_parts)) + order_by rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': where_args}]) if return_pks: return [ r['pk_test_panel'] for r in rows ] return [ cTestPanel(row = {'data': r, 'pk_field': 'pk_test_panel'}) for r in rows ]
def get_test_results(pk_patient=None, encounters=None, episodes=None, order_by=None, return_pks=False)
-
Expand source code
def get_test_results(pk_patient=None, encounters=None, episodes=None, order_by=None, return_pks=False): where_parts = [] if pk_patient is not None: where_parts.append('pk_patient = %(pat)s') args = {'pat': pk_patient} # if tests is not None: # where_parts.append(u'pk_test_type = ANY(%(tests)s)') # args['tests'] = tests if encounters is not None: where_parts.append('pk_encounter = ANY(%(encs)s)') args['encs'] = encounters if episodes is not None: where_parts.append('pk_episode = ANY(%(epis)s)') args['epis'] = episodes if order_by is None: order_by = '' else: order_by = 'ORDER BY %s' % order_by cmd = """ SELECT * FROM clin.v_test_results WHERE %s %s """ % ( ' AND '.join(where_parts), order_by ) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if return_pks: return [ r['pk_test_result'] for r in rows ] tests = [ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ] return tests
Classes
class cLabRequest (aPK_obj=None, row=None)
-
Expand source code
class cLabRequest(gmBusinessDBObject.cBusinessDBObject): """Represents one lab request.""" _cmd_fetch_payload = """ select *, xmin_lab_request from v_lab_requests where pk_request=%s""" _cmds_lock_rows_for_update = [ """select 1 from lab_request where pk=%(pk_request)s and xmin=%(xmin_lab_request)s for update""" ] _cmds_store_payload = [ """update lab_request set request_id=%(request_id)s, lab_request_id=%(lab_request_id)s, clin_when=%(sampled_when)s, lab_rxd_when=%(lab_rxd_when)s, results_reported_when=%(results_reported_when)s, request_status=%(request_status)s, is_pending=%(is_pending)s::bool, narrative=%(progress_note)s where pk=%(pk_request)s""", """select xmin_lab_request from v_lab_requests where pk_request=%(pk_request)s""" ] _updatable_fields = [ 'request_id', 'lab_request_id', 'sampled_when', 'lab_rxd_when', 'results_reported_when', 'request_status', 'is_pending', 'progress_note' ] #-------------------------------------------------------- def __init__(self, aPK_obj=None, row=None): """Instantiate lab request. The aPK_obj can be either a dict with the keys "req_id" and "lab" or a simple primary key. """ # instantiate from row data ? if aPK_obj is None: gmBusinessDBObject.cBusinessDBObject.__init__(self, row=row) return pk = aPK_obj # instantiate from "req_id" and "lab" ? if type(aPK_obj) == dict: # sanity check try: aPK_obj['req_id'] aPK_obj['lab'] except Exception: _log.exception('[%s:??]: faulty <aPK_obj> structure: [%s]' % (self.__class__.__name__, aPK_obj), sys.exc_info()) raise gmExceptions.ConstructorError('[%s:??]: cannot derive PK from [%s]' % (self.__class__.__name__, aPK_obj)) # generate query where_snippets = [] where_snippets.append('request_id=%(req_id)s') if type(aPK_obj['lab']) == int: where_snippets.append('pk_test_org=%(lab)s') else: where_snippets.append('lab_name=%(lab)s') # where_clause = ' and '.join(where_snippets) # cmd = "select pk_request from v_lab_requests where %s" % where_clause # # get pk # data = gmPG2.run_ro_query('historica', cmd, None, aPK_obj) # if data is None: # raise gmExceptions.ConstructorError('[%s:??]: error getting lab request for [%s]' % (self.__class__.__name__, aPK_obj)) # if len(data) == 0: # raise gmExceptions.NoSuchClinItemError('[%s:??]: no lab request for [%s]' % (self.__class__.__name__, aPK_obj)) # pk = data[0][0] # instantiate class gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk)
Represents one lab request.
Instantiate lab request.
The aPK_obj can be either a dict with the keys "req_id" and "lab" or a simple primary key.
Ancestors
Inherited members
class cMeasurementType (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cMeasurementType(gmBusinessDBObject.cBusinessDBObject): """Represents one test result type.""" _cmd_fetch_payload = _SQL_get_test_types % "pk_test_type = %s" _cmds_store_payload = [ """UPDATE clin.test_type SET abbrev = gm.nullify_empty_string(%(abbrev)s), name = gm.nullify_empty_string(%(name)s), loinc = gm.nullify_empty_string(%(loinc)s), comment = gm.nullify_empty_string(%(comment_type)s), reference_unit = gm.nullify_empty_string(%(reference_unit)s), fk_test_org = %(pk_test_org)s, fk_meta_test_type = %(pk_meta_test_type)s WHERE pk = %(pk_test_type)s AND xmin = %(xmin_test_type)s RETURNING xmin AS xmin_test_type""" ] _updatable_fields = [ 'abbrev', 'name', 'loinc', 'comment_type', 'reference_unit', 'pk_test_org', 'pk_meta_test_type' ] #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_in_use(self): cmd = 'SELECT EXISTS(SELECT 1 FROM clin.test_result WHERE fk_type = %(pk_type)s)' args = {'pk_type': self._payload['pk_test_type']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return rows[0][0] in_use = property(_get_in_use) #-------------------------------------------------------- def get_most_recent_results(self, patient=None, max_no_of_results=1): results = get_most_recent_results_for_test_type ( test_type = self._payload['pk_test_type'], max_no_of_results = max_no_of_results, patient = patient ) if len(results) > 0: return results if self._payload['loinc'] is None: return [] return get_most_recent_results_in_loinc_group ( loincs = list(self._payload['loinc']), max_no_of_results = max_no_of_results, patient = patient # ? ) #-------------------------------------------------------- def get_oldest_result(self, patient=None): result = get_oldest_result ( test_type = self._payload['pk_test_type'], loinc = None, patient = patient ) if result is None: if self._payload['loinc'] is not None: result = get_oldest_result ( test_type = None, loinc = self._payload['loinc'], patient = patient ) return result #-------------------------------------------------------- def _get_test_panels(self): if self._payload['pk_test_panels'] is None: return None return [ cTestPanel(aPK_obj = pk) for pk in self._payload['pk_test_panels'] ] test_panels = property(_get_test_panels) #-------------------------------------------------------- def get_meta_test_type(self, real_one_only=True): if real_one_only is False: return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type']) if self._payload['is_fake_meta_type']: return None return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type']) meta_test_type = property(get_meta_test_type) #-------------------------------------------------------- def get_temporally_closest_normal_range(self, unit, timestamp=None): """Returns the closest test result which does have normal range information. - needs <unit> - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT * FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit = %(unit)s AND ( (val_normal_min IS NOT NULL) OR (val_normal_max IS NOT NULL) OR (val_normal_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': unit, 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None r = rows[0] return cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) #-------------------------------------------------------- def get_temporally_closest_target_range(self, unit, patient, timestamp=None): """Returns the closest test result which does have target range information. - needs <unit> - needs <patient> (as target will be per-patient) - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT * FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit = %(unit)s AND pk_patient = %(pat)s AND ( (val_target_min IS NOT NULL) OR (val_target_max IS NOT NULL) OR (val_target_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': unit, 'pat': patient, 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None r = rows[0] return cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) #-------------------------------------------------------- def get_temporally_closest_unit(self, timestamp=None): """Returns the unit of the closest test result. - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT val_unit FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit IS NOT NULL ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return rows[0]['val_unit'] temporally_closest_unit = property(get_temporally_closest_unit) #-------------------------------------------------------- def format(self, patient=None): tt = '' tt += _('Test type "%s" (%s) [#%s]\n') % ( self._payload['name'], self._payload['abbrev'], self._payload['pk_test_type'] ) tt += '\n' tt += gmTools.coalesce(self._payload['loinc'], '', ' LOINC: %s\n') tt += gmTools.coalesce(self._payload['reference_unit'], '', _(' Reference unit: %s\n')) tt += gmTools.coalesce(self._payload['comment_type'], '', _(' Comment: %s\n')) tt += '\n' tt += _('Lab details:\n') tt += _(' Name: %s\n') % gmTools.coalesce(self._payload['name_org'], '') tt += gmTools.coalesce(self._payload['contact_org'], '', _(' Contact: %s\n')) tt += gmTools.coalesce(self._payload['comment_org'], '', _(' Comment: %s\n')) if self._payload['is_fake_meta_type'] is False: tt += '\n' tt += _('Aggregated under meta type:\n') tt += _(' Name: %s - %s [#%s]\n') % ( self._payload['abbrev_meta'], self._payload['name_meta'], self._payload['pk_meta_test_type'] ) tt += gmTools.coalesce(self._payload['loinc_meta'], '', ' LOINC: %s\n') tt += gmTools.coalesce(self._payload['comment_meta'], '', _(' Comment: %s\n')) panels = self.test_panels if panels is not None: tt += '\n' tt += _('Listed in test panels:\n') for panel in panels: tt += _(' Panel "%s" [#%s]\n') % ( panel['description'], panel['pk_test_panel'] ) if patient is not None: tt += '\n' results = self.get_most_recent_results(patient = patient, max_no_of_results = 1) if len(results) > 0: result = results[0] tt += _(' Most recent (%s): %s%s%s') % ( result['clin_when'].strftime('%Y %b %d'), result['unified_val'], gmTools.coalesce(result['val_unit'], '', ' %s'), gmTools.coalesce(result['abnormality_indicator'], '', ' (%s)') ) result = self.get_oldest_result(patient = patient) if result is not None: tt += '\n' tt += _(' Oldest (%s): %s%s%s') % ( result['clin_when'].strftime('%Y %b %d'), result['unified_val'], gmTools.coalesce(result['val_unit'], '', ' %s'), gmTools.coalesce(result['abnormality_indicator'], '', ' (%s)') ) return tt
Represents one test result type.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop in_use
-
Expand source code
def _get_in_use(self): cmd = 'SELECT EXISTS(SELECT 1 FROM clin.test_result WHERE fk_type = %(pk_type)s)' args = {'pk_type': self._payload['pk_test_type']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return rows[0][0]
prop meta_test_type
-
Expand source code
def get_meta_test_type(self, real_one_only=True): if real_one_only is False: return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type']) if self._payload['is_fake_meta_type']: return None return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type'])
prop temporally_closest_unit
-
Expand source code
def get_temporally_closest_unit(self, timestamp=None): """Returns the unit of the closest test result. - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT val_unit FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit IS NOT NULL ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return rows[0]['val_unit']
Returns the unit of the closest test result.
- if
is None it will assume now() and thus return the most recent
- if
prop test_panels
-
Expand source code
def _get_test_panels(self): if self._payload['pk_test_panels'] is None: return None return [ cTestPanel(aPK_obj = pk) for pk in self._payload['pk_test_panels'] ]
Methods
def get_meta_test_type(self, real_one_only=True)
-
Expand source code
def get_meta_test_type(self, real_one_only=True): if real_one_only is False: return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type']) if self._payload['is_fake_meta_type']: return None return cMetaTestType(aPK_obj = self._payload['pk_meta_test_type'])
def get_most_recent_results(self, patient=None, max_no_of_results=1)
-
Expand source code
def get_most_recent_results(self, patient=None, max_no_of_results=1): results = get_most_recent_results_for_test_type ( test_type = self._payload['pk_test_type'], max_no_of_results = max_no_of_results, patient = patient ) if len(results) > 0: return results if self._payload['loinc'] is None: return [] return get_most_recent_results_in_loinc_group ( loincs = list(self._payload['loinc']), max_no_of_results = max_no_of_results, patient = patient # ? )
def get_oldest_result(self, patient=None)
-
Expand source code
def get_oldest_result(self, patient=None): result = get_oldest_result ( test_type = self._payload['pk_test_type'], loinc = None, patient = patient ) if result is None: if self._payload['loinc'] is not None: result = get_oldest_result ( test_type = None, loinc = self._payload['loinc'], patient = patient ) return result
def get_temporally_closest_normal_range(self, unit, timestamp=None)
-
Expand source code
def get_temporally_closest_normal_range(self, unit, timestamp=None): """Returns the closest test result which does have normal range information. - needs <unit> - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT * FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit = %(unit)s AND ( (val_normal_min IS NOT NULL) OR (val_normal_max IS NOT NULL) OR (val_normal_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': unit, 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None r = rows[0] return cTestResult(row = {'pk_field': 'pk_test_result', 'data': r})
Returns the closest test result which does have normal range information.
- needs
- if
is None it will assume now() and thus return the most recent
- needs
def get_temporally_closest_target_range(self, unit, patient, timestamp=None)
-
Expand source code
def get_temporally_closest_target_range(self, unit, patient, timestamp=None): """Returns the closest test result which does have target range information. - needs <unit> - needs <patient> (as target will be per-patient) - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT * FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit = %(unit)s AND pk_patient = %(pat)s AND ( (val_target_min IS NOT NULL) OR (val_target_max IS NOT NULL) OR (val_target_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': unit, 'pat': patient, 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None r = rows[0] return cTestResult(row = {'pk_field': 'pk_test_result', 'data': r})
Returns the closest test result which does have target range information.
- needs
- needs
(as target will be per-patient) - if
is None it will assume now() and thus return the most recent
- needs
def get_temporally_closest_unit(self, timestamp=None)
-
Expand source code
def get_temporally_closest_unit(self, timestamp=None): """Returns the unit of the closest test result. - if <timestamp> is None it will assume now() and thus return the most recent """ if timestamp is None: timestamp = gmDateTime.pydt_now_here() cmd = """ SELECT val_unit FROM clin.v_test_results WHERE pk_test_type = %(pk_type)s AND val_unit IS NOT NULL ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'clin_when': timestamp } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return rows[0]['val_unit']
Returns the unit of the closest test result.
- if
is None it will assume now() and thus return the most recent
- if
Inherited members
class cMetaTestType (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cMetaTestType(gmBusinessDBObject.cBusinessDBObject): """Represents one meta test type under which actual test types can be aggregated.""" _cmd_fetch_payload = "SELECT *, xmin FROM clin.meta_test_type WHERE pk = %s" _cmds_store_payload = [""" UPDATE clin.meta_test_type SET abbrev = %(abbrev)s, name = %(name)s, loinc = gm.nullify_empty_string(%(loinc)s), comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk)s AND xmin = %(xmin)s RETURNING xmin """] _updatable_fields = [ 'abbrev', 'name', 'loinc', 'comment' ] #-------------------------------------------------------- def format(self, with_tests=False, patient=None): txt = _('Meta (%s=aggregate) test type [#%s]\n\n') % (gmTools.u_sum, self._payload['pk']) txt += _(' Name: %s (%s)\n') % ( self._payload['abbrev'], self._payload['name'] ) if self._payload['loinc'] is not None: txt += ' LOINC: %s\n' % self._payload['loinc'] if self._payload['comment'] is not None: txt += _(' Comment: %s\n') % self._payload['comment'] if with_tests: ttypes = self.included_test_types if len(ttypes) > 0: txt += _(' Aggregates the following test types:\n') for ttype in ttypes: txt += ' - %s (%s)%s%s%s [#%s]\n' % ( ttype['name'], ttype['abbrev'], gmTools.coalesce(ttype['reference_unit'], '', ', %s'), gmTools.coalesce(ttype['name_org'], '', ' (%s)'), gmTools.coalesce(ttype['loinc'], '', ', LOINC: %s'), ttype['pk_test_type'] ) if patient is not None: txt += '\n' most_recent = self.get_most_recent_result(patient = patient) if most_recent is not None: txt += _(' Most recent (%s): %s%s%s') % ( most_recent['clin_when'].strftime('%Y %b %d'), most_recent['unified_val'], gmTools.coalesce(most_recent['val_unit'], '', ' %s'), gmTools.coalesce(most_recent['abnormality_indicator'], '', ' (%s)') ) oldest = self.get_oldest_result(patient = patient) if oldest is not None: txt += '\n' txt += _(' Oldest (%s): %s%s%s') % ( oldest['clin_when'].strftime('%Y %b %d'), oldest['unified_val'], gmTools.coalesce(oldest['val_unit'], '', ' %s'), gmTools.coalesce(oldest['abnormality_indicator'], '', ' (%s)') ) return txt #-------------------------------------------------------- def get_most_recent_result(self, patient=None): args = { 'pat': patient, 'mttyp': self._payload['pk'] } cmd = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type = %(mttyp)s ORDER BY clin_when DESC LIMIT 1""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) #-------------------------------------------------------- def get_oldest_result(self, patient=None): args = { 'pat': patient, 'mttyp': self._payload['pk'] } cmd = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type = %(mttyp)s ORDER BY clin_when LIMIT 1""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) #-------------------------------------------------------- def get_temporally_closest_result(self, date, pk_patient): args = { 'pat': pk_patient, 'mtyp': self._payload['pk'], 'mloinc': self._payload['loinc'], 'when': date } SQL = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %%(pat)s AND clin_when %s %%(when)s AND ((pk_meta_test_type = %%(mtyp)s) OR (loinc_meta = %%(mloinc)s)) ORDER BY clin_when LIMIT 1""" # get earlier results by meta type earlier_result = None cmd = SQL % '<' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_result = cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) # get later results by meta type ? later_result = None cmd = SQL % '>' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_result = cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) if earlier_result is None: return later_result if later_result is None: return earlier_result earlier_ago = date - earlier_result['clin_when'] later_ago = later_result['clin_when'] - date if earlier_ago < later_ago: return earlier_result return later_result #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_included_test_types(self): cmd = _SQL_get_test_types % 'pk_meta_test_type = %(pk_meta)s' args = {'pk_meta': self._payload['pk']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r}) for r in rows ] included_test_types = property(_get_included_test_types)
Represents one meta test type under which actual test types can be aggregated.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop included_test_types
-
Expand source code
def _get_included_test_types(self): cmd = _SQL_get_test_types % 'pk_meta_test_type = %(pk_meta)s' args = {'pk_meta': self._payload['pk']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r}) for r in rows ]
Methods
def get_most_recent_result(self, patient=None)
-
Expand source code
def get_most_recent_result(self, patient=None): args = { 'pat': patient, 'mttyp': self._payload['pk'] } cmd = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type = %(mttyp)s ORDER BY clin_when DESC LIMIT 1""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]})
def get_oldest_result(self, patient=None)
-
Expand source code
def get_oldest_result(self, patient=None): args = { 'pat': patient, 'mttyp': self._payload['pk'] } cmd = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %(pat)s AND pk_meta_test_type = %(mttyp)s ORDER BY clin_when LIMIT 1""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]})
def get_temporally_closest_result(self, date, pk_patient)
-
Expand source code
def get_temporally_closest_result(self, date, pk_patient): args = { 'pat': pk_patient, 'mtyp': self._payload['pk'], 'mloinc': self._payload['loinc'], 'when': date } SQL = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %%(pat)s AND clin_when %s %%(when)s AND ((pk_meta_test_type = %%(mtyp)s) OR (loinc_meta = %%(mloinc)s)) ORDER BY clin_when LIMIT 1""" # get earlier results by meta type earlier_result = None cmd = SQL % '<' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_result = cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) # get later results by meta type ? later_result = None cmd = SQL % '>' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_result = cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) if earlier_result is None: return later_result if later_result is None: return earlier_result earlier_ago = date - earlier_result['clin_when'] later_ago = later_result['clin_when'] - date if earlier_ago < later_ago: return earlier_result return later_result
Inherited members
class cTestOrg (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cTestOrg(gmBusinessDBObject.cBusinessDBObject): """Represents one test org/lab.""" _cmd_fetch_payload = _SQL_get_test_orgs % 'pk_test_org = %s' _cmds_store_payload = [ """UPDATE clin.test_org SET fk_org_unit = %(pk_org_unit)s, contact = gm.nullify_empty_string(%(test_org_contact)s), comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk_test_org)s AND xmin = %(xmin_test_org)s RETURNING xmin AS xmin_test_org """ ] _updatable_fields = [ 'pk_org_unit', 'test_org_contact', 'comment' ]
Represents one test org/lab.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Inherited members
class cTestPanel (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cTestPanel(gmBusinessDBObject.cBusinessDBObject): """Represents a grouping/listing of tests into a panel.""" _cmd_fetch_payload = _SQL_get_test_panels % "pk_test_panel = %s" _cmds_store_payload = [ """ UPDATE clin.test_panel SET description = gm.nullify_empty_string(%(description)s), comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk_test_panel)s AND xmin = %(xmin_test_panel)s RETURNING xmin AS xmin_test_panel """ ] _updatable_fields = [ 'description', 'comment' ] #-------------------------------------------------------- def format(self): txt = _('Test panel "%s" [#%s]\n') % ( self._payload['description'], self._payload['pk_test_panel'] ) if self._payload['comment'] is not None: txt += '\n' txt += gmTools.wrap ( text = self._payload['comment'], width = 50, initial_indent = ' ', subsequent_indent = ' ' ) txt += '\n' txt += '\n' txt += _('Includes:\n') if len(self.included_loincs) == 0: txt += _('no tests') else: tts_by_loinc = {} for loinc in self._payload['loincs']: tts_by_loinc[loinc] = [] for ttype in self.test_types: tts_by_loinc[ttype['loinc']].append(ttype) for loinc, ttypes in tts_by_loinc.items(): # maybe resolve LOINC, too txt += _(' %s: %s\n') % ( loinc, '; '.join([ '%(abbrev)s@%(name_org)s [#%(pk_test_type)s]' % tt for tt in ttypes ]) ) codes = self.generic_codes if len(codes) > 0: txt += '\n' for c in codes: txt += ' %s: %s (%s - %s)\n' % ( c['code'], c['term'], c['name_short'], c['version'] ) return txt #-------------------------------------------------------- def add_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "INSERT INTO clin.lnk_code2tst_pnl (fk_item, fk_generic_code) values (%(tp)s, %(code)s)" args = { 'tp': self._payload['pk_test_panel'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True #-------------------------------------------------------- def remove_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "DELETE FROM clin.lnk_code2tst_pnl WHERE fk_item = %(tp)s AND fk_generic_code = %(code)s" args = { 'tp': self._payload['pk_test_panel'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True #-------------------------------------------------------- def get_test_types_for_results(self, pk_patient, order_by=None, unique_meta_types=False): """Retrieve data about test types on this panel (for which this patient has results).""" if order_by is None: order_by = '' else: order_by = 'ORDER BY %s' % order_by if unique_meta_types: cmd = """ SELECT * FROM clin.v_test_types c_vtt WHERE c_vtt.pk_test_type IN ( SELECT DISTINCT ON (c_vtr1.pk_meta_test_type) c_vtr1.pk_test_type FROM clin.v_test_results c_vtr1 WHERE c_vtr1.pk_test_type IN %%(pks)s AND c_vtr1.pk_patient = %%(pat)s AND c_vtr1.pk_meta_test_type IS NOT NULL UNION ALL SELECT DISTINCT ON (c_vtr2.pk_test_type) c_vtr2.pk_test_type FROM clin.v_test_results c_vtr2 WHERE c_vtr2.pk_test_type IN %%(pks)s AND c_vtr2.pk_patient = %%(pat)s AND c_vtr2.pk_meta_test_type IS NULL ) %s""" % order_by else: cmd = """ SELECT * FROM clin.v_test_types c_vtt WHERE c_vtt.pk_test_type IN ( SELECT DISTINCT ON (c_vtr.pk_test_type) c_vtr.pk_test_type FROM clin.v_test_results c_vtr WHERE c_vtr.pk_test_type = ANY(%%(pks)s) AND c_vtr.pk_patient = %%(pat)s ) %s""" % order_by args = { 'pat': pk_patient, 'pks': [ tt['pk_test_type'] for tt in self._payload['test_types'] ] } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r}) for r in rows ] #-------------------------------------------------------- def add_loinc(self, loinc): if self._payload['loincs'] is not None: if loinc in self._payload['loincs']: return gmPG2.run_rw_queries(queries = [{ 'sql': 'INSERT INTO clin.lnk_loinc2test_panel (fk_test_panel, loinc) VALUES (%(pk_pnl)s, %(loinc)s)', 'args': {'loinc': loinc, 'pk_pnl': self._payload['pk_test_panel']} }]) return #-------------------------------------------------------- def remove_loinc(self, loinc): if self._payload['loincs'] is None: return if loinc not in self._payload['loincs']: return gmPG2.run_rw_queries(queries = [{ 'sql': 'DELETE FROM clin.lnk_loinc2test_panel WHERE fk_test_panel = %(pk_pnl)s AND loinc = %(loinc)s', 'args': {'loinc': loinc, 'pk_pnl': self._payload['pk_test_panel']} }]) return #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_included_loincs(self): return self._payload['loincs'] def _set_included_loincs(self, loincs): queries = [] # remove those which don't belong if len(loincs) == 0: cmd = 'DELETE FROM clin.lnk_loinc2test_panel WHERE fk_test_panel = %(pk_pnl)s' else: cmd = 'DELETE FROM clin.lnk_loinc2test_panel WHERE fk_test_panel = %(pk_pnl)s AND loinc <> ALL(%(loincs)s)' queries.append({'sql': cmd, 'args': {'loincs': loincs, 'pk_pnl': self._payload['pk_test_panel']}}) # add those not there yet if len(loincs) > 0: for loinc in loincs: cmd = """INSERT INTO clin.lnk_loinc2test_panel (fk_test_panel, loinc) SELECT %(pk_pnl)s, %(loinc)s WHERE NOT EXISTS ( SELECT 1 FROM clin.lnk_loinc2test_panel WHERE fk_test_panel = %(pk_pnl)s AND loinc = %(loinc)s )""" queries.append({'sql': cmd, 'args': {'loinc': loinc, 'pk_pnl': self._payload['pk_test_panel']}}) return gmPG2.run_rw_queries(queries = queries) included_loincs = property(_get_included_loincs, _set_included_loincs) #-------------------------------------------------------- def _get_test_types(self): if len(self._payload['test_types']) == 0: return [] rows = gmPG2.run_ro_queries ( queries = [{ 'sql': _SQL_get_test_types % 'pk_test_type = ANY(%(pks)s) ORDER BY unified_abbrev', 'args': {'pks': [ tt['pk_test_type'] for tt in self._payload['test_types'] ]} }] ) return [ cMeasurementType(row = {'data': r, 'pk_field': 'pk_test_type'}) for r in rows ] test_types = property(_get_test_types) #-------------------------------------------------------- def _get_generic_codes(self): if len(self._payload['pk_generic_codes']) == 0: return [] cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)' args = {'pks': self._payload['pk_generic_codes']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ] def _set_generic_codes(self, pk_codes): queries = [] # remove all codes if len(self._payload['pk_generic_codes']) > 0: queries.append ({ 'sql': 'DELETE FROM clin.lnk_code2tst_pnl WHERE fk_item = %(tp)s AND fk_generic_code = ANY(%(codes)s)', 'args': { 'tp': self._payload['pk_test_panel'], 'codes': self._payload['pk_generic_codes'] } }) # add new codes for pk_code in pk_codes: queries.append ({ 'sql': 'INSERT INTO clin.lnk_code2test_panel (fk_item, fk_generic_code) VALUES (%(tp)s, %(pk_code)s)', 'args': { 'tp': self._payload['pk_test_panel'], 'pk_code': pk_code } }) if len(queries) == 0: return # run it all in one transaction gmPG2.run_rw_queries(queries = queries) return generic_codes = property(_get_generic_codes, _set_generic_codes) #-------------------------------------------------------- def get_most_recent_results(self, pk_patient=None, order_by=None, group_by_meta_type=False, include_missing=False): if len(self._payload['test_types']) == 0: return [] pnl_results = get_most_recent_results_for_panel ( pk_patient = pk_patient, pk_panel = self._payload['pk_test_panel'], order_by = order_by, group_by_meta_type = group_by_meta_type ) if not include_missing: return pnl_results loincs_found = [ r['loinc_tt'] for r in pnl_results ] loincs_found.extend([ r['loinc_meta'] for r in pnl_results if r['loinc_meta'] not in loincs_found ]) loincs2consider = set([ tt['loinc'] for tt in self._payload['test_types'] ]) loincs_missing = loincs2consider - set(loincs_found) pnl_results.extend(loincs_missing) return pnl_results
Represents a grouping/listing of tests into a panel.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop generic_codes
-
Expand source code
def _get_generic_codes(self): if len(self._payload['pk_generic_codes']) == 0: return [] cmd = gmCoding._SQL_get_generic_linked_codes % 'pk_generic_code = ANY(%(pks)s)' args = {'pks': self._payload['pk_generic_codes']} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
prop included_loincs
-
Expand source code
def _get_included_loincs(self): return self._payload['loincs']
prop test_types
-
Expand source code
def _get_test_types(self): if len(self._payload['test_types']) == 0: return [] rows = gmPG2.run_ro_queries ( queries = [{ 'sql': _SQL_get_test_types % 'pk_test_type = ANY(%(pks)s) ORDER BY unified_abbrev', 'args': {'pks': [ tt['pk_test_type'] for tt in self._payload['test_types'] ]} }] ) return [ cMeasurementType(row = {'data': r, 'pk_field': 'pk_test_type'}) for r in rows ]
Methods
def add_code(self, pk_code=None)
-
Expand source code
def add_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "INSERT INTO clin.lnk_code2tst_pnl (fk_item, fk_generic_code) values (%(tp)s, %(code)s)" args = { 'tp': self._payload['pk_test_panel'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code) def add_loinc(self, loinc)
-
Expand source code
def add_loinc(self, loinc): if self._payload['loincs'] is not None: if loinc in self._payload['loincs']: return gmPG2.run_rw_queries(queries = [{ 'sql': 'INSERT INTO clin.lnk_loinc2test_panel (fk_test_panel, loinc) VALUES (%(pk_pnl)s, %(loinc)s)', 'args': {'loinc': loinc, 'pk_pnl': self._payload['pk_test_panel']} }]) return
def get_most_recent_results(self,
pk_patient=None,
order_by=None,
group_by_meta_type=False,
include_missing=False)-
Expand source code
def get_most_recent_results(self, pk_patient=None, order_by=None, group_by_meta_type=False, include_missing=False): if len(self._payload['test_types']) == 0: return [] pnl_results = get_most_recent_results_for_panel ( pk_patient = pk_patient, pk_panel = self._payload['pk_test_panel'], order_by = order_by, group_by_meta_type = group_by_meta_type ) if not include_missing: return pnl_results loincs_found = [ r['loinc_tt'] for r in pnl_results ] loincs_found.extend([ r['loinc_meta'] for r in pnl_results if r['loinc_meta'] not in loincs_found ]) loincs2consider = set([ tt['loinc'] for tt in self._payload['test_types'] ]) loincs_missing = loincs2consider - set(loincs_found) pnl_results.extend(loincs_missing) return pnl_results
def get_test_types_for_results(self, pk_patient, order_by=None, unique_meta_types=False)
-
Expand source code
def get_test_types_for_results(self, pk_patient, order_by=None, unique_meta_types=False): """Retrieve data about test types on this panel (for which this patient has results).""" if order_by is None: order_by = '' else: order_by = 'ORDER BY %s' % order_by if unique_meta_types: cmd = """ SELECT * FROM clin.v_test_types c_vtt WHERE c_vtt.pk_test_type IN ( SELECT DISTINCT ON (c_vtr1.pk_meta_test_type) c_vtr1.pk_test_type FROM clin.v_test_results c_vtr1 WHERE c_vtr1.pk_test_type IN %%(pks)s AND c_vtr1.pk_patient = %%(pat)s AND c_vtr1.pk_meta_test_type IS NOT NULL UNION ALL SELECT DISTINCT ON (c_vtr2.pk_test_type) c_vtr2.pk_test_type FROM clin.v_test_results c_vtr2 WHERE c_vtr2.pk_test_type IN %%(pks)s AND c_vtr2.pk_patient = %%(pat)s AND c_vtr2.pk_meta_test_type IS NULL ) %s""" % order_by else: cmd = """ SELECT * FROM clin.v_test_types c_vtt WHERE c_vtt.pk_test_type IN ( SELECT DISTINCT ON (c_vtr.pk_test_type) c_vtr.pk_test_type FROM clin.v_test_results c_vtr WHERE c_vtr.pk_test_type = ANY(%%(pks)s) AND c_vtr.pk_patient = %%(pat)s ) %s""" % order_by args = { 'pat': pk_patient, 'pks': [ tt['pk_test_type'] for tt in self._payload['test_types'] ] } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r}) for r in rows ]
Retrieve data about test types on this panel (for which this patient has results).
def remove_code(self, pk_code=None)
-
Expand source code
def remove_code(self, pk_code=None): """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)""" cmd = "DELETE FROM clin.lnk_code2tst_pnl WHERE fk_item = %(tp)s AND fk_generic_code = %(code)s" args = { 'tp': self._payload['pk_test_panel'], 'code': pk_code } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code) def remove_loinc(self, loinc)
-
Expand source code
def remove_loinc(self, loinc): if self._payload['loincs'] is None: return if loinc not in self._payload['loincs']: return gmPG2.run_rw_queries(queries = [{ 'sql': 'DELETE FROM clin.lnk_loinc2test_panel WHERE fk_test_panel = %(pk_pnl)s AND loinc = %(loinc)s', 'args': {'loinc': loinc, 'pk_pnl': self._payload['pk_test_panel']} }]) return
Inherited members
class cTestResult (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cTestResult(gmBusinessDBObject.cBusinessDBObject): """Represents one test result.""" _cmd_fetch_payload = "select * from clin.v_test_results where pk_test_result = %s" _cmds_store_payload = [ """UPDATE clin.test_result SET clin_when = %(clin_when)s, narrative = nullif(trim(%(comment)s), ''), val_num = %(val_num)s, val_alpha = nullif(trim(%(val_alpha)s), ''), val_unit = nullif(trim(%(val_unit)s), ''), val_normal_min = %(val_normal_min)s, val_normal_max = %(val_normal_max)s, val_normal_range = nullif(trim(%(val_normal_range)s), ''), val_target_min = %(val_target_min)s, val_target_max = %(val_target_max)s, val_target_range = nullif(trim(%(val_target_range)s), ''), abnormality_indicator = nullif(trim(%(abnormality_indicator)s), ''), norm_ref_group = nullif(trim(%(norm_ref_group)s), ''), note_test_org = nullif(trim(%(note_test_org)s), ''), material = nullif(trim(%(material)s), ''), material_detail = nullif(trim(%(material_detail)s), ''), status = gm.nullify_empty_string(%(status)s), val_grouping = gm.nullify_empty_string(%(val_grouping)s), source_data = gm.nullify_empty_string(%(source_data)s), fk_intended_reviewer = %(pk_intended_reviewer)s, fk_encounter = %(pk_encounter)s, fk_episode = %(pk_episode)s, fk_type = %(pk_test_type)s, fk_request = %(pk_request)s WHERE pk = %(pk_test_result)s AND xmin = %(xmin_test_result)s RETURNING xmin AS xmin_test_result """ # , u"""select xmin_test_result from clin.v_test_results where pk_test_result = %(pk_test_result)s""" ] _updatable_fields = [ 'clin_when', 'comment', 'val_num', 'val_alpha', 'val_unit', 'val_normal_min', 'val_normal_max', 'val_normal_range', 'val_target_min', 'val_target_max', 'val_target_range', 'abnormality_indicator', 'norm_ref_group', 'note_test_org', 'material', 'material_detail', 'status', 'val_grouping', 'source_data', 'pk_intended_reviewer', 'pk_encounter', 'pk_episode', 'pk_test_type', 'pk_request' ] #-------------------------------------------------------- def format_concisely(self, date_format='%Y %b %d', with_notes=True): range_info = gmTools.coalesce ( self.formatted_clinical_range, self.formatted_normal_range ) review = gmTools.bool2subst ( self._payload['reviewed'], '', ' ' + gmTools.u_writing_hand, ' ' + gmTools.u_writing_hand ) txt = '%s %s: %s%s%s%s%s%s' % ( self._payload['clin_when'].strftime(date_format), self._payload['name_tt'], self._payload['unified_val'], gmTools.coalesce(self._payload['val_unit'], '', ' %s'), gmTools.coalesce(self._payload['abnormality_indicator'], '', ' %s'), gmTools.coalesce(range_info, '', ' (%s)'), gmTools.coalesce(self._payload['status'], '', ' [%s]')[:2], review ) if with_notes: txt += '\n' if self._payload['note_test_org'] is not None: txt += ' ' + _('Lab comment: %s\n') % _('\n Lab comment: ').join(self._payload['note_test_org'].split('\n')) if self._payload['comment'] is not None: txt += ' ' + _('Praxis comment: %s\n') % _('\n Praxis comment: ').join(self._payload['comment'].split('\n')) return txt.strip('\n') #-------------------------------------------------------- def __format_evaluation(self) -> str: if self._payload['val_num'] is None: return '' tt = '' norm_eval = None # 1) normal range # lowered ? if (self._payload['val_normal_min'] is not None) and (self._payload['val_num'] < self._payload['val_normal_min']): try: percent = (self._payload['val_num'] * 100) / self._payload['val_normal_min'] except ZeroDivisionError: percent = None if percent is not None: if percent < 6: norm_eval = _('%.1f %% of the normal lower limit') % percent else: norm_eval = _('%.0f %% of the normal lower limit') % percent # raised ? if (self._payload['val_normal_max'] is not None) and (self._payload['val_num'] > self._payload['val_normal_max']): try: x_times = self._payload['val_num'] / self._payload['val_normal_max'] except ZeroDivisionError: x_times = None if x_times is not None: if x_times < 10: norm_eval = _('%.1f times the normal upper limit') % x_times else: norm_eval = _('%.0f times the normal upper limit') % x_times if norm_eval: tt += ' = %s\n' % norm_eval # #------------------------------------- # # this idea was shot down on the list # #------------------------------------- # # bandwidth of deviation # if None not in [self._payload['val_normal_min'], self._payload['val_normal_max']]: # normal_width = self._payload['val_normal_max'] - self._payload['val_normal_min'] # deviation_from_normal_range = None # # below ? # if self._payload['val_num'] < self._payload['val_normal_min']: # deviation_from_normal_range = self._payload['val_normal_min'] - self._payload['val_num'] # # above ? # elif self._payload['val_num'] > self._payload['val_normal_max']: # deviation_from_normal_range = self._payload['val_num'] - self._payload['val_normal_max'] # if deviation_from_normal_range is None: # try: # times_deviation = deviation_from_normal_range / normal_width # except ZeroDivisionError: # times_deviation = None # if times_deviation is not None: # if times_deviation < 10: # tt += u' (%s)\n' % _(u'deviates by %.1f times of the normal range') % times_deviation # else: # tt += u' (%s)\n' % _(u'deviates by %.0f times of the normal range') % times_deviation # #------------------------------------- # 2) clinical target range norm_eval = None # lowered ? if (self._payload['val_target_min'] is not None) and (self._payload['val_num'] < self._payload['val_target_min']): try: percent = (self._payload['val_num'] * 100) / self._payload['val_target_min'] except ZeroDivisionError: percent = None if percent is not None: if percent < 6: norm_eval = _('%.1f %% of the target lower limit') % percent else: norm_eval = _('%.0f %% of the target lower limit') % percent # raised ? if (self._payload['val_target_max'] is not None) and (self._payload['val_num'] > self._payload['val_target_max']): try: x_times = self._payload['val_num'] / self._payload['val_target_max'] except ZeroDivisionError: x_times = None if x_times is not None: if x_times < 10: norm_eval = _('%.1f times the target upper limit') % x_times else: norm_eval = _('%.0f times the target upper limit') % x_times if norm_eval: tt += ' = %s\n' % norm_eval # #------------------------------------- # # this idea was shot down on the list # #------------------------------------- # # bandwidth of deviation # if None not in [self._payload['val_target_min'], self._payload['val_target_max']]: # normal_width = self._payload['val_target_max'] - self._payload['val_target_min'] # deviation_from_target_range = None # # below ? # if self._payload['val_num'] < self._payload['val_target_min']: # deviation_from_target_range = self._payload['val_target_min'] - self._payload['val_num'] # # above ? # elif self._payload['val_num'] > self._payload['val_target_max']: # deviation_from_target_range = self._payload['val_num'] - self._payload['val_target_max'] # if deviation_from_target_range is None: # try: # times_deviation = deviation_from_target_range / normal_width # except ZeroDivisionError: # times_deviation = None # if times_deviation is not None: # if times_deviation < 10: # tt += u' (%s)\n' % _(u'deviates by %.1f times of the target range') % times_deviation # else: # tt += u' (%s)\n' % _(u'deviates by %.0f times of the target range') % times_deviation return tt #-------------------------------------------------------- def __format_review(self, date_format:str='%Y %b %d %H:%M') -> str: if self._payload['reviewed']: review = self._payload['last_reviewed'].strftime(date_format) else: review = _('not yet') tt = _('Signed (%(sig_hand)s): %(reviewed)s\n') % ({ 'sig_hand': gmTools.u_writing_hand, 'reviewed': review }) tt += ' ' + _('Responsible clinician: %s\n') % gmTools.bool2subst ( self._payload['you_are_responsible'], _('you'), self._payload['responsible_reviewer'] ) if self._payload['reviewed']: tt += ' ' + _('Last reviewer: %(reviewer)s\n') % ({ 'reviewer': gmTools.bool2subst ( self._payload['review_by_you'], _('you'), gmTools.coalesce(self._payload['last_reviewer'], '?') ) }) tt += ' ' + _(' Technically abnormal: %(abnormal)s\n') % ({ 'abnormal': gmTools.bool2subst ( self._payload['is_technically_abnormal'], _('yes'), _('no'), '?' ) }) tt += ' ' + _(' Clinically relevant: %(relevant)s\n') % ({ 'relevant': gmTools.bool2subst ( self._payload['is_clinically_relevant'], _('yes'), _('no'), '?' ) }) if self._payload['review_comment']: tt += ' ' + _(' Comment: %s\n') % self._payload['review_comment'].strip() tt += '\n' return tt #-------------------------------------------------------- def format(self, with_review=True, with_evaluation=True, with_ranges=True, with_episode=True, with_type_details=True, with_source_data=False, date_format='%Y %b %d %H:%M'): # FIXME: add battery, request details # header tt = _('Result from %s \n') % self._payload['clin_when'].strftime(date_format) # basics tt += ' ' + _('Type: "%(name)s" (%(abbr)s) [#%(pk_type)s]\n') % ({ 'name': self._payload['name_tt'], 'abbr': self._payload['abbrev_tt'], 'pk_type': self._payload['pk_test_type'] }) if self.is_long_text: sso = gmTools.u_superscript_one else: sso = '' tt += ' ' + _('%(sso)sResult: %(val)s%(unit)s%(ind)s [#%(pk_result)s]\n') % ({ 'sso': sso, 'val': self._payload['unified_val'], 'unit': gmTools.coalesce(self._payload['val_unit'], '', ' %s'), 'ind': gmTools.coalesce(self._payload['abnormality_indicator'], '', ' (%s)'), 'pk_result': self._payload['pk_test_result'] }) if self._payload['status']: try: stat = HL7_RESULT_STATI[self._payload['status']] except KeyError: stat = self._payload['status'] tt += ' ' + _('Status: %s\n') % stat if self._payload['val_grouping']: tt += ' ' + _('Grouping: %s\n') % self._payload['val_grouping'] if with_evaluation: tt += self.__format_evaluation() tmp = ('%s%s' % ( gmTools.coalesce(self._payload['name_test_org'], ''), gmTools.coalesce(self._payload['contact_test_org'], '', ' (%s)'), )).strip() if tmp != '': tt += ' ' + _('Source: %s\n') % tmp tt += '\n' if self._payload['note_test_org'] is not None: tt += ' ' + gmTools.u_superscript_one + _('Lab comment: %s\n') % _('\n Lab comment: ').join(self._payload['note_test_org'].split('\n')) if self._payload['comment'] is not None: tt += ' ' + gmTools.u_superscript_one + _('Praxis comment: %s\n') % _('\n Praxis comment: ').join(self._payload['comment'].split('\n')) if with_ranges: tt += gmTools.coalesce(self.formatted_normal_range, '', ' ' + _('Standard normal range: %s\n')) tt += gmTools.coalesce(self.formatted_clinical_range, '', ' ' + _('Clinical target range: %s\n')) tt += gmTools.coalesce(self._payload['norm_ref_group'], '', ' ' + _('Reference group: %s\n')) # metadata if with_episode: tt += ' ' + _('Episode: %s\n') % self._payload['episode'] if self._payload['health_issue'] is not None: tt += ' ' + _('Issue: %s\n') % self._payload['health_issue'] if self._payload['material'] is not None: tt += ' ' + _('Material: %s\n') % self._payload['material'] if self._payload['material_detail'] is not None: tt += ' ' + _('Details: %s\n') % self._payload['material_detail'] tt += '\n' if with_review: tt += self.__format_review(date_format = date_format) # type if with_type_details: has_details = None not in [self._payload['comment_tt'], self._payload['pk_meta_test_type'], self._payload['comment_meta']] if has_details: tt += _('Test type details:\n') if self._payload['comment_tt'] is not None: tt += ' ' + _('Type comment: %s\n') % _('\n Type comment:').join(self._payload['comment_tt'].split('\n')) if self._payload['pk_meta_test_type'] is not None: tt += ' ' + _('Aggregated (%s) under: %s (%s) [#%s]\n') % ( gmTools.u_sum, self._payload['name_meta'], self._payload['abbrev_meta'], self._payload['pk_meta_test_type'] ) if self._payload['comment_meta'] is not None: tt += ' ' + _('Group comment: %s\n') % _('\n Group comment: ').join(self._payload['comment_meta'].split('\n')) if has_details: tt += '\n' if with_source_data: if self._payload['source_data'] is not None: tt += _('Source data:\n') tt += ' ' + self._payload['source_data'] tt += '\n\n' if with_review: tt += _('Revisions: %(row_ver)s, last %(mod_when)s by %(mod_by)s.') % ({ 'row_ver': self._payload['row_version'], 'mod_when': self._payload['modified_when'].strftime(date_format), 'mod_by': self._payload['modified_by'] }) return tt #-------------------------------------------------------- def _get_has_normal_min_or_max(self): return ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) has_normal_min_or_max = property(_get_has_normal_min_or_max) #-------------------------------------------------------- def _get_normal_min_max(self): has_range_info = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_range_info is False: return None return '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') ) normal_min_max = property(_get_normal_min_max) #-------------------------------------------------------- def _get_formatted_normal_range(self): has_numerical_range = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_numerical_range: numerical_range = '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') ) else: numerical_range = '' textual_range = gmTools.coalesce ( self._payload['val_normal_range'], '', gmTools.bool2subst ( has_numerical_range, ' / %s', '%s' ) ) range_info = '%s%s' % (numerical_range, textual_range) if range_info == '': return None return range_info formatted_normal_range = property(_get_formatted_normal_range) #-------------------------------------------------------- def _get_has_clinical_min_or_max(self): return ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) has_clinical_min_or_max = property(_get_has_clinical_min_or_max) #-------------------------------------------------------- def _get_clinical_min_max(self): has_range_info = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_range_info is False: return None return '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') ) clinical_min_max = property(_get_clinical_min_max) #-------------------------------------------------------- def _get_formatted_clinical_range(self): has_numerical_range = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_numerical_range: numerical_range = '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') ) else: numerical_range = '' textual_range = gmTools.coalesce ( self._payload['val_target_range'], '', gmTools.bool2subst ( has_numerical_range, ' / %s', '%s' ) ) range_info = '%s%s' % (numerical_range, textual_range) if range_info == '': return None return range_info formatted_clinical_range = property(_get_formatted_clinical_range) #-------------------------------------------------------- def _get_temporally_closest_normal_range(self): """Returns the closest test result which does have normal range information.""" if self._payload['val_normal_min'] is not None: return self if self._payload['val_normal_max'] is not None: return self if self._payload['val_normal_range'] is not None: return self cmd = """ SELECT * from clin.v_test_results WHERE pk_type = %(pk_type)s AND val_unit = %(unit)s AND ( (val_normal_min IS NOT NULL) OR (val_normal_max IS NOT NULL) OR (val_normal_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': self._payload['val_unit'], 'clin_when': self._payload['clin_when'] } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]}) temporally_closest_normal_range = property(_get_temporally_closest_normal_range) #-------------------------------------------------------- def _get_formatted_range(self): has_normal_min_or_max = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_normal_min_or_max: normal_min_max = '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') ) has_clinical_min_or_max = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_clinical_min_or_max: clinical_min_max = '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') ) if has_clinical_min_or_max: return _('Target: %(clin_min_max)s%(clin_range)s') % ({ 'clin_min_max': clinical_min_max, 'clin_range': gmTools.coalesce ( self._payload['val_target_range'], '', gmTools.bool2subst ( has_clinical_min_or_max, ' / %s', '%s' ) ) }) if has_normal_min_or_max: return _('Norm: %(norm_min_max)s%(norm_range)s') % ({ 'norm_min_max': normal_min_max, 'norm_range': gmTools.coalesce ( self._payload['val_normal_range'], '', gmTools.bool2subst ( has_normal_min_or_max, ' / %s', '%s' ) ) }) if self._payload['val_target_range'] is not None: return _('Target: %s') % self._payload['val_target_range'], if self._payload['val_normal_range'] is not None: return _('Norm: %s') % self._payload['val_normal_range'] return None formatted_range = property(_get_formatted_range) #-------------------------------------------------------- def _get_test_type(self): return cMeasurementType(aPK_obj = self._payload['pk_test_type']) test_type = property(_get_test_type) #-------------------------------------------------------- def _get_is_considered_elevated(self): # 1) the user is right (review) if self._payload['is_technically_abnormal'] is False: return False # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': if indicator.strip('+') == '': return True if indicator.strip('-') == '': return False # 3) non-numerical value ? if self._payload['val_num'] is None: return None # 4) the target range is right target_max = self._payload['val_target_max'] if target_max is not None: if target_max < self._payload['val_num']: return True # 4) the normal range is right normal_max = self._payload['val_normal_max'] if normal_max is not None: if normal_max < self._payload['val_num']: return True return None is_considered_elevated = property(_get_is_considered_elevated) #-------------------------------------------------------- def _get_is_considered_lowered(self): # 1) the user is right (review) if self._payload['is_technically_abnormal'] is False: return False # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': if indicator.strip('+') == '': return False if indicator.strip('-') == '': return True # 3) non-numerical value ? if self._payload['val_num'] is None: return None # 4) the target range is right target_min = self._payload['val_target_min'] if target_min is not None: if target_min > self._payload['val_num']: return True # 4) the normal range is right normal_min = self._payload['val_normal_min'] if normal_min is not None: if normal_min > self._payload['val_num']: return True return None is_considered_lowered = property(_get_is_considered_lowered) #-------------------------------------------------------- def _get_is_considered_abnormal(self): if self.is_considered_lowered is True: return True if self.is_considered_elevated is True: return True if (self.is_considered_lowered is False) and (self.is_considered_elevated is False): return False return self._payload['is_technically_abnormal'] is_considered_abnormal = property(_get_is_considered_abnormal) #-------------------------------------------------------- def _set_reference_range(self, ref_range): """Parse reference range from string. Note: does NOT save the result. """ ref_range = ref_range.strip().replace(' ', '') is_range = regex.match(r'-{0,1}\d+[.,]{0,1}\d*--{0,1}\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE) if is_range is not None: min_val = regex.match(r'-{0,1}\d+[.,]{0,1}\d*-', ref_range, regex.UNICODE).group(0).rstrip('-') success, min_val = gmTools.input2decimal(min_val) max_val = (regex.search(r'--{0,1}\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE).group(0))[1:] success, max_val = gmTools.input2decimal(max_val) self['val_normal_min'] = min_val self['val_normal_max'] = max_val return if ref_range.startswith('<'): is_range = regex.match(r'<\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE) if is_range is not None: max_val = ref_range[1:] success, max_val = gmTools.input2decimal(max_val) self['val_normal_min'] = 0 self['val_normal_max'] = max_val return if ref_range.startswith('<-'): is_range = regex.match(r'<-\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE) if is_range is not None: max_val = ref_range[1:] success, max_val = gmTools.input2decimal(max_val) self['val_normal_min'] = None self['val_normal_max'] = max_val return if ref_range.startswith('>'): is_range = regex.match(r'>\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE) if is_range is not None: min_val = ref_range[1:] success, min_val = gmTools.input2decimal(min_val) self['val_normal_min'] = min_val self['val_normal_max'] = None return if ref_range.startswith('>-'): is_range = regex.match(r'>-\d+[.,]{0,1}\d*$', ref_range, regex.UNICODE) if is_range is not None: min_val = ref_range[1:] success, min_val = gmTools.input2decimal(min_val) self['val_normal_min'] = min_val self['val_normal_max'] = 0 return self['val_normal_range'] = ref_range return reference_range = property(lambda x:x, _set_reference_range) #-------------------------------------------------------- def _get_formatted_abnormality_indicator(self): # 1) the user is right if self._payload['is_technically_abnormal'] is False: return '' # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': return indicator # 3) non-numerical value ? then we can't know more if self._payload['val_num'] is None: return None # 4) the target range is right target_min = self._payload['val_target_min'] if target_min is not None: if target_min > self._payload['val_num']: return '-' target_max = self._payload['val_target_max'] if target_max is not None: if target_max < self._payload['val_num']: return '+' # 4) the normal range is right normal_min = self._payload['val_normal_min'] if normal_min is not None: if normal_min > self._payload['val_num']: return '-' normal_max = self._payload['val_normal_max'] if normal_max is not None: if normal_max < self._payload['val_num']: return '+' # reviewed, abnormal, but no indicator available if self._payload['is_technically_abnormal'] is True: return gmTools.u_plus_minus return None formatted_abnormality_indicator = property(_get_formatted_abnormality_indicator) #-------------------------------------------------------- def _get_is_long_text(self): if self._payload['val_alpha'] is None: return False lines = gmTools.strip_empty_lines(text = self._payload['val_alpha'], eol = '\n', return_list = True) if len(lines) > 4: return True return False is_long_text = property(_get_is_long_text) #-------------------------------------------------------- def _get_estimate_numeric_value_from_alpha(self): if self._payload['val_alpha'] is None: return None val = self._payload['val_alpha'].lstrip() if val[0] == '<': factor = decimal.Decimal(0.5) val = val[1:] elif val[0] == '>': factor = 2 val = val[1:] else: return None success, val = gmTools.input2decimal(initial = val) if not success: return None return val * factor estimate_numeric_value_from_alpha = property(_get_estimate_numeric_value_from_alpha) #-------------------------------------------------------- def set_review(self, technically_abnormal=None, clinically_relevant=None, comment=None, make_me_responsible=False): # FIXME: this is not concurrency safe if self._payload['reviewed']: self.__change_existing_review ( technically_abnormal = technically_abnormal, clinically_relevant = clinically_relevant, comment = comment ) else: # do not sign off unreviewed results if # NOTHING AT ALL is known about them if technically_abnormal is None: if clinically_relevant is None: comment = gmTools.none_if(comment, '', strip_string = True) if comment is None: if make_me_responsible is False: return True self.__set_new_review ( technically_abnormal = technically_abnormal, clinically_relevant = clinically_relevant, comment = comment ) if make_me_responsible is True: cmd = "SELECT pk FROM dem.staff WHERE db_user = current_user" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) self['pk_intended_reviewer'] = rows[0][0] self.save_payload() return self.refetch_payload() #-------------------------------------------------------- def get_adjacent_results(self, desired_earlier_results=1, desired_later_results=1, max_offset=None): if desired_earlier_results < 1: raise ValueError('<desired_earlier_results> must be > 0') if desired_later_results < 1: raise ValueError('<desired_later_results> must be > 0') args = { 'pat': self._payload['pk_patient'], 'ttyp': self._payload['pk_test_type'], 'tloinc': self._payload['loinc_tt'], 'mtyp': self._payload['pk_meta_test_type'], 'mloinc': self._payload['loinc_meta'], 'when': self._payload['clin_when'], 'offset': max_offset } WHERE = '((pk_test_type = %(ttyp)s) OR (loinc_tt = %(tloinc)s))' WHERE_meta = '((pk_meta_test_type = %(mtyp)s) OR (loinc_meta = %(mloinc)s))' if max_offset is not None: WHERE = WHERE + ' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))' WHERE_meta = WHERE_meta + ' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))' SQL = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %%(pat)s AND clin_when %s %%(when)s AND %s ORDER BY clin_when LIMIT %s""" # get earlier results earlier_results = [] # by type cmd = SQL % ('<', WHERE, desired_earlier_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # by meta type ? missing_results = desired_earlier_results - len(earlier_results) if missing_results > 0: cmd = SQL % ('<', WHERE_meta, missing_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # get later results later_results = [] # by type cmd = SQL % ('>', WHERE, desired_later_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # by meta type ? missing_results = desired_later_results - len(later_results) if missing_results > 0: cmd = SQL % ('>', WHERE_meta, missing_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) return earlier_results, later_results #-------------------------------------------------------- # internal API #-------------------------------------------------------- def __set_new_review(self, technically_abnormal=None, clinically_relevant=None, comment=None): """Add a review to a row. - if technically abnormal is not provided/None it will be set to True if the lab's indicator has a meaningful value - if clinically relevant is not provided/None it is set to whatever technically abnormal is """ if technically_abnormal is None: technically_abnormal = False if self._payload['abnormality_indicator'] is not None: if self._payload['abnormality_indicator'].strip() != '': technically_abnormal = True if clinically_relevant is None: clinically_relevant = technically_abnormal cmd = """ INSERT INTO clin.reviewed_test_results ( fk_reviewed_row, is_technically_abnormal, clinically_relevant, comment ) VALUES ( %(pk)s, %(abnormal)s, %(relevant)s, gm.nullify_empty_string(%(cmt)s) )""" args = { 'pk': self._payload['pk_test_result'], 'abnormal': technically_abnormal, 'relevant': clinically_relevant, 'cmt': comment } gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) #-------------------------------------------------------- def __change_existing_review(self, technically_abnormal=None, clinically_relevant=None, comment=None): """Change a review on a row. - if technically abnormal/clinically relevant are None they are not set """ args = { 'pk_result': self._payload['pk_test_result'], 'abnormal': technically_abnormal, 'relevant': clinically_relevant, 'cmt': comment } set_parts = [ 'fk_reviewer = (SELECT pk FROM dem.staff WHERE db_user = current_user)', 'comment = gm.nullify_empty_string(%(cmt)s)' ] if technically_abnormal is not None: set_parts.append('is_technically_abnormal = %(abnormal)s') if clinically_relevant is not None: set_parts.append('clinically_relevant = %(relevant)s') cmd = """ UPDATE clin.reviewed_test_results SET %s WHERE fk_reviewed_row = %%(pk_result)s """ % ',\n '.join(set_parts) gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
Represents one test result.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Instance variables
prop clinical_min_max
-
Expand source code
def _get_clinical_min_max(self): has_range_info = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_range_info is False: return None return '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') )
prop estimate_numeric_value_from_alpha
-
Expand source code
def _get_estimate_numeric_value_from_alpha(self): if self._payload['val_alpha'] is None: return None val = self._payload['val_alpha'].lstrip() if val[0] == '<': factor = decimal.Decimal(0.5) val = val[1:] elif val[0] == '>': factor = 2 val = val[1:] else: return None success, val = gmTools.input2decimal(initial = val) if not success: return None return val * factor
prop formatted_abnormality_indicator
-
Expand source code
def _get_formatted_abnormality_indicator(self): # 1) the user is right if self._payload['is_technically_abnormal'] is False: return '' # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': return indicator # 3) non-numerical value ? then we can't know more if self._payload['val_num'] is None: return None # 4) the target range is right target_min = self._payload['val_target_min'] if target_min is not None: if target_min > self._payload['val_num']: return '-' target_max = self._payload['val_target_max'] if target_max is not None: if target_max < self._payload['val_num']: return '+' # 4) the normal range is right normal_min = self._payload['val_normal_min'] if normal_min is not None: if normal_min > self._payload['val_num']: return '-' normal_max = self._payload['val_normal_max'] if normal_max is not None: if normal_max < self._payload['val_num']: return '+' # reviewed, abnormal, but no indicator available if self._payload['is_technically_abnormal'] is True: return gmTools.u_plus_minus return None
prop formatted_clinical_range
-
Expand source code
def _get_formatted_clinical_range(self): has_numerical_range = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_numerical_range: numerical_range = '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') ) else: numerical_range = '' textual_range = gmTools.coalesce ( self._payload['val_target_range'], '', gmTools.bool2subst ( has_numerical_range, ' / %s', '%s' ) ) range_info = '%s%s' % (numerical_range, textual_range) if range_info == '': return None return range_info
prop formatted_normal_range
-
Expand source code
def _get_formatted_normal_range(self): has_numerical_range = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_numerical_range: numerical_range = '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') ) else: numerical_range = '' textual_range = gmTools.coalesce ( self._payload['val_normal_range'], '', gmTools.bool2subst ( has_numerical_range, ' / %s', '%s' ) ) range_info = '%s%s' % (numerical_range, textual_range) if range_info == '': return None return range_info
prop formatted_range
-
Expand source code
def _get_formatted_range(self): has_normal_min_or_max = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_normal_min_or_max: normal_min_max = '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') ) has_clinical_min_or_max = ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None ) if has_clinical_min_or_max: clinical_min_max = '%s - %s' % ( gmTools.coalesce(self._payload['val_target_min'], '?'), gmTools.coalesce(self._payload['val_target_max'], '?') ) if has_clinical_min_or_max: return _('Target: %(clin_min_max)s%(clin_range)s') % ({ 'clin_min_max': clinical_min_max, 'clin_range': gmTools.coalesce ( self._payload['val_target_range'], '', gmTools.bool2subst ( has_clinical_min_or_max, ' / %s', '%s' ) ) }) if has_normal_min_or_max: return _('Norm: %(norm_min_max)s%(norm_range)s') % ({ 'norm_min_max': normal_min_max, 'norm_range': gmTools.coalesce ( self._payload['val_normal_range'], '', gmTools.bool2subst ( has_normal_min_or_max, ' / %s', '%s' ) ) }) if self._payload['val_target_range'] is not None: return _('Target: %s') % self._payload['val_target_range'], if self._payload['val_normal_range'] is not None: return _('Norm: %s') % self._payload['val_normal_range'] return None
prop has_clinical_min_or_max
-
Expand source code
def _get_has_clinical_min_or_max(self): return ( self._payload['val_target_min'] is not None ) or ( self._payload['val_target_max'] is not None )
prop has_normal_min_or_max
-
Expand source code
def _get_has_normal_min_or_max(self): return ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None )
prop is_considered_abnormal
-
Expand source code
def _get_is_considered_abnormal(self): if self.is_considered_lowered is True: return True if self.is_considered_elevated is True: return True if (self.is_considered_lowered is False) and (self.is_considered_elevated is False): return False return self._payload['is_technically_abnormal']
prop is_considered_elevated
-
Expand source code
def _get_is_considered_elevated(self): # 1) the user is right (review) if self._payload['is_technically_abnormal'] is False: return False # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': if indicator.strip('+') == '': return True if indicator.strip('-') == '': return False # 3) non-numerical value ? if self._payload['val_num'] is None: return None # 4) the target range is right target_max = self._payload['val_target_max'] if target_max is not None: if target_max < self._payload['val_num']: return True # 4) the normal range is right normal_max = self._payload['val_normal_max'] if normal_max is not None: if normal_max < self._payload['val_num']: return True return None
prop is_considered_lowered
-
Expand source code
def _get_is_considered_lowered(self): # 1) the user is right (review) if self._payload['is_technically_abnormal'] is False: return False # 2) the lab is right (result.abnormality_indicator) indicator = self._payload['abnormality_indicator'] if indicator is not None: indicator = indicator.strip() if indicator != '': if indicator.strip('+') == '': return False if indicator.strip('-') == '': return True # 3) non-numerical value ? if self._payload['val_num'] is None: return None # 4) the target range is right target_min = self._payload['val_target_min'] if target_min is not None: if target_min > self._payload['val_num']: return True # 4) the normal range is right normal_min = self._payload['val_normal_min'] if normal_min is not None: if normal_min > self._payload['val_num']: return True return None
prop is_long_text
-
Expand source code
def _get_is_long_text(self): if self._payload['val_alpha'] is None: return False lines = gmTools.strip_empty_lines(text = self._payload['val_alpha'], eol = '\n', return_list = True) if len(lines) > 4: return True return False
prop normal_min_max
-
Expand source code
def _get_normal_min_max(self): has_range_info = ( self._payload['val_normal_min'] is not None ) or ( self._payload['val_normal_max'] is not None ) if has_range_info is False: return None return '%s - %s' % ( gmTools.coalesce(self._payload['val_normal_min'], '?'), gmTools.coalesce(self._payload['val_normal_max'], '?') )
prop reference_range
-
Expand source code
reference_range = property(lambda x:x, _set_reference_range)
prop temporally_closest_normal_range
-
Expand source code
def _get_temporally_closest_normal_range(self): """Returns the closest test result which does have normal range information.""" if self._payload['val_normal_min'] is not None: return self if self._payload['val_normal_max'] is not None: return self if self._payload['val_normal_range'] is not None: return self cmd = """ SELECT * from clin.v_test_results WHERE pk_type = %(pk_type)s AND val_unit = %(unit)s AND ( (val_normal_min IS NOT NULL) OR (val_normal_max IS NOT NULL) OR (val_normal_range IS NOT NULL) ) ORDER BY CASE WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s ELSE %(clin_when)s - clin_when END LIMIT 1""" args = { 'pk_type': self._payload['pk_test_type'], 'unit': self._payload['val_unit'], 'clin_when': self._payload['clin_when'] } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cTestResult(row = {'pk_field': 'pk_test_result', 'data': rows[0]})
Returns the closest test result which does have normal range information.
prop test_type
-
Expand source code
def _get_test_type(self): return cMeasurementType(aPK_obj = self._payload['pk_test_type'])
Methods
def format_concisely(self, date_format='%Y %b %d', with_notes=True)
-
Expand source code
def format_concisely(self, date_format='%Y %b %d', with_notes=True): range_info = gmTools.coalesce ( self.formatted_clinical_range, self.formatted_normal_range ) review = gmTools.bool2subst ( self._payload['reviewed'], '', ' ' + gmTools.u_writing_hand, ' ' + gmTools.u_writing_hand ) txt = '%s %s: %s%s%s%s%s%s' % ( self._payload['clin_when'].strftime(date_format), self._payload['name_tt'], self._payload['unified_val'], gmTools.coalesce(self._payload['val_unit'], '', ' %s'), gmTools.coalesce(self._payload['abnormality_indicator'], '', ' %s'), gmTools.coalesce(range_info, '', ' (%s)'), gmTools.coalesce(self._payload['status'], '', ' [%s]')[:2], review ) if with_notes: txt += '\n' if self._payload['note_test_org'] is not None: txt += ' ' + _('Lab comment: %s\n') % _('\n Lab comment: ').join(self._payload['note_test_org'].split('\n')) if self._payload['comment'] is not None: txt += ' ' + _('Praxis comment: %s\n') % _('\n Praxis comment: ').join(self._payload['comment'].split('\n')) return txt.strip('\n')
def get_adjacent_results(self, desired_earlier_results=1, desired_later_results=1, max_offset=None)
-
Expand source code
def get_adjacent_results(self, desired_earlier_results=1, desired_later_results=1, max_offset=None): if desired_earlier_results < 1: raise ValueError('<desired_earlier_results> must be > 0') if desired_later_results < 1: raise ValueError('<desired_later_results> must be > 0') args = { 'pat': self._payload['pk_patient'], 'ttyp': self._payload['pk_test_type'], 'tloinc': self._payload['loinc_tt'], 'mtyp': self._payload['pk_meta_test_type'], 'mloinc': self._payload['loinc_meta'], 'when': self._payload['clin_when'], 'offset': max_offset } WHERE = '((pk_test_type = %(ttyp)s) OR (loinc_tt = %(tloinc)s))' WHERE_meta = '((pk_meta_test_type = %(mtyp)s) OR (loinc_meta = %(mloinc)s))' if max_offset is not None: WHERE = WHERE + ' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))' WHERE_meta = WHERE_meta + ' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))' SQL = """ SELECT * FROM clin.v_test_results WHERE pk_patient = %%(pat)s AND clin_when %s %%(when)s AND %s ORDER BY clin_when LIMIT %s""" # get earlier results earlier_results = [] # by type cmd = SQL % ('<', WHERE, desired_earlier_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # by meta type ? missing_results = desired_earlier_results - len(earlier_results) if missing_results > 0: cmd = SQL % ('<', WHERE_meta, missing_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # get later results later_results = [] # by type cmd = SQL % ('>', WHERE, desired_later_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) # by meta type ? missing_results = desired_later_results - len(later_results) if missing_results > 0: cmd = SQL % ('>', WHERE_meta, missing_results) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) > 0: later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'data': r}) for r in rows ]) return earlier_results, later_results
def set_review(self,
technically_abnormal=None,
clinically_relevant=None,
comment=None,
make_me_responsible=False)-
Expand source code
def set_review(self, technically_abnormal=None, clinically_relevant=None, comment=None, make_me_responsible=False): # FIXME: this is not concurrency safe if self._payload['reviewed']: self.__change_existing_review ( technically_abnormal = technically_abnormal, clinically_relevant = clinically_relevant, comment = comment ) else: # do not sign off unreviewed results if # NOTHING AT ALL is known about them if technically_abnormal is None: if clinically_relevant is None: comment = gmTools.none_if(comment, '', strip_string = True) if comment is None: if make_me_responsible is False: return True self.__set_new_review ( technically_abnormal = technically_abnormal, clinically_relevant = clinically_relevant, comment = comment ) if make_me_responsible is True: cmd = "SELECT pk FROM dem.staff WHERE db_user = current_user" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) self['pk_intended_reviewer'] = rows[0][0] self.save_payload() return self.refetch_payload()
Inherited members