Module Gnumed.business.gmHL7

Some HL7 handling.

Functions

def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None)
Expand source code
def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None):
        """Unwrap HL7 data from the text (CDATA) of nodes in an XML file.

        The text of every node matching <xml_path> is written, unmodified and
        in document order, into one uniquely named .hl7 work file. A copy of
        that file is placed into the target directory and the XML file itself
        is moved into a "done" directory.

        Args:
                filename: the XML file to process
                xml_path: path handed to ElementTree.findall() to select the
                        nodes holding the HL7 data
                target_dir: directory to copy the unwrapped HL7 file into;
                        if None, dir_of(filename)/HL7 is used and the XML file
                        is moved to dir_of(filename)/done, otherwise the XML
                        file is moved to <target_dir>/done

        Returns:
                The name of the HL7 file inside the target directory, or None
                if the environment cannot be set up, the XML cannot be parsed,
                or no node matches <xml_path>.
        """
        _log.debug('extracting HL7 from CDATA of <%s> nodes in XML file [%s]', xml_path, filename)

        # sanity checks/setup
        try:
                # fail early if the input cannot be opened
                with open(filename):
                        pass
                orig_dir = os.path.split(filename)[0]
                work_filename = gmTools.get_unique_filename(prefix = 'gm-x2h-%s-' % gmTools.fname_stem(filename), suffix = '.hl7')
                if target_dir is None:
                        target_dir = os.path.join(orig_dir, 'HL7')
                        done_dir = os.path.join(orig_dir, 'done')
                else:
                        done_dir = os.path.join(target_dir, 'done')
                _log.debug('target dir: %s', target_dir)
                gmTools.mkdir(target_dir)
                gmTools.mkdir(done_dir)
        except Exception:
                _log.exception('cannot setup unwrapping environment')
                return None

        hl7_xml = pyxml.ElementTree()
        try:
                hl7_xml.parse(filename)
        except pyxml.ParseError:
                _log.exception('cannot parse [%s]', filename)
                return None

        nodes = hl7_xml.findall(xml_path)
        if not nodes:
                _log.debug('no nodes found for data extraction')
                return None

        _log.debug('unwrapping HL7 from XML into [%s]', work_filename)
        # newline = '': no newline translation on output, line endings
        # are written exactly as they are found in the node text
        with open(work_filename, mode = 'wt', encoding = 'utf8', newline = '') as hl7_file:
                for node in nodes:
                        if node.text is None:
                                # a node without any text has nothing to unwrap
                                _log.debug('skipping node without text')
                                continue
                        hl7_file.write(node.text)

        target_fname = os.path.join(target_dir, os.path.split(work_filename)[1])
        shutil.copy(work_filename, target_dir)
        shutil.move(filename, done_dir)

        return target_fname
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True)
Expand source code
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True):
        """Format the HL7 content of a file for human consumption.

        Args:
                filename: the HL7 file to format
                skip_empty_fields: passed on to format_hl7_message()
                eol: passed on to format_hl7_message(); if None that function
                        returns a list of [label, content] pairs rather than
                        a string
                return_filename: if True, write the formatted text into a new
                        uniquely named file and return the name of that file
                fix_hl7: if True, run the file through
                        __fix_malformed_hl7_file() and format the result

        Returns:
                If <return_filename> is False: whatever format_hl7_message()
                returned. Otherwise the name of the file holding the
                formatted text.
        """
        if fix_hl7:
                name2read = __fix_malformed_hl7_file(filename)
                source = '%s (<- %s)' % (name2read, filename)
        else:
                name2read = filename
                source = filename

        # newline = '': accept any line ending but pass it on untranslated
        with open(name2read, mode = 'rt', encoding = 'utf-8-sig', newline = '') as hl7_file:
                message = hl7_file.read(1024 * 1024 * 5)                # at most 5 M characters

        output = format_hl7_message (
                message = message,
                skip_empty_fields = skip_empty_fields,
                eol = eol,
                source = source
        )

        if not return_filename:
                return output

        if eol is None:
                # got a list of [label, content] pairs: render it as text
                # with labels padded/truncated to a fixed width
                max_len = 120
                output = '\n '.join([ '%s: %s' % ((o[0] + (' ' * max_len))[:max_len], o[1]) for o in output ])

        out_name = gmTools.get_unique_filename(prefix = 'gm-formatted_hl7-', suffix = '.hl7')
        with open(out_name, mode = 'wt', encoding = 'utf8') as out_file:
                out_file.write(output)

        return out_name
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None)
Expand source code
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None):
        """Turn a raw HL7 message into labelled, human readable rows.

        The message is parsed with pyhl7.parse(). A few header rows (source,
        size, segment count) are followed by one row per segment and one or
        more rows per field. Field labels are looked up in HL7_field_labels
        by segment type and field index, with a generic label as fallback.

        Args:
                message: the raw HL7 text
                skip_empty_fields: if True, fields without content produce no row
                eol: string used to join the rows; if None the rows are
                        returned as a list instead
                source: optional description of where the message came from

        Returns:
                A list of [label, content] pairs if <eol> is None, otherwise
                one string of "label: content" lines joined by <eol>, with the
                labels padded to a common width.
        """
        parsed = pyhl7.parse(message)

        rows = []
        if source is not None:
                rows.append([_('HL7 Source'), '%s' % source])
        rows.append([_('HL7 data size'), _('%s bytes') % len(message)])
        skipping_note = gmTools.bool2subst(skip_empty_fields, _(', skipping empty fields'), '')
        rows.append([_('HL7 Message'), _(' %s segments (lines)%s') % (len(parsed), skipping_note)])

        widest_label = 0
        for seg_no, segment in enumerate(parsed):
                # a segment is a line starting with a type
                seg_type = segment[0][0]
                rows.append([_('Segment #%s <%s>') % (seg_no, seg_type), _('%s fields') % len(segment)])

                for field_no, field in enumerate(segment):
                        try:
                                label = HL7_field_labels[seg_type][field_no]
                        except KeyError:
                                label = _('HL7 %s field') % seg_type
                        widest_label = max(widest_label, len(label))
                        row_title = '%2s - %s' % (field_no, label)

                        # empty: no parts at all, or one part which is blank
                        is_empty = (len(field) == 0) or (
                                (len(field) == 1) and (('%s' % field[0]).strip() == '')
                        )
                        if is_empty:
                                if not skip_empty_fields:
                                        rows.append([row_title, _('<EMTPY>')])
                                continue

                        # the first content line carries the label,
                        # continuation lines get an empty label
                        first_line, *more_lines = ('%s' % field).split(HL7_BRK)
                        rows.append([row_title, first_line])
                        rows.extend(['', line] for line in more_lines)

        if eol is None:
                return rows

        # leave room for the "NN - " prefix in front of the label
        pad_to = widest_label + 7
        return eol.join('%s: %s' % (row[0].ljust(pad_to)[:pad_to], row[1]) for row in rows)
def import_single_PID_hl7_file(filename)
Expand source code
def import_single_PID_hl7_file(filename):
        """Import a single-PID/single-MSH HL7 file, logging into a dedicated file.

        While the import runs, a file handler writing to
        "<filename>.import.log" is attached to the root logger.

        Args:
                filename: the HL7 file to import

        Returns:
                A tuple (success, log_name): <success> is the result of
                __import_single_PID_hl7_file(), or False if that raised an
                exception; <log_name> is the name of the import log file.
        """
        log_name = '%s.import.log' % filename
        import_logger = logging.FileHandler(log_name)
        import_logger.setLevel(logging.DEBUG)
        root_logger = logging.getLogger('')
        root_logger.addHandler(import_logger)
        _log.debug('log file: %s', log_name)

        # pessimistic default: an exception during import must not be
        # reported as success to the caller
        success = False
        try:
                success = __import_single_PID_hl7_file(filename)
                if not success:
                        _log.error('error when importing single-PID/single-MSH file')
        except Exception:
                _log.exception('error when importing single-PID/single-MSH file')
        finally:
                # detach AND close, or else the log file stays open
                root_logger.removeHandler(import_logger)
                import_logger.close()

        return success, log_name
def process_staged_single_PID_hl7_file(staged_item)
Expand source code
def _import_locked_staged_hl7(staged_item, log_name):
        """Import the HL7 data of an already locked staged item, return success.

        On success the staged item is deleted. On failure the import log and
        the formatted HL7 are stored in the comment of the staged item.
        """
        _log.debug('reference ID of staged HL7 data: %s', staged_item['external_data_id'])

        filename = staged_item.save_to_file()
        _log.debug('unstaged HL7 data into: %s', filename)

        if staged_item['pk_identity'] is None:
                emr = None
        else:
                emr = gmPerson.cPatient(staged_item['pk_identity']).emr

        success = False
        try:
                success = __import_single_PID_hl7_file(filename, emr = emr)
                if success:
                        gmIncomingData.delete_incoming_data(pk_incoming_data = staged_item['pk_incoming_data'])
                        return True
                _log.error('error when importing single-PID/single-MSH file')
        except Exception:
                _log.exception('error when importing single-PID/single-MSH file')

        # <success> can be true here if only the deletion above failed,
        # in which case there is no import failure to report
        if not success:
                with open(log_name, mode = 'rt', encoding = 'utf-8-sig') as log:
                        log_content = log.read()
                separator = '-' * 80
                staged_item['comment'] = ''.join ([
                        _('failed import: %s\n') % gmDateTime.pydt_now_here().isoformat(),
                        '\n',
                        separator,
                        '\n\n',
                        log_content,
                        '\n',
                        separator,
                        '\n\n',
                        format_hl7_file (
                                filename,
                                skip_empty_fields = True,
                                eol = '\n ',
                                return_filename = False
                        )
                ])
                staged_item.save()

        return success


def process_staged_single_PID_hl7_file(staged_item):
        """Import staged single-PID HL7 data, logging into a dedicated file.

        While processing, a file handler writing to a uniquely named log file
        is attached to the root logger. The staged item is locked for the
        duration of the import. Lock and log handler are released even if
        processing raises an exception.

        Args:
                staged_item: the staged incoming data to import; must offer
                        .lock(), .unlock(), .save_to_file(), .save() and
                        item access to its fields

        Returns:
                A tuple (success, log_name). <success> is False if the item
                could not be locked or the import failed.
        """
        log_name = gmTools.get_unique_filename (
                prefix = 'gm-staged_hl7_import-',
                suffix = '.log'
        )
        import_logger = logging.FileHandler(log_name)
        import_logger.setLevel(logging.DEBUG)
        root_logger = logging.getLogger('')
        root_logger.addHandler(import_logger)
        try:
                _log.debug('log file: %s', log_name)

                if not staged_item.lock():
                        _log.error('cannot lock staged data for HL7 import')
                        return False, log_name

                try:
                        success = _import_locked_staged_hl7(staged_item, log_name)
                finally:
                        staged_item.unlock()
                return success, log_name
        finally:
                # detach AND close, or else the log file stays open
                root_logger.removeHandler(import_logger)
                import_logger.close()
def split_hl7_file(filename, target_dir=None, encoding='utf8')
Expand source code
def split_hl7_file(filename, target_dir=None, encoding='utf8'):
        """Multi-step processing of HL7 files.

        - input can be multi-MSH / multi-PID / partially malformed HL7
        - tries to fix oddities
        - splits by MSH
        - splits by PID into <target_dir>

        - needs write permissions in dir_of(filename)
        - moves HL7 files which were successfully split up, and the
          splitting log, into dir_of(filename)/done/
        - moves the splitting log into dir_of(filename)/failed/ if
          splitting fails

        Args:
                filename: the HL7 file to split
                target_dir: where to put the per-PID files, defaults to
                        dir_of(filename)/PID
                encoding: passed on to the fixing and MSH splitting steps

        Returns:
                (True, list_of_PID_files) on success, (False, None) on failure
        """
        local_log_name = gmTools.get_unique_filename (
                prefix = gmTools.fname_stem(filename) + '-',
                suffix = '.split.log'
        )
        local_logger = logging.FileHandler(local_log_name)
        local_logger.setLevel(logging.DEBUG)
        root_logger = logging.getLogger('')
        root_logger.addHandler(local_logger)

        def _stop_logging():
                # detach AND close the handler such that the
                # log file is not held open when it gets moved
                root_logger.removeHandler(local_logger)
                local_logger.close()

        _log.info('splitting HL7 file: %s', filename)
        _log.debug('log file: %s', local_log_name)

        # sanity checks/setup
        try:
                # fail early if the input cannot be opened
                with open(filename):
                        pass
                orig_dir = os.path.split(filename)[0]
                done_dir = os.path.join(orig_dir, 'done')
                gmTools.mkdir(done_dir)
                error_dir = os.path.join(orig_dir, 'failed')
                gmTools.mkdir(error_dir)
                work_filename = gmTools.get_unique_filename(prefix = gmTools.fname_stem(filename) + '-', suffix = '.hl7')
                if target_dir is None:
                        target_dir = os.path.join(orig_dir, 'PID')
                _log.debug('target dir: %s', target_dir)
                gmTools.mkdir(target_dir)
        except Exception:
                _log.exception('cannot setup splitting environment')
                _stop_logging()
                return False, None

        # split
        target_names = []
        try:
                shutil.copy(filename, work_filename)
                fixed_filename = __fix_malformed_hl7_file(work_filename, encoding = encoding)
                MSH_fnames = __split_hl7_file_by_MSH(fixed_filename, encoding)
                PID_fnames = []
                for MSH_fname in MSH_fnames:
                        PID_fnames.extend(__split_MSH_by_PID(MSH_fname))
                for PID_fname in PID_fnames:
                        shutil.move(PID_fname, target_dir)
                        target_names.append(os.path.join(target_dir, os.path.split(PID_fname)[1]))
        except Exception:
                _log.exception('cannot split HL7 file')
                # best effort removal of what had already been moved into place
                for target_name in target_names:
                        try:
                                os.remove(target_name)
                        except Exception:
                                _log.exception('cannot remove [%s]', target_name)
                _stop_logging()
                try:
                        shutil.move(local_log_name, error_dir)
                except (shutil.Error, OSError):
                        _log.exception('cannot move log file to [%s]', error_dir)
                return False, None

        _log.info('successfully split')
        _stop_logging()
        try:
                shutil.move(filename, done_dir)
                shutil.move(local_log_name, done_dir)
        except (shutil.Error, OSError):
                _log.exception('cannot move hl7 file or log file to holding area')
        return True, target_names

Multi-step processing of HL7 files.

  • input can be multi-MSH / multi-PID / partially malformed HL7
  • tries to fix oddities
  • splits by MSH
  • splits by PID into <target_dir>

  • needs write permissions in dir_of(filename)

  • moves HL7 files which were successfully split up into dir_of(filename)/done/

  • returns (True|False, list_of_PID_files)

def stage_single_PID_hl7_file(filename, source=None, encoding='utf8')
Expand source code
def stage_single_PID_hl7_file(filename, source=None, encoding='utf8'):
        """Multi-step processing of HL7 files.

        - input must be single-MSH / single-PID / normalized HL7

        - imports into clin.incoming_data

        - needs write permissions in dir_of(filename)
        - moves PID files which were successfully staged, and the staging
          log, into dir_of(filename)/done/
        - moves PID files which could not be staged, and the staging log,
          into dir_of(filename)/failed/

        Args:
                filename: the HL7 file to stage
                source: optional description of the origin of the data,
                        appended to the data type as "HL7 (<source>)"
                encoding: not used by this function, the file is always
                        read as utf-8-sig

        Returns:
                True if the file was staged, False otherwise
        """
        local_log_name = gmTools.get_unique_filename (
                prefix = gmTools.fname_stem(filename) + '-',
                suffix = '.stage.log'
        )
        local_logger = logging.FileHandler(local_log_name)
        local_logger.setLevel(logging.DEBUG)
        root_logger = logging.getLogger('')
        root_logger.addHandler(local_logger)

        def _stop_logging():
                # detach AND close the handler such that the
                # log file is not held open when it gets moved
                root_logger.removeHandler(local_logger)
                local_logger.close()

        _log.info('staging [%s] as unmatched incoming HL7%s', filename, gmTools.coalesce(source, '', ' (%s)'))
        _log.debug('log file: %s', local_log_name)

        # sanity checks/setup
        try:
                # fail early if the input cannot be opened
                with open(filename):
                        pass
                orig_dir = os.path.split(filename)[0]
                done_dir = os.path.join(orig_dir, 'done')
                gmTools.mkdir(done_dir)
                error_dir = os.path.join(orig_dir, 'failed')
                gmTools.mkdir(error_dir)
        except Exception:
                _log.exception('cannot setup staging environment')
                _stop_logging()
                return False

        # stage
        try:
                incoming = gmIncomingData.create_incoming_data('HL7%s' % gmTools.coalesce(source, '', ' (%s)'), filename)
                if incoming is None:
                        _log.error('cannot stage PID file: %s', filename)
                        _stop_logging()
                        shutil.move(filename, error_dir)
                        shutil.move(local_log_name, error_dir)
                        return False
                incoming.update_data_from_file(fname = filename)
        except Exception:
                _log.exception('error staging PID file')
                _stop_logging()
                shutil.move(filename, error_dir)
                shutil.move(local_log_name, error_dir)
                return False

        # set additional data
        try:
                # newline = '': accept any line ending but pass it on untranslated
                with open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '') as MSH_file:
                        raw_hl7 = MSH_file.read(1024 * 1024 * 5)        # at most 5 M characters
                shutil.move(filename, done_dir)
                formatted_hl7 = format_hl7_message (
                        message = raw_hl7,
                        skip_empty_fields = True,
                        eol = '\n'
                )
                HL7 = pyhl7.parse(raw_hl7)
                del raw_hl7
                # the comment holds the formatted HL7 followed by
                # what has been logged about the staging so far
                with open(local_log_name, mode = 'rt', encoding = 'utf-8-sig') as log:
                        log_content = log.read()
                incoming['comment'] = ''.join([formatted_hl7, '\n', '-' * 80, '\n\n', log_content])
                try:
                        incoming['lastnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname)
                        incoming['firstnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
                        val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
                        if val is not None:
                                incoming['firstnames'] += ' '
                                incoming['firstnames'] += val
                        val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
                        if val is not None:
                                tmp = time.strptime(val, '%Y%m%d')
                                incoming['dob'] = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
                        val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)
                        if val is not None:
                                incoming['gender'] = val
                        incoming['external_data_id'] = filename
                        # further fields which are not set from within this function:
                        #       fk_patient_candidates
                        #       request_id                      # request ID as found in <data>
                        #       postcode
                        #       other_info                      # other identifying info in .data
                        #       requestor                       # Requestor of data (e.g. who ordered test results) if available in source data.
                        #       fk_identity
                        #       fk_provider_disambiguated       # The provider the data is relevant to.
                except Exception:
                        # best effort only: stage the data even if
                        # the demographics cannot be extracted
                        _log.exception('cannot add more data')
                incoming.save()
        except Exception:
                # do not leave the handler attached to the root logger
                # when the error is passed on to the caller
                _stop_logging()
                raise

        _log.info('successfully staged')
        _stop_logging()
        shutil.move(local_log_name, done_dir)
        return True

Multi-step processing of HL7 files.

  • input must be single-MSH / single-PID / normalized HL7

  • imports into clin.incoming_data

  • needs write permissions in dir_of(filename)

  • moves PID files which were successfully staged into dir_of(filename)/done/