Module Gnumed.business.gmHL7
Some HL7 handling.
Functions
def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None)
-
Expand source code
def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None):
    """Unwrap HL7 data from the text of XML nodes into a new HL7 file.

    The text of every node matching <xml_path> is written, untranslated,
    into a uniquely named work file which is then copied into <target_dir>.
    Afterwards the XML file is moved into a "done" directory.

    Args:
        filename: the XML file to extract from
        xml_path: path expression handed to ElementTree.findall()
        target_dir: where to put the HL7 file; if None, dir_of(filename)/HL7/
            is used and the XML file is moved to dir_of(filename)/done/,
            otherwise the XML file is moved to <target_dir>/done/

    Returns:
        The name of the HL7 file inside <target_dir>, or None if the
        environment could not be set up, the XML could not be parsed, or
        no matching node carrying text was found.
    """
    _log.debug('extracting HL7 from CDATA of <%s> nodes in XML file [%s]', xml_path, filename)

    # sanity checks/setup
    try:
        # fails early if the file cannot be opened for reading
        open(filename).close()
        orig_dir = os.path.split(filename)[0]
        work_filename = gmTools.get_unique_filename (
            prefix = 'gm-x2h-%s-' % gmTools.fname_stem(filename),
            suffix = '.hl7'
        )
        if target_dir is None:
            target_dir = os.path.join(orig_dir, 'HL7')
            done_dir = os.path.join(orig_dir, 'done')
        else:
            done_dir = os.path.join(target_dir, 'done')
        _log.debug('target dir: %s', target_dir)
        gmTools.mkdir(target_dir)
        gmTools.mkdir(done_dir)
    except Exception:
        _log.exception('cannot setup unwrapping environment')
        return None

    hl7_xml = pyxml.ElementTree()
    try:
        hl7_xml.parse(filename)
    except pyxml.ParseError:
        _log.exception('cannot parse [%s]', filename)
        return None

    nodes = hl7_xml.findall(xml_path)
    if len(nodes) == 0:
        _log.debug('no nodes found for data extraction')
        return None

    # an element without content has .text set to None, which cannot be
    # written, so only nodes actually carrying text are unwrapped
    payloads = [ node.text for node in nodes if node.text is not None ]
    if len(payloads) == 0:
        _log.debug('no node carries any text, nothing to extract')
        return None

    _log.debug('unwrapping HL7 from XML into [%s]', work_filename)
    # newline = '': universal newlines acceptance but no translation on output
    with open(work_filename, mode = 'wt', encoding = 'utf8', newline = '') as hl7_file:
        for payload in payloads:
            hl7_file.write(payload)

    target_fname = os.path.join(target_dir, os.path.split(work_filename)[1])
    shutil.copy(work_filename, target_dir)
    shutil.move(filename, done_dir)
    return target_fname
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True)
-
Expand source code
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True):
    """Format the HL7 data found in a file for human consumption.

    Args:
        filename: the HL7 file to format, at most 5 MB of it are read
        skip_empty_fields: passed on to format_hl7_message()
        eol: passed on to format_hl7_message(); if None that function
            returns a list of [label, content] pairs rather than a string
        return_filename: if True, write the formatted data into a new
            file and return the name of that file
        fix_hl7: if True, run the file through __fix_malformed_hl7_file()
            and format the file returned by that

    Returns:
        The result of format_hl7_message() if <return_filename> is False,
        else the name of the file the formatted text was written to.
    """
    if fix_hl7:
        hl7_name = __fix_malformed_hl7_file(filename)
        source = '%s (<- %s)' % (hl7_name, filename)
    else:
        hl7_name = filename
        source = filename

    # newline = '': read universal but pass on untranslated
    with open(hl7_name, mode = 'rt', encoding = 'utf-8-sig', newline = '') as hl7_file:
        raw_hl7 = hl7_file.read(1024 * 1024 * 5)    # 5 MB max

    output = format_hl7_message (
        message = raw_hl7,
        skip_empty_fields = skip_empty_fields,
        eol = eol,
        source = source
    )

    if not return_filename:
        return output

    if eol is None:
        # a list of [label, content] pairs came back, turn it into
        # text with the labels padded/cut to a fixed width
        max_len = 120
        output = '\n '.join([
            '%s: %s' % ((o[0] + (' ' * max_len))[:max_len], o[1])
            for o in output
        ])

    out_name = gmTools.get_unique_filename(prefix = 'gm-formatted_hl7-', suffix = '.hl7')
    with open(out_name, mode = 'wt', encoding = 'utf8') as out_file:
        out_file.write(output)
    return out_name
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None)
-
Expand source code
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None):
    """Turn a raw HL7 message into labelled, line-oriented output.

    Args:
        message: the raw HL7 message, handed to pyhl7.parse()
        skip_empty_fields: if True, fields without content are left out,
            otherwise they are listed with an "empty" marker
        eol: string used for joining the output lines; if None the
            list of [label, content] pairs is returned as is
        source: if not None, a line naming the source is prepended

    Returns:
        A list of [label, content] pairs if <eol> is None, else one
        string with the labels padded to a common width.
    """
    # a segment is a line starting with a type
    parsed = pyhl7.parse(message)

    rows = []
    if source is not None:
        rows.append([_('HL7 Source'), '%s' % source])
    rows.append([_('HL7 data size'), _('%s bytes') % len(message)])
    rows.append([
        _('HL7 Message'),
        _(' %s segments (lines)%s') % (len(parsed), gmTools.bool2subst(skip_empty_fields, _(', skipping empty fields'), ''))
    ])

    label_width = 0
    for seg_idx, segment in enumerate(parsed):
        seg_type = segment[0][0]
        rows.append([_('Segment #%s <%s>') % (seg_idx, seg_type), _('%s fields') % len(segment)])
        for field_idx, field in enumerate(segment):
            try:
                label = HL7_field_labels[seg_type][field_idx]
            except KeyError:
                label = _('HL7 %s field') % seg_type
            # the width is tracked for skipped fields, too
            label_width = max(label_width, len(label))

            has_no_parts = (len(field) == 0)
            is_blank = has_no_parts or ((len(field) == 1) and (('%s' % field[0]).strip() == ''))
            if is_blank:
                if not skip_empty_fields:
                    rows.append(['%2s - %s' % (field_idx, label), _('<EMTPY>')])
                continue

            # the first content line carries the label,
            # continuation lines get an empty one
            first_line, *more_lines = ('%s' % field).split(HL7_BRK)
            rows.append(['%2s - %s' % (field_idx, label), first_line])
            rows.extend(['', line] for line in more_lines)

    if eol is None:
        return rows

    # make room for the '%2s - ' style prefix put in front of each label
    label_width += 7
    return eol.join(
        '%s: %s' % ((row[0] + (' ' * label_width))[:label_width], row[1])
        for row in rows
    )
def import_single_PID_hl7_file(filename)
-
Expand source code
def import_single_PID_hl7_file(filename):
    """Import a single-PID/single-MSH HL7 file, logging into a file of its own.

    While the import runs, a log file named "<filename>.import.log" is
    attached to the root logger.

    Args:
        filename: the HL7 file to import

    Returns:
        A tuple (success, log_name). <success> is whatever
        __import_single_PID_hl7_file() returned, or False if that
        raised an exception. <log_name> is the name of the log file.
    """
    log_name = '%s.import.log' % filename
    import_logger = logging.FileHandler(log_name)
    import_logger.setLevel(logging.DEBUG)
    root_logger = logging.getLogger('')
    root_logger.addHandler(import_logger)
    _log.debug('log file: %s', log_name)

    # start out pessimistic: if the import raises, <success> is never
    # assigned inside the try: and failure must not be reported as success
    success = False
    try:
        success = __import_single_PID_hl7_file(filename)
        if not success:
            _log.error('error when importing single-PID/single-MSH file')
    except Exception:
        _log.exception('error when importing single-PID/single-MSH file')
    finally:
        # removing the handler does not close the log file, so do both
        root_logger.removeHandler(import_logger)
        import_logger.close()

    return success, log_name
def process_staged_single_PID_hl7_file(staged_item)
-
Expand source code
def process_staged_single_PID_hl7_file(staged_item):
    """Import staged single-PID/single-MSH HL7 data, logging into a file of its own.

    The staged item is locked, saved to a file, and that file imported.
    On success the staged item is deleted. On failure the import log and
    the formatted HL7 are stored in the comment of the staged item.

    Args:
        staged_item: the staged incoming data; must support .lock(),
            .unlock(), .save_to_file(), .save() and item access

    Returns:
        A tuple (success, log_name).
    """
    log_name = gmTools.get_unique_filename (
        prefix = 'gm-staged_hl7_import-',
        suffix = '.log'
    )
    import_logger = logging.FileHandler(log_name)
    import_logger.setLevel(logging.DEBUG)
    root_logger = logging.getLogger('')
    root_logger.addHandler(import_logger)
    _log.debug('log file: %s', log_name)

    if not staged_item.lock():
        _log.error('cannot lock staged data for HL7 import')
        root_logger.removeHandler(import_logger)
        import_logger.close()
        return False, log_name

    # from here on the lock is held and the log handler is attached:
    # both are released in the finally: no matter how this block is left
    try:
        _log.debug('reference ID of staged HL7 data: %s', staged_item['external_data_id'])
        filename = staged_item.save_to_file()
        _log.debug('unstaged HL7 data into: %s', filename)

        if staged_item['pk_identity'] is None:
            emr = None
        else:
            emr = gmPerson.cPatient(staged_item['pk_identity']).emr

        success = False
        try:
            success = __import_single_PID_hl7_file(filename, emr = emr)
            if success:
                gmIncomingData.delete_incoming_data(pk_incoming_data = staged_item['pk_incoming_data'])
                return True, log_name
            _log.error('error when importing single-PID/single-MSH file')
        except Exception:
            _log.exception('error when importing single-PID/single-MSH file')

        if not success:
            # document the failure on the staged item:
            # timestamp, import log, formatted HL7
            separator = '\n' + ('-' * 80) + '\n\n'
            comment = _('failed import: %s\n') % gmDateTime.pydt_now_here().isoformat()
            comment += separator
            with open(log_name, mode = 'rt', encoding = 'utf-8-sig') as log:
                comment += log.read()
            comment += separator
            comment += format_hl7_file (
                filename,
                skip_empty_fields = True,
                eol = '\n ',
                return_filename = False
            )
            staged_item['comment'] = comment
            staged_item.save()

        return success, log_name

    finally:
        staged_item.unlock()
        # removing the handler does not close the log file, so do both
        root_logger.removeHandler(import_logger)
        import_logger.close()
