Module Gnumed.business.gmLOINC
LOINC handling code.
license: GPL v2 or later
Functions
def format_loinc(loinc)
-
Expand source code
def format_loinc(loinc):
    """Return a tabular rendering of the ref.loinc row for a LOINC code.

    Args:
        loinc: the LOINC code to look up.

    Returns:
        Whatever gmTools.format_dict_like() produces for the row
        (None and empty-string values are left out), or None if
        loinc2data() finds no row for the code.
    """
    row = loinc2data(loinc)
    if row is None:
        return None
    fields = dict(row)
    return gmTools.format_dict_like(
        fields,
        tabular = True,
        value_delimiters = None,
        values2ignore = [None, '']
    )
def get_version(license_fname='loinc_license.txt')
-
Expand source code
def get_version(license_fname='loinc_license.txt'):
    """Extract the LOINC version from a license/header file.

    The file is scanned line by line; the first line starting with the
    module-level version_tag determines the version.

    Args:
        license_fname: path of the license (header) file to scan.

    Returns:
        The text following version_tag on the first matching line,
        stripped of surrounding whitespace, or None if no line matches.
    """
    # the context manager closes the file even if reading/decoding fails
    with open(license_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as in_file:
        for line in in_file:
            if line.startswith(version_tag):
                return line[len(version_tag):].strip()
    return None
def loinc2data(loinc)
-
Expand source code
def loinc2data(loinc):
    """Fetch the ref.loinc row for a LOINC code.

    Args:
        loinc: the LOINC code to look up.

    Returns:
        The first row returned by the query, or None if there is none.
    """
    query = {
        'sql': 'SELECT * FROM ref.loinc WHERE code = %(loinc)s',
        'args': {'loinc': loinc}
    }
    rows = gmPG2.run_ro_queries(queries = [query])
    return None if len(rows) == 0 else rows[0]
def loinc2term(loinc=None)
-
Expand source code
def loinc2term(loinc=None):
    """Look up the term for a LOINC code in ref.v_coded_terms.

    The term is searched for in this order of preference: the current
    database language (i18n.get_curr_lang()), 'en_EN', any language.

    Args:
        loinc: the LOINC code to translate.

    Returns:
        A list holding the first column of each result row, or an
        empty list if no term was found.
    """
    # NOTE: coalesce() makes the query return a row holding NULL
    # when nothing matches, hence the explicit check further down
    # NOTE(review): the last sub-select is not restricted by language,
    # presumably it can yield several rows if more than one translation
    # exists - confirm this cannot happen with the actual data
    cmd = """
        SELECT coalesce (
            (SELECT term
            FROM ref.v_coded_terms
            WHERE
                coding_system = 'LOINC'
                    AND
                code = %(loinc)s
                    AND
                lang = i18n.get_curr_lang()
            ),
            (SELECT term
            FROM ref.v_coded_terms
            WHERE
                coding_system = 'LOINC'
                    AND
                code = %(loinc)s
                    AND
                lang = 'en_EN'
            ),
            (SELECT term
            FROM ref.v_coded_terms
            WHERE
                coding_system = 'LOINC'
                    AND
                code = %(loinc)s
            )
        )"""
    query = {'sql': cmd, 'args': {'loinc': loinc}}
    rows = gmPG2.run_ro_queries(queries = [query])
    no_match = rows[0][0] is None
    if no_match:
        return []
    return [ row[0] for row in rows ]
def loinc_import(data_fname=None, license_fname=None, version=None, conn=None, lang='en_EN')
-
Expand source code
def _empty_loinc_staging_table(conn):
    """Delete all rows from staging.loinc_staging and commit."""
    curs = conn.cursor()
    cmd = """DELETE FROM staging.loinc_staging"""
    gmPG2.run_rw_queries(link_obj = curs, queries = [{'sql': cmd}])
    curs.close()
    conn.commit()
    _log.debug('staging table emptied')


def loinc_import(data_fname=None, license_fname=None, version=None, conn=None, lang='en_EN'):
    """Import LOINC data from a tab-delimited file into ref.loinc.

    Steps:
        1. empty staging.loinc_staging
        2. load the data file (minus its first line) into the staging table
        3. create or update the matching ref.data_source record
        4. insert new codes into ref.loinc and update their details
        5. empty the staging table again

    Args:
        data_fname: path of the tab-delimited LOINC data file, the
            first line of which is skipped as the header.
        license_fname: path of the license file; its content becomes the
            data source description, and it is scanned for the version
            if none is passed in.
        version: LOINC version, detected from the license file if None.
        conn: database connection, must offer .cursor() and .commit().
        lang: language stored with the data source record.

    Returns:
        True

    Raises:
        ValueError: if no version was given and none can be detected.
    """
    if version is None:
        version = get_version(license_fname = license_fname)
    if version is None:
        raise ValueError('cannot detect LOINC version')
    _log.debug('importing LOINC version [%s]', version)

    # clean out staging area
    _empty_loinc_staging_table(conn)

    # import data from csv file into staging table,
    # one placeholder per staging field: "%s, %s, ..., %s"
    cmd = """INSERT INTO staging.loinc_staging values (%s%%s)""" % ('%s, ' * (len(loinc_fields) - 1))
    # the context manager closes the data file even if a row fails to import
    with open(data_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as csv_file:
        loinc_reader = gmTools.unicode_csv_reader(csv_file, delimiter = "\t", quotechar = '"')
        curs = conn.cursor()
        for line_idx, loinc_line in enumerate(loinc_reader):
            if line_idx == 0:
                # skip header line
                continue
            gmPG2.run_rw_queries(link_obj = curs, queries = [{'sql': cmd, 'args': loinc_line}])
        curs.close()
        conn.commit()
    _log.debug('staging table loaded')

    # create data source record
    with open(license_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as in_file:
        desc = in_file.read()
    args = {'ver': version, 'desc': desc, 'url': origin_url, 'name_long': name_long, 'name_short': name_short, 'lang': lang}
    queries = [
        # insert if not existing
        {'args': args, 'sql': """
            INSERT INTO ref.data_source (name_long, name_short, version)
            SELECT
                %(name_long)s,
                %(name_short)s,
                %(ver)s
            WHERE NOT EXISTS (
                SELECT 1 FROM ref.data_source WHERE
                    name_long = %(name_long)s
                        AND
                    name_short = %(name_short)s
                        AND
                    version = %(ver)s
            )"""
        },
        # update non-unique fields
        {'args': args, 'sql': """
            UPDATE ref.data_source SET
                description = %(desc)s,
                source = %(url)s,
                lang = %(lang)s
            WHERE
                name_long = %(name_long)s
                    AND
                name_short = %(name_short)s
                    AND
                version = %(ver)s
            """
        },
        # retrieve PK of data source
        {'args': args, 'sql': """SELECT pk FROM ref.data_source WHERE name_short = %(name_short)s AND version = %(ver)s"""}
    ]
    curs = conn.cursor()
    rows = gmPG2.run_rw_queries(link_obj = curs, queries = queries, return_data = True)
    data_src_pk = rows[0][0]
    curs.close()
    _log.debug('data source record created or updated, pk is #%s', data_src_pk)

    # import from staging table to real table
    args = {'src_pk': data_src_pk}
    queries = []
    # 1) insert (data source, term, code) for rows not yet known;
    #    the term falls back to the colon-joined name parts
    #    whenever long_common_name is empty
    queries.append ({
        'args': args,
        'sql': """
            INSERT INTO ref.loinc (
                fk_data_source, term, code
            )
            SELECT
                %(src_pk)s,
                coalesce (
                    nullif(long_common_name, ''),
                    (
                        coalesce(nullif(component, '') || ':', '') ||
                        coalesce(nullif(property, '') || ':', '') ||
                        coalesce(nullif(time_aspect, '') || ':', '') ||
                        coalesce(nullif(system, '') || ':', '') ||
                        coalesce(nullif(scale_type, '') || ':', '') ||
                        coalesce(nullif(method_type, '') || ':', '')
                    )
                ),
                nullif(loinc_num, '')
            FROM
                staging.loinc_staging st_ls
            WHERE NOT EXISTS (
                SELECT 1 FROM ref.loinc r_l WHERE
                    r_l.fk_data_source = %(src_pk)s
                        AND
                    r_l.code = nullif(st_ls.loinc_num, '')
                        AND
                    r_l.term = coalesce (
                        nullif(st_ls.long_common_name, ''),
                        (
                            coalesce(nullif(st_ls.component, '') || ':', '') ||
                            coalesce(nullif(st_ls.property, '') || ':', '') ||
                            coalesce(nullif(st_ls.time_aspect, '') || ':', '') ||
                            coalesce(nullif(st_ls.system, '') || ':', '') ||
                            coalesce(nullif(st_ls.scale_type, '') || ':', '') ||
                            coalesce(nullif(st_ls.method_type, '') || ':', '')
                        )
                    )
            )"""
    })
    # 2) fill in the detail columns of the rows matching the staged data
    queries.append ({
        'args': args,
        'sql': """
            UPDATE ref.loinc SET
                comment = nullif(st_ls.comments, ''),
                component = nullif(st_ls.component, ''),
                property = nullif(st_ls.property, ''),
                time_aspect = nullif(st_ls.time_aspect, ''),
                system = nullif(st_ls.system, ''),
                scale_type = nullif(st_ls.scale_type, ''),
                method_type = nullif(st_ls.method_type, ''),
                related_names_1_old = nullif(st_ls.related_names_1_old, ''),
                grouping_class = nullif(st_ls.class, ''),
                loinc_internal_source = nullif(st_ls.source, ''),
                dt_last_change = nullif(st_ls.dt_last_change, ''),
                change_type = nullif(st_ls.change_type, ''),
                answer_list = nullif(st_ls.answer_list, ''),
                code_status = nullif(st_ls.status, ''),
                maps_to = nullif(st_ls.map_to, ''),
                scope = nullif(st_ls.scope, ''),
                normal_range = nullif(st_ls.normal_range, ''),
                ipcc_units = nullif(st_ls.ipcc_units, ''),
                reference = nullif(st_ls.reference, ''),
                exact_component_synonym = nullif(st_ls.exact_component_synonym, ''),
                molar_mass = nullif(st_ls.molar_mass, ''),
                grouping_class_type = nullif(st_ls.class_type, '')::smallint,
                formula = nullif(st_ls.formula, ''),
                species = nullif(st_ls.species, ''),
                example_answers = nullif(st_ls.example_answers, ''),
                acs_synonyms = nullif(st_ls.acs_synonyms, ''),
                base_name = nullif(st_ls.base_name, ''),
                final = nullif(st_ls.final, ''),
                naa_ccr_id = nullif(st_ls.naa_ccr_id, ''),
                code_table = nullif(st_ls.code_table, ''),
                is_set_root = nullif(st_ls.is_set_root, '')::boolean,
                panel_elements = nullif(st_ls.panel_elements, ''),
                survey_question_text = nullif(st_ls.survey_question_text, ''),
                survey_question_source = nullif(st_ls.survey_question_source, ''),
                units_required = nullif(st_ls.units_required, ''),
                submitted_units = nullif(st_ls.submitted_units, ''),
                related_names_2 = nullif(st_ls.related_names_2, ''),
                short_name = nullif(st_ls.short_name, ''),
                order_obs = nullif(st_ls.order_obs, ''),
                cdisc_common_tests = nullif(st_ls.cdisc_common_tests, ''),
                hl7_field_subfield_id = nullif(st_ls.hl7_field_subfield_id, ''),
                external_copyright_notice = nullif(st_ls.external_copyright_notice, ''),
                example_units = nullif(st_ls.example_units, ''),
                inpc_percentage = nullif(st_ls.inpc_percentage, ''),
                long_common_name = nullif(st_ls.long_common_name, '')
            FROM
                staging.loinc_staging st_ls
            WHERE
                fk_data_source = %(src_pk)s
                    AND
                code = nullif(st_ls.loinc_num, '')
                    AND
                term = coalesce (
                    nullif(st_ls.long_common_name, ''),
                    (
                        coalesce(nullif(st_ls.component, '') || ':', '') ||
                        coalesce(nullif(st_ls.property, '') || ':', '') ||
                        coalesce(nullif(st_ls.time_aspect, '') || ':', '') ||
                        coalesce(nullif(st_ls.system, '') || ':', '') ||
                        coalesce(nullif(st_ls.scale_type, '') || ':', '') ||
                        coalesce(nullif(st_ls.method_type, '') || ':', '')
                    )
                )
            """
    })
    curs = conn.cursor()
    gmPG2.run_rw_queries(link_obj = curs, queries = queries)
    curs.close()
    conn.commit()
    _log.debug('transfer from staging table to real table done')

    # clean out staging area
    _empty_loinc_staging_table(conn)

    return True
def map_field_names(data_fname='loinc_data.csv')
-
Expand source code
def map_field_names(data_fname='loinc_data.csv'):
    """Inspect the first line of a LOINC data file for a header.

    This is a stub: the first line is run through csv.Sniffer.has_header()
    but nothing is done with the result and no mapping is returned.

    Args:
        data_fname: path of the LOINC data file.

    Returns:
        None
    """
    # only the first line is needed, so close the file right after reading it
    with open(data_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as csv_file:
        first_line = csv_file.readline()
    sniffer = csv.Sniffer()
    if sniffer.has_header(first_line):
        # TODO: map header names to staging table fields
        pass
def split_LOINCDBTXT(input_fname=None, data_fname=None, license_fname=None)
-
Expand source code
def split_LOINCDBTXT(input_fname=None, data_fname=None, license_fname=None):
    """Split a LOINC source file into a license file and a data file.

    Lines up to and including the one containing the module-level
    license_delimiter are written to the license file, the lines
    after it are written to the data file.

    Args:
        input_fname: path of the LOINC source file to split.
        data_fname: path of the data file to write, a unique
            filename is generated if None.
        license_fname: path of the license file to write, a unique
            filename is generated if None.

    Returns:
        The tuple (data_fname, license_fname).
    """
    _log.debug('splitting LOINC source file [%s]', input_fname)

    if license_fname is None:
        license_fname = gmTools.get_unique_filename(prefix = 'loinc_license-', suffix = '.txt')
    _log.debug('LOINC header: %s', license_fname)

    if data_fname is None:
        data_fname = gmTools.get_unique_filename(prefix = 'loinc_data-', suffix = '.csv')
    _log.debug('LOINC data: %s', data_fname)

    # make sure both the input and the current output file
    # get closed even if reading or writing fails midway
    with open(input_fname, mode = 'rt', encoding = file_encoding, errors = 'replace') as loinc_file:
        out_file = open(license_fname, mode = 'wt', encoding = 'utf8', errors = 'replace')
        try:
            for line in loinc_file:
                out_file.write(line)
                if license_delimiter in line:
                    # the delimiter line itself still belongs to the license,
                    # everything after it goes into the data file
                    # NOTE(review): a second delimiter line would re-open
                    # (and thereby truncate) the data file - presumably
                    # the delimiter occurs only once, confirm
                    out_file.close()
                    out_file = open(data_fname, mode = 'wt', encoding = 'utf8', errors = 'replace')
        finally:
            out_file.close()

    return data_fname, license_fname
Classes
class cLOINCMatchProvider (queries=None, context=None)
-
Expand source code
class cLOINCMatchProvider(gmMatchProvider.cMatchProvider_SQL2):
    """Match provider searching for LOINC codes.

    Candidates come from the UNION of _SQL_LOINC_from_test_type and
    _SQL_LOINC_from_any_coded_term. If the search fragment consists
    of two whitespace-separated, digit-free parts, word and substring
    searches return the INTERSECTion of the matches for each part.
    """

    # fragments made up of two digit-free parts separated by whitespace
    _pattern = regex.compile(r'^\D+\s+\D+$', regex.UNICODE)

    _normal_query = """
        SELECT DISTINCT ON (list_label)
            data,
            field_label,
            list_label
        FROM (
            (%s) UNION ALL (
            %s)
        ) AS all_known_loinc""" % (
        _SQL_LOINC_from_test_type,
        _SQL_LOINC_from_any_coded_term
    )
    # NOTE: sub-queries _SQL_LOINC_from_i18n_coded_term and
    # _SQL_LOINC_from_en_EN_coded_term are currently not included

    #--------------------------------------------------------
    def _split_fragment(self, aFragment):
        """Return [partA, partB] if aFragment is a two-part fragment, else None."""
        if not cLOINCMatchProvider._pattern.match(aFragment):
            return None
        # _pattern accepts any whitespace as separator so split on
        # any whitespace, too, splitting on ' ' only would make
        # unpacking fail for, say, TAB-separated fragments
        parts = aFragment.split(None, 1)
        if len(parts) != 2:
            return None
        return parts

    #--------------------------------------------------------
    def _find_intersected_matches(self, condition_A, condition_B, arg_A, arg_B):
        """Return the matches fulfilling both fragment conditions.

        The conditions must reference %%(fragmentA)s / %%(fragmentB)s,
        the values of which are passed in as arg_A / arg_B.
        """
        query1 = cLOINCMatchProvider._normal_query % {'fragment_condition': condition_A}
        self._args['fragmentA'] = arg_A
        query2 = cLOINCMatchProvider._normal_query % {'fragment_condition': condition_B}
        self._args['fragmentB'] = arg_B
        self._queries = ["SELECT * FROM (\n(%s\n) INTERSECT (%s)\n) AS intersected_matches\nORDER BY list_label\nLIMIT 75" % (query1, query2)]
        return self._find_matches('dummy')

    #--------------------------------------------------------
    def getMatchesByPhrase(self, aFragment):
        """Return matches for aFragment at start of phrases."""
        self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
        return gmMatchProvider.cMatchProvider_SQL2.getMatchesByPhrase(self, aFragment)

    #--------------------------------------------------------
    def getMatchesByWord(self, aFragment):
        """Return matches for aFragment at start of words inside phrases."""
        parts = self._split_fragment(aFragment)
        if parts is not None:
            fragmentA, fragmentB = parts
            return self._find_intersected_matches (
                '~* %%(fragmentA)s',
                '~* %%(fragmentB)s',
                "( %s)|(^%s)" % (fragmentA, fragmentA),
                "( %s)|(^%s)" % (fragmentB, fragmentB)
            )

        self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
        return gmMatchProvider.cMatchProvider_SQL2.getMatchesByWord(self, aFragment)

    #--------------------------------------------------------
    def getMatchesBySubstr(self, aFragment):
        """Return matches for aFragment as a true substring."""
        parts = self._split_fragment(aFragment)
        if parts is not None:
            fragmentA, fragmentB = parts
            return self._find_intersected_matches (
                "ILIKE %%(fragmentA)s",
                "ILIKE %%(fragmentB)s",
                '%%%s%%' % fragmentA,
                '%%%s%%' % fragmentB
            )

        self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
        return gmMatchProvider.cMatchProvider_SQL2.getMatchesBySubstr(self, aFragment)
Match provider which searches matches in possibly several database tables.
queries: - a list of unicode strings - each string is a query - each string must contain: "… WHERE <column> %(fragment_condition)s …" - each string can contain in the where clause: "… %(<ctxt_key1>)s …" - each query must return (data, list_label, field_label)
context definitions to be used in the queries, example: {'ctxt_key1': {'where_part': 'AND country = %(country)s', 'placeholder': 'country'}}
client code using .set_context() must use the 'placeholder':
<match provider instance>.set_context('country', 'Germany') full example query:
query = u" " " SELECT DISTINCT ON (list_label) pk_encounter AS data, to_char(started, 'YYYY Mon DD (HH24:MI)') || ': ' || l10n_type || ' [#' || pk_encounter || ']' AS list_label, to_char(started, 'YYYY Mon DD') || ': ' || l10n_type AS field_label FROM clin.v_pat_encounters WHERE ( l10n_type %(fragment_condition)s OR type %(fragment_condition)s ) %(ctxt_patient)s ORDER BY list_label LIMIT 30 " " " context = {'ctxt_patient': { 'where_part': u'AND pk_patient = %(PLACEHOLDER)s', 'placeholder': u'PLACEHOLDER' }} self.mp = gmMatchProvider.cMatchProvider_SQL2(queries = query, context = context) self.set_context(context = 'PLACEHOLDER', val = '<THE VALUE>')
_SQL_data2match: SQL to retrieve a match by, say, primary key wherein the only keyword argument is 'pk'
Ancestors
Inherited members