Module Gnumed.business.gmLOINC

LOINC handling code.

http://loinc.org

license: GPL v2 or later

Functions

def format_loinc(loinc)
Expand source code
def format_loinc(loinc):
        """Render the ref.loinc row of a LOINC code as tabular text.

        The row is looked up with loinc2data(), converted to a plain
        dict and handed to gmTools.format_dict_like(), which is told to
        leave out fields holding None or an empty string.

        Args:
                loinc: the LOINC code to look up

        Returns:
                The result of gmTools.format_dict_like() for the row,
                or None if loinc2data() found no row for the code.
        """
        row = loinc2data(loinc)
        if row is not None:
                return gmTools.format_dict_like (
                        dict(row),
                        tabular = True,
                        value_delimiters = None,
                        values2ignore = [None, '']
                )
        return None
def get_version(license_fname='loinc_license.txt')
Expand source code
def get_version(license_fname='loinc_license.txt'):
        """Extract the LOINC version from a license/header file.

        The file is scanned line by line; the first line starting with
        the module-level version_tag wins, and whatever follows the tag
        on that line is returned with surrounding whitespace stripped.

        Args:
                license_fname: path of the license file, read as UTF-8
                        (a leading BOM is dropped, undecodable bytes
                        are replaced)

        Returns:
                The version string, or None if no line starts with
                version_tag.
        """
        # the context manager closes the file even if reading fails half-way
        with open(license_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as in_file:
                for line in in_file:
                        if line.startswith(version_tag):
                                return line[len(version_tag):].strip()
        return None
def loinc2data(loinc)
Expand source code
def loinc2data(loinc):
        """Fetch the ref.loinc row for a LOINC code.

        Args:
                loinc: the LOINC code, matched against ref.loinc.code

        Returns:
                The first row returned by the query, or None if the
                query returned no rows.
        """
        query = {
                'sql': 'SELECT * FROM ref.loinc WHERE code = %(loinc)s',
                'args': {'loinc': loinc}
        }
        matches = gmPG2.run_ro_queries(queries = [query])
        return matches[0] if len(matches) > 0 else None
def loinc2term(loinc=None)
Expand source code
def loinc2term(loinc=None):
        """Look up the term(s) for a LOINC code in ref.v_coded_terms.

        The query prefers, in this order: a term in the current
        database language (i18n.get_curr_lang()), a term in 'en_EN',
        a term in any language.

        Args:
                loinc: the LOINC code to translate

        Returns:
                A list holding the first column of each returned row,
                or an empty list if the code is not known.
        """
        # NOTE: the coalesce() makes the query return one row
        # holding NULL rather than no row at all on no-match
        cmd = """
SELECT coalesce (
        (SELECT term
        FROM ref.v_coded_terms
        WHERE
                coding_system = 'LOINC'
                        AND
                code = %(loinc)s
                        AND
                lang = i18n.get_curr_lang()
        ),
        (SELECT term
        FROM ref.v_coded_terms
        WHERE
                coding_system = 'LOINC'
                        AND
                code = %(loinc)s
                        AND
                lang = 'en_EN'
        ),
        (SELECT term
        FROM ref.v_coded_terms
        WHERE
                coding_system = 'LOINC'
                        AND
                code = %(loinc)s
        )
)"""
        rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'loinc': loinc}}])

        if rows[0][0] is not None:
                return [ row[0] for row in rows ]

        return []