def split_hl7_file(filename, target_dir=None, encoding='utf8')
-
Expand source code
def split_hl7_file(filename, target_dir=None, encoding='utf8'):
    """Multi-step processing of HL7 files.

    - input can be multi-MSH / multi-PID / partially malformed HL7
    - tries to fix oddities
    - splits by MSH
    - splits by PID into <target_dir>
    - needs write permissions in dir_of(filename)
    - moves HL7 files which were successfully split up into dir_of(filename)/done/
    - moves the log of a failed split into dir_of(filename)/failed/

    Args:
        filename: the HL7 file to split
        target_dir: where to put the per-PID files, defaults to
            dir_of(filename)/PID/
        encoding: passed on to the fixing and MSH splitting steps

    Returns:
        A tuple (True, list_of_PID_files) on success, (False, None)
        if setup or splitting failed.
    """
    local_log_name = gmTools.get_unique_filename (
        prefix = gmTools.fname_stem(filename) + '-',
        suffix = '.split.log'
    )
    local_logger = logging.FileHandler(local_log_name)
    local_logger.setLevel(logging.DEBUG)
    root_logger = logging.getLogger('')
    root_logger.addHandler(local_logger)

    def _detach_local_logger():
        # removing the handler does not close the log file, and a log
        # file should not be moved around while it is still held open
        root_logger.removeHandler(local_logger)
        local_logger.close()

    _log.info('splitting HL7 file: %s', filename)
    _log.debug('log file: %s', local_log_name)

    # sanity checks/setup
    try:
        # fails early if the file cannot be opened for reading
        open(filename).close()
        orig_dir = os.path.split(filename)[0]
        done_dir = os.path.join(orig_dir, 'done')
        gmTools.mkdir(done_dir)
        error_dir = os.path.join(orig_dir, 'failed')
        gmTools.mkdir(error_dir)
        work_filename = gmTools.get_unique_filename(prefix = gmTools.fname_stem(filename) + '-', suffix = '.hl7')
        if target_dir is None:
            target_dir = os.path.join(orig_dir, 'PID')
        _log.debug('target dir: %s', target_dir)
        gmTools.mkdir(target_dir)
    except Exception:
        _log.exception('cannot setup splitting environment')
        _detach_local_logger()
        return False, None

    # split
    target_names = []
    try:
        # work on a copy such that the original stays untouched
        shutil.copy(filename, work_filename)
        fixed_filename = __fix_malformed_hl7_file(work_filename, encoding = encoding)
        MSH_fnames = __split_hl7_file_by_MSH(fixed_filename, encoding)
        PID_fnames = []
        for MSH_fname in MSH_fnames:
            PID_fnames.extend(__split_MSH_by_PID(MSH_fname))
        for PID_fname in PID_fnames:
            shutil.move(PID_fname, target_dir)
            target_names.append(os.path.join(target_dir, os.path.split(PID_fname)[1]))
    except Exception:
        _log.exception('cannot split HL7 file')
        # best-effort removal of what had already been put into the target dir
        for target_name in target_names:
            try:
                os.remove(target_name)
            except Exception:
                _log.exception('cannot remove [%s]', target_name)
        _detach_local_logger()
        shutil.move(local_log_name, error_dir)
        return False, None

    _log.info('successfully split')
    _detach_local_logger()
    try:
        shutil.move(filename, done_dir)
        shutil.move(local_log_name, done_dir)
    except OSError:
        # shutil.Error derives from OSError; the split itself succeeded,
        # so failing to tidy up must not turn that into a failure
        _log.exception('cannot move hl7 file or log file to holding area')
    return True, target_names
Multi-step processing of HL7 files.
- input can be multi-MSH / multi-PID / partially malformed HL7
- tries to fix oddities
- splits by MSH
-
splits by PID into <target_dir>
-
needs write permissions in dir_of(filename)
-
moves HL7 files which were successfully split up into dir_of(filename)/done/
-
returns (True|False, list_of_PID_files)
def stage_single_PID_hl7_file(filename, source=None, encoding='utf8')
-
Expand source code
def stage_single_PID_hl7_file(filename, source=None, encoding='utf8'):
    """Multi-step processing of HL7 files.

    - input must be single-MSH / single-PID / normalized HL7
    - imports into clin.incoming_data
    - needs write permissions in dir_of(filename)
    - moves PID files which were successfully staged into dir_of(filename)/done/
    - moves PID files which could not be staged into dir_of(filename)/failed/

    Args:
        filename: the single-PID HL7 file to stage
        source: optional label, appended in parentheses to the 'HL7'
            type given to the incoming data
        encoding: not used by this function

    Returns:
        True if the file was staged, False if setup or staging failed.
    """
    local_log_name = gmTools.get_unique_filename (
        prefix = gmTools.fname_stem(filename) + '-',
        suffix = '.stage.log'
    )
    local_logger = logging.FileHandler(local_log_name)
    local_logger.setLevel(logging.DEBUG)
    root_logger = logging.getLogger('')
    root_logger.addHandler(local_logger)

    def _detach_local_logger():
        # removing the handler does not close the log file, and a log
        # file should not be moved around while it is still held open;
        # both calls are harmless when repeated
        root_logger.removeHandler(local_logger)
        local_logger.close()

    _log.info('staging [%s] as unmatched incoming HL7%s', filename, gmTools.coalesce(source, '', ' (%s)'))
    _log.debug('log file: %s', local_log_name)

    # sanity checks/setup
    try:
        # fails early if the file cannot be opened for reading
        open(filename).close()
        orig_dir = os.path.split(filename)[0]
        done_dir = os.path.join(orig_dir, 'done')
        gmTools.mkdir(done_dir)
        error_dir = os.path.join(orig_dir, 'failed')
        gmTools.mkdir(error_dir)
    except Exception:
        _log.exception('cannot setup staging environment')
        _detach_local_logger()
        return False

    # stage
    try:
        incoming = gmIncomingData.create_incoming_data('HL7%s' % gmTools.coalesce(source, '', ' (%s)'), filename)
        if incoming is None:
            _log.error('cannot stage PID file: %s', filename)
            _detach_local_logger()
            shutil.move(filename, error_dir)
            shutil.move(local_log_name, error_dir)
            return False
        incoming.update_data_from_file(fname = filename)
    except Exception:
        _log.exception('error staging PID file')
        _detach_local_logger()
        shutil.move(filename, error_dir)
        shutil.move(local_log_name, error_dir)
        return False

    # set additional data
    try:
        # newline = '': read universal but pass on untranslated
        with open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '') as MSH_file:
            raw_hl7 = MSH_file.read(1024 * 1024 * 5)    # 5 MB max
        shutil.move(filename, done_dir)
        incoming['comment'] = format_hl7_message (
            message = raw_hl7,
            skip_empty_fields = True,
            eol = '\n'
        )
        HL7 = pyhl7.parse(raw_hl7)
        del raw_hl7
        # append the staging log so far to the comment
        incoming['comment'] += '\n'
        incoming['comment'] += ('-' * 80)
        incoming['comment'] += '\n\n'
        with open(local_log_name, mode = 'rt', encoding = 'utf-8-sig') as log:
            incoming['comment'] += log.read()
        # best effort: failure to extract demographics does not fail the staging
        try:
            incoming['lastnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname)
            incoming['firstnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
            val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
            if val is not None:
                incoming['firstnames'] += ' '
                incoming['firstnames'] += val
            val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
            if val is not None:
                tmp = time.strptime(val, '%Y%m%d')
                incoming['dob'] = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
            val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)
            if val is not None:
                incoming['gender'] = val
            incoming['external_data_id'] = filename
            # not filled in here:
            #   fk_patient_candidates
            #   request_id                  request ID as found in <data>
            #   postcode
            #   other_info                  other identifying info in .data
            #   requestor                   requestor of data (e.g. who ordered test results) if available in source data
            #   fk_identity
            #   fk_provider_disambiguated   the provider the data is relevant to
        except Exception:
            _log.exception('cannot add more data')
        incoming.save()
        _log.info('successfully staged')
    finally:
        # also on unexpected errors: do not leave the handler attached to the root logger
        _detach_local_logger()

    shutil.move(local_log_name, done_dir)
    return True
Multi-step processing of HL7 files.
-
input must be single-MSH / single-PID / normalized HL7
-
imports into clin.incoming_data
-
needs write permissions in dir_of(filename)
- moves PID files which were successfully staged into dir_of(filename)/done/
-