def loinc_import(data_fname=None, license_fname=None, version=None, conn=None, lang='en_EN')
Expand source code
def loinc_import(data_fname=None, license_fname=None, version=None, conn=None, lang='en_EN'):
        """Import a LOINC data file into ref.loinc by way of the staging table.

        Steps, in order:
                1. determine the version (from the license file unless given)
                2. empty staging.loinc_staging
                3. load the tab-delimited data file into the staging table,
                   skipping its first (header) line
                4. create or update the ref.data_source record for this version
                5. insert codes not yet known for this data source into
                   ref.loinc, then update the detail columns from staging
                6. empty staging.loinc_staging again

        Args:
                data_fname: path of the tab-delimited LOINC data file
                license_fname: path of the license file; its content becomes
                        the data source description, and it is searched for
                        the version if none is passed in
                version: LOINC version, detected with get_version() if None
                conn: database connection all writes go through; it is
                        committed several times along the way
                lang: language stored with the data source record

        Returns:
                True

        Raises:
                ValueError: if no version was passed in and none can be
                        detected from the license file
        """
        if version is None:
                version = get_version(license_fname = license_fname)

        if version is None:
                raise ValueError('cannot detect LOINC version')

        _log.debug('importing LOINC version [%s]', version)

        # clean out staging area
        _empty_loinc_staging_table(conn)
        _log.debug('staging table emptied')

        # import data from csv file into staging table,
        # using one positional placeholder per staging field
        cmd = """INSERT INTO staging.loinc_staging values (%s%%s)""" % ('%s, ' * (len(loinc_fields) - 1))
        # the context manager closes the data file even if an insert fails
        with open(data_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as csv_file:
                loinc_lines = iter(gmTools.unicode_csv_reader(csv_file, delimiter = "\t", quotechar = '"'))
                curs = conn.cursor()
                # the first line holds the column names, not data
                next(loinc_lines, None)
                for loinc_line in loinc_lines:
                        gmPG2.run_rw_queries(link_obj = curs, queries = [{'sql': cmd, 'args': loinc_line}])
                curs.close()
                conn.commit()
        _log.debug('staging table loaded')

        # create data source record
        with open(license_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as in_file:
                desc = in_file.read()
        args = {'ver': version, 'desc': desc, 'url': origin_url, 'name_long': name_long, 'name_short': name_short, 'lang': lang}
        queries = [
                # insert if not existing
                {'args': args, 'sql': """
                        INSERT INTO ref.data_source (name_long, name_short, version) SELECT
                                %(name_long)s,
                                %(name_short)s,
                                %(ver)s
                        WHERE NOT EXISTS (
                                SELECT 1 FROM ref.data_source WHERE
                                        name_long = %(name_long)s
                                                AND
                                        name_short = %(name_short)s
                                                AND
                                        version = %(ver)s
                        )"""
                },
                # update non-unique fields
                {'args': args, 'sql': """
                        UPDATE ref.data_source SET
                                description = %(desc)s,
                                source = %(url)s,
                                lang = %(lang)s
                        WHERE
                                name_long = %(name_long)s
                                        AND
                                name_short = %(name_short)s
                                        AND
                                version = %(ver)s
                        """
                },
                # retrieve PK of data source
                {'args': args, 'sql': """SELECT pk FROM ref.data_source WHERE name_short = %(name_short)s AND version = %(ver)s"""}
        ]
        curs = conn.cursor()
        rows = gmPG2.run_rw_queries(link_obj = curs, queries = queries, return_data = True)
        data_src_pk = rows[0][0]
        # no commit here, the next commit is the one after the transfer below
        curs.close()
        _log.debug('data source record created or updated, pk is #%s', data_src_pk)

        # import from staging table to real table
        args = {'src_pk': data_src_pk}
        queries = []
        # 1) insert (data source, term, code) not yet in ref.loinc, the term
        #    being the long common name or else the colon-joined name parts
        queries.append ({
                'args': args,
                'sql': """
                        INSERT INTO ref.loinc (
                                fk_data_source, term, code
                        )
                        SELECT
                                %(src_pk)s,
                                coalesce (
                                        nullif(long_common_name, ''),
                                        (
                                                coalesce(nullif(component, '') || ':', '') ||
                                                coalesce(nullif(property, '') || ':', '') ||
                                                coalesce(nullif(time_aspect, '') || ':', '') ||
                                                coalesce(nullif(system, '') || ':', '') ||
                                                coalesce(nullif(scale_type, '') || ':', '') ||
                                                coalesce(nullif(method_type, '') || ':', '')
                                        )
                                ),
                                nullif(loinc_num, '')
                        FROM
                                staging.loinc_staging st_ls
                        WHERE NOT EXISTS (
                                SELECT 1 FROM ref.loinc r_l WHERE
                                        r_l.fk_data_source = %(src_pk)s
                                                AND
                                        r_l.code = nullif(st_ls.loinc_num, '')
                                                AND
                                        r_l.term =      coalesce (
                                                nullif(st_ls.long_common_name, ''),
                                                (
                                                        coalesce(nullif(st_ls.component, '') || ':', '') ||
                                                        coalesce(nullif(st_ls.property, '') || ':', '') ||
                                                        coalesce(nullif(st_ls.time_aspect, '') || ':', '') ||
                                                        coalesce(nullif(st_ls.system, '') || ':', '') ||
                                                        coalesce(nullif(st_ls.scale_type, '') || ':', '') ||
                                                        coalesce(nullif(st_ls.method_type, '') || ':', '')
                                                )
                                        )
                        )"""
        })
        # 2) fill in the detail columns of the rows matching on
        #    (data source, code, term), storing empty strings as NULL
        queries.append ({
                'args': args,
                'sql': """
                        UPDATE ref.loinc SET
                                comment = nullif(st_ls.comments, ''),
                                component = nullif(st_ls.component, ''),
                                property = nullif(st_ls.property, ''),
                                time_aspect = nullif(st_ls.time_aspect, ''),
                                system = nullif(st_ls.system, ''),
                                scale_type = nullif(st_ls.scale_type, ''),
                                method_type = nullif(st_ls.method_type, ''),
                                related_names_1_old = nullif(st_ls.related_names_1_old, ''),
                                grouping_class = nullif(st_ls.class, ''),
                                loinc_internal_source = nullif(st_ls.source, ''),
                                dt_last_change = nullif(st_ls.dt_last_change, ''),
                                change_type = nullif(st_ls.change_type, ''),
                                answer_list = nullif(st_ls.answer_list, ''),
                                code_status = nullif(st_ls.status, ''),
                                maps_to = nullif(st_ls.map_to, ''),
                                scope = nullif(st_ls.scope, ''),
                                normal_range = nullif(st_ls.normal_range, ''),
                                ipcc_units = nullif(st_ls.ipcc_units, ''),
                                reference = nullif(st_ls.reference, ''),
                                exact_component_synonym = nullif(st_ls.exact_component_synonym, ''),
                                molar_mass = nullif(st_ls.molar_mass, ''),
                                grouping_class_type = nullif(st_ls.class_type, '')::smallint,
                                formula = nullif(st_ls.formula, ''),
                                species = nullif(st_ls.species, ''),
                                example_answers = nullif(st_ls.example_answers, ''),
                                acs_synonyms = nullif(st_ls.acs_synonyms, ''),
                                base_name = nullif(st_ls.base_name, ''),
                                final = nullif(st_ls.final, ''),
                                naa_ccr_id = nullif(st_ls.naa_ccr_id, ''),
                                code_table = nullif(st_ls.code_table, ''),
                                is_set_root = nullif(st_ls.is_set_root, '')::boolean,
                                panel_elements = nullif(st_ls.panel_elements, ''),
                                survey_question_text = nullif(st_ls.survey_question_text, ''),
                                survey_question_source = nullif(st_ls.survey_question_source, ''),
                                units_required = nullif(st_ls.units_required, ''),
                                submitted_units = nullif(st_ls.submitted_units, ''),
                                related_names_2 = nullif(st_ls.related_names_2, ''),
                                short_name = nullif(st_ls.short_name, ''),
                                order_obs = nullif(st_ls.order_obs, ''),
                                cdisc_common_tests = nullif(st_ls.cdisc_common_tests, ''),
                                hl7_field_subfield_id = nullif(st_ls.hl7_field_subfield_id, ''),
                                external_copyright_notice = nullif(st_ls.external_copyright_notice, ''),
                                example_units = nullif(st_ls.example_units, ''),
                                inpc_percentage = nullif(st_ls.inpc_percentage, ''),
                                long_common_name = nullif(st_ls.long_common_name, '')
                        FROM
                                staging.loinc_staging st_ls
                        WHERE
                                fk_data_source = %(src_pk)s
                                        AND
                                code = nullif(st_ls.loinc_num, '')
                                        AND
                                term = coalesce (
                                        nullif(st_ls.long_common_name, ''),
                                        (
                                                coalesce(nullif(st_ls.component, '') || ':', '') ||
                                                coalesce(nullif(st_ls.property, '') || ':', '') ||
                                                coalesce(nullif(st_ls.time_aspect, '') || ':', '') ||
                                                coalesce(nullif(st_ls.system, '') || ':', '') ||
                                                coalesce(nullif(st_ls.scale_type, '') || ':', '') ||
                                                coalesce(nullif(st_ls.method_type, '') || ':', '')
                                        )
                                )
                """
        })
        curs = conn.cursor()
        gmPG2.run_rw_queries(link_obj = curs, queries = queries)
        curs.close()
        conn.commit()
        _log.debug('transfer from staging table to real table done')

        # clean out staging area
        _empty_loinc_staging_table(conn)
        _log.debug('staging table emptied')

        return True


def _empty_loinc_staging_table(conn):
        """Delete all rows from staging.loinc_staging and commit."""
        curs = conn.cursor()
        cmd = """DELETE FROM staging.loinc_staging"""
        gmPG2.run_rw_queries(link_obj = curs, queries = [{'sql': cmd}])
        curs.close()
        conn.commit()
def map_field_names(data_fname='loinc_data.csv')
Expand source code
def map_field_names(data_fname='loinc_data.csv'):
        """Unfinished stub intended to map the field names of a LOINC data file.

        Currently it only reads the first line of the file and asks
        csv.Sniffer whether that line looks like a header; the answer
        is not used yet.

        Args:
                data_fname: path of the LOINC data file, read as UTF-8
                        (a leading BOM is dropped, undecodable bytes
                        are replaced)

        Returns:
                None

        Raises:
                csv.Error: passed through from csv.Sniffer if it cannot
                        work out the dialect of the first line
        """
        # only the first line is needed, so the file can be closed right away
        with open(data_fname, mode = 'rt', encoding = 'utf-8-sig', errors = 'replace') as csv_file:
                first_line = csv_file.readline()
        sniffer = csv.Sniffer()
        if sniffer.has_header(first_line):
                # TODO: the actual mapping of field names is not implemented
                pass
def split_LOINCDBTXT(input_fname=None, data_fname=None, license_fname=None)
Expand source code
def split_LOINCDBTXT(input_fname=None, data_fname=None, license_fname=None):
        """Split a LOINC source file into a license file and a data file.

        Lines are copied to the license file up to and including the
        first line containing the module-level license_delimiter; all
        following lines go to the data file. Both output files are
        written as UTF-8. Each further line containing the delimiter
        is still written to the data file, after which the data file is
        reopened for writing, discarding what was written so far. If
        the delimiter never occurs, everything ends up in the license
        file and no data file is written.

        Args:
                input_fname: path of the LOINC source file, read with
                        the module-level file_encoding
                data_fname: where to write the data part; a unique
                        file name is generated if None
                license_fname: where to write the license part; a
                        unique file name is generated if None

        Returns:
                The tuple (data_fname, license_fname) of the file
                names actually used.
        """
        _log.debug('splitting LOINC source file [%s]', input_fname)

        if license_fname is None:
                license_fname = gmTools.get_unique_filename(prefix = 'loinc_license-', suffix = '.txt')
        _log.debug('LOINC header: %s', license_fname)

        if data_fname is None:
                data_fname = gmTools.get_unique_filename(prefix = 'loinc_data-', suffix = '.csv')
        _log.debug('LOINC data: %s', data_fname)

        # make sure input and output get closed even if reading or writing fails
        with open(input_fname, mode = 'rt', encoding = file_encoding, errors = 'replace') as loinc_file:
                out_file = open(license_fname, mode = 'wt', encoding = 'utf8', errors = 'replace')
                try:
                        for line in loinc_file:
                                out_file.write(line)
                                if license_delimiter in line:
                                        # the delimiter line itself still belongs to
                                        # the current file, switch to the data file
                                        out_file.close()
                                        out_file = open(data_fname, mode = 'wt', encoding = 'utf8', errors = 'replace')
                finally:
                        out_file.close()

        return data_fname, license_fname

Classes

class cLOINCMatchProvider (queries=None, context=None)
Expand source code
class cLOINCMatchProvider(gmMatchProvider.cMatchProvider_SQL2):
        """Match provider suggesting LOINC codes.

        Candidates are the union of two module-level SQL sources
        (_SQL_LOINC_from_test_type and _SQL_LOINC_from_any_coded_term),
        made distinct on list_label, ordered by list_label and limited
        to 75 rows.

        Digit-free input consisting of two whitespace-separated parts
        is treated specially by getMatchesByWord() and
        getMatchesBySubstr(): one query is run per part and only rows
        found by both (INTERSECT) are returned.
        """

        # digit-free input with whitespace somewhere in the middle
        _pattern = regex.compile(r'^\D+\s+\D+$', regex.UNICODE)

        _normal_query = """
                SELECT DISTINCT ON (list_label)
                        data,
                        field_label,
                        list_label
                FROM (
                        (%s) UNION ALL (
                        %s)
                ) AS all_known_loinc""" % (
                        _SQL_LOINC_from_test_type,
                        _SQL_LOINC_from_any_coded_term
                )
        # currently disabled additional sources:
#--                     %s) UNION ALL (
#--                     %s) UNION ALL (
#               %
#                       _SQL_LOINC_from_i18n_coded_term,
#                       _SQL_LOINC_from_en_EN_coded_term,

        #--------------------------------------------------------
        def _split_two_parts(self, aFragment):
                """Split two-part input into its parts.

                Returns:
                        [partA, partB] if aFragment matches _pattern and
                        splits into two parts on whitespace, else None.
                """
                if cLOINCMatchProvider._pattern.match(aFragment) is None:
                        return None
                # _pattern accepts any whitespace (\s) as the separator, so
                # split on any whitespace, too: splitting on ' ' only made
                # the unpacking fail on, say, tab-separated input
                parts = aFragment.split(None, 1)
                if len(parts) != 2:
                        # nothing but whitespace on one side of the separator
                        return None
                return parts

        #--------------------------------------------------------
        def getMatchesByPhrase(self, aFragment):
                """Return matches for aFragment at start of phrases."""

                self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
                return gmMatchProvider.cMatchProvider_SQL2.getMatchesByPhrase(self, aFragment)

        #--------------------------------------------------------
        def getMatchesByWord(self, aFragment):
                """Return matches for aFragment at start of words inside phrases.

                Two-part input returns the rows in which both parts
                match at the start of the string or right after a
                space. Anything else is handled by the base class.
                """
                parts = self._split_two_parts(aFragment)
                if parts is not None:
                        fragmentA, fragmentB = parts
                        # NOTE(review): the parts go into the regular expressions unescaped
                        query1 = cLOINCMatchProvider._normal_query % {'fragment_condition': '~* %%(fragmentA)s'}
                        self._args['fragmentA'] = "( %s)|(^%s)" % (fragmentA, fragmentA)
                        query2 = cLOINCMatchProvider._normal_query % {'fragment_condition': '~* %%(fragmentB)s'}
                        self._args['fragmentB'] = "( %s)|(^%s)" % (fragmentB, fragmentB)
                        self._queries = ["SELECT * FROM (\n(%s\n) INTERSECT (%s)\n) AS intersected_matches\nORDER BY list_label\nLIMIT 75" % (query1, query2)]
                        return self._find_matches('dummy')

                self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
                return gmMatchProvider.cMatchProvider_SQL2.getMatchesByWord(self, aFragment)

        #--------------------------------------------------------
        def getMatchesBySubstr(self, aFragment):
                """Return matches for aFragment as a true substring.

                Two-part input returns the rows containing both parts
                as substrings (case-insensitively). Anything else is
                handled by the base class.
                """
                parts = self._split_two_parts(aFragment)
                if parts is not None:
                        fragmentA, fragmentB = parts
                        query1 = cLOINCMatchProvider._normal_query % {'fragment_condition': "ILIKE %%(fragmentA)s"}
                        self._args['fragmentA'] = '%%%s%%' % fragmentA
                        query2 = cLOINCMatchProvider._normal_query % {'fragment_condition': "ILIKE %%(fragmentB)s"}
                        self._args['fragmentB'] = '%%%s%%' % fragmentB
                        self._queries = ["SELECT * FROM (\n(%s\n) INTERSECT (%s)\n) AS intersected_matches\nORDER BY list_label\nLIMIT 75" % (query1, query2)]
                        return self._find_matches('dummy')

                self._queries = [cLOINCMatchProvider._normal_query + '\nORDER BY list_label\nLIMIT 75']
                return gmMatchProvider.cMatchProvider_SQL2.getMatchesBySubstr(self, aFragment)

Match provider which searches matches in possibly several database tables.

queries: - a list of unicode strings - each string is a query - each string must contain: "… WHERE <column> %(fragment_condition)s …" - each string can contain in the where clause: "… %(<context key>)s …" - each query must return (data, list_label, field_label)

context definitions to be used in the queries, example: {'ctxt_key1': {'where_part': 'AND country = %(country)s', 'placeholder': 'country'}}

client code using .set_context() must use the 'placeholder' as the context name: <match provider>.set_context('country', 'Germany')

full example query:

    query = u" " "
            SELECT DISTINCT ON (list_label)
                    pk_encounter
                            AS data,
                    to_char(started, 'YYYY Mon DD (HH24:MI)') || ': ' || l10n_type || ' [#' || pk_encounter || ']'
                            AS list_label,
                    to_char(started, 'YYYY Mon DD') || ': ' || l10n_type
                            AS field_label
            FROM
                    clin.v_pat_encounters
            WHERE
                    (
                            l10n_type %(fragment_condition)s
                                    OR
                            type %(fragment_condition)s
                    )       %(ctxt_patient)s
            ORDER BY
                    list_label
            LIMIT
                    30
    " " "
    context = {'ctxt_patient': {
            'where_part': u'AND pk_patient = %(PLACEHOLDER)s',
            'placeholder': u'PLACEHOLDER'
    }}
    self.mp = gmMatchProvider.cMatchProvider_SQL2(queries = query, context = context)
    self.set_context(context = 'PLACEHOLDER', val = '<THE VALUE>')

_SQL_data2match: SQL to retrieve a match by, say, primary key wherein the only keyword argument is 'pk'

Ancestors

Inherited members