Module Gnumed.business.gmXdtObjects

GNUmed German XDT parsing objects.

This encapsulates some of the XDT data into objects for easy access.

Functions

def add_file_to_patlst(ID, name, patlst, new_file, ahash)
Expand source code
def add_file_to_patlst(ID, name, patlst, new_file, ahash):
        """Record one more file for a patient in the patient list.

        The patient is addressed by the group key "<ID>:<name>". The group's
        current "files" option is fetched, the entry "<new_file>:<ahash>" is
        appended to it and the resulting list is written back with an empty
        comment.
        """
        group_key = "%s:%s" % (ID, name)
        known_files = patlst.get(aGroup = group_key, anOption = "files")
        entry = "%s:%s" % (new_file, ahash)
        known_files.append(entry)
        _log.debug("files now there : %s" % known_files)
        patlst.set(
                aGroup = group_key,
                anOption = "files",
                aValue = known_files,
                aComment = ""
        )
def check_for_previous_records(ID, name, patlst)
Expand source code
def check_for_previous_records(ID, name, patlst):
        """Return the hashes of all files already recorded for a patient.

        The patient is addressed by the group key "<ID>:<name>". If that group
        is not yet known to patlst it is created with an empty "files" list.

        Each entry of the "files" option has the form "<file>:<hash>" (this is
        what add_file_to_patlst() writes).

        Returns:
                list of the hash parts, in the order of the stored entries

        Raises:
                ValueError: if an entry does not contain any colon
        """
        anIdentity = "%s:%s" % (ID, name)
        # patient not listed yet: start it off with an empty file list
        if anIdentity not in patlst.getGroups():
                _log.debug("identity not yet in list" )
                patlst.set(aGroup = anIdentity, anOption = 'files', aValue = [], aComment = '')
        # file already listed ?
        file_defs = patlst.get(aGroup = anIdentity, anOption = "files")
        hashes = []
        for file_def in file_defs:
                # split on the LAST colon only: the file name part may itself
                # contain colons (e.g. a Windows drive letter as in "C:\...")
                _fname, ahash = file_def.rsplit(':', 1)
                hashes.append(ahash)

        return hashes
def determine_xdt_encoding(filename=None, default_encoding=None)
Expand source code
def determine_xdt_encoding(filename=None, default_encoding=None):
        """Find the encoding an xDT file declares for itself.

        The file is scanned line by line for the first line whose field ID
        (characters 3..6 of the line) is listed in
        gmXdtMappings._charset_fields. The single character following the
        field ID is then translated via gmXdtMappings._map_field2charset.

        Args:
                filename: path of the xDT file to inspect
                default_encoding: value to return when the file does not
                        contain any charset field

        Returns:
                the mapped encoding, or default_encoding if no charset field
                was found

        Raises:
                whatever the lookup in gmXdtMappings._map_field2charset raises
                for an unknown field value (KeyError if it is a plain dict)
        """
        file_encoding = None
        # The scan itself is done as utf-8-sig with undecodable bytes being
        # dropped: the real encoding is not known yet and only the field IDs
        # and the one-character charset code are of interest here.
        # "with" makes sure the file is closed even if the lookup below raises.
        with open(filename, mode = 'rt', encoding = 'utf-8-sig', errors = 'ignore') as f:
                for line in f:
                        field = line[3:7]
                        if field in gmXdtMappings._charset_fields:
                                _log.debug('found charset field [%s] in <%s>', field, filename)
                                val = line[7:8]
                                file_encoding = gmXdtMappings._map_field2charset[field][val]
                                _log.debug('encoding in file is "%s" (%s)', file_encoding, val)
                                break

        if file_encoding is None:
                _log.debug('no encoding found in <%s>, assuming [%s]', filename, default_encoding)
                return default_encoding

        return file_encoding
def get_pat_files(aFile, ID, name, patdir=None, patlst=None)
Expand source code
def get_pat_files(aFile, ID, name, patdir = None, patlst = None):
        """Look up the files recorded for one patient.

        The "files" option of the group "<ID>:<name>" is read from patlst.
        aFile is accepted but not used.

        Returns:
                a two-element list: [patdir, <files as stored in patlst>]
        """
        _log.debug("getting files for patient [%s:%s]" % (ID, name))
        identity = "%s:%s" % (ID, name)
        pat_files = patlst.get(aGroup = identity, anOption = "files")
        _log.debug("%s => %s" % (patdir, pat_files))
        result = [patdir, pat_files]
        return result
def get_rand_fname(aDir)
Expand source code
def get_rand_fname(aDir):
        """Return a bare file name (no directory part) for a new file in aDir.

        The name is obtained from gmTools.get_unique_filename() with an empty
        prefix and a suffix made of the current local time formatted as
        ".YYYYMMDD-HHMMSS"; only the last path component is returned.

        NOTE(review): uniqueness is presumably guaranteed by
        gmTools.get_unique_filename() -- confirm there.
        """
        timestamp_suffix = time.strftime(".%Y%m%d-%H%M%S", time.localtime())
        full_path = gmTools.get_unique_filename(
                prefix = '',
                suffix = timestamp_suffix,
                tmp_dir = aDir
        )
        return os.path.basename(full_path)
def read_person_from_xdt(filename=None, encoding=None, dob_format=None)
Expand source code
def read_person_from_xdt(filename=None, encoding=None, dob_format=None):
        """Read the demographics of the first patient in an xDT file.

        Lines are collected until a field of interest shows up for the second
        time, which is taken as the start of the next patient.

        Args:
                filename: path of the xDT file
                encoding: encoding of the file; if None it is looked up with
                        determine_xdt_encoding()
                dob_format: time.strptime() format of field 3103; if None,
                        '%d%m%Y' is passed as the alternative to
                        gmTools.coalesce()

        Returns:
                a gmPerson.cDTO_person with firstnames, lastnames, dob, gender,
                zip, urb, street and source set (None where the file does not
                provide a value)

        Raises:
                ValueError: if last name (3101) or first name (3102) is missing,
                        or if the date of birth does not match the format
        """
        _map_id2name = {
                '3101': 'lastnames',
                '3102': 'firstnames',
                '3103': 'dob',
                '3110': 'gender',
                '3106': 'zipurb',
                '3107': 'street',
                '3112': 'zip',
                '3113': 'urb',
                '8316': 'source'
        }

        needed_fields = (
                '3101',
                '3102'
        )

        data = {}

        # try to find encoding if not given
        if encoding is None:
                encoding = determine_xdt_encoding(filename=filename)

        # "with" closes the file even if decoding a line fails
        with open(filename, mode = 'rt', encoding = encoding) as xdt_file:
                for line in xdt_file:
                        line = line.replace('\015','').replace('\012','')

                        # xDT line format: aaabbbbcccccccccccCRLF where aaa = length, bbbb = record type, cccc... = content
                        field = line[3:7]
                        # do we care about this line ?
                        if field not in _map_id2name:
                                continue
                        key = _map_id2name[field]
                        if key in data:
                                # field seen before: the next patient starts here
                                break
                        data[key] = line[7:]

        # found enough data ?
        # Check for the needed fields themselves rather than counting entries:
        # two unrelated fields (say dob and gender) must not pass as sufficient.
        missing = [ f for f in needed_fields if _map_id2name[f] not in data ]
        if missing:
                raise ValueError('insufficient patient data in XDT file [%s], found only: %s' % (filename, data))

        from Gnumed.business import gmPerson
        dto = gmPerson.cDTO_person()

        dto.firstnames = data['firstnames']
        dto.lastnames = data['lastnames']

        # CAVE: different data orders are possible, so configuration may be needed
        # FIXME: detect xDT version and use default from the standard when dob_format is None
        try:
                dob = time.strptime(data['dob'], gmTools.coalesce(dob_format, '%d%m%Y'))
                dto.dob = pyDT.datetime(dob.tm_year, dob.tm_mon, dob.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
        except KeyError:
                dto.dob = None

        # KeyError covers both: no gender in file / gender value not in the map
        try:
                dto.gender = gmXdtMappings.map_gender_xdt2gm[data['gender'].casefold()]
        except KeyError:
                dto.gender = None

        # zip code: a dedicated field (3112) wins over the combined field (3106)
        dto.zip = None
        if 'zipurb' in data:
                zip_match = regex.match(r'\d{5}', data['zipurb'])
                # no leading 5-digit zip code -> leave at None rather than crash
                if zip_match is not None:
                        dto.zip = zip_match.group()
        if 'zip' in data:
                dto.zip = data['zip']

        # town: a dedicated field (3113) wins over the combined field (3106)
        dto.urb = None
        if 'zipurb' in data:
                dto.urb = regex.sub(r'\d{5} ', '', data['zipurb'])
        if 'urb' in data:
                dto.urb = data['urb']

        dto.street = data.get('street')
        dto.source = data.get('source')

        return dto
def split_xdt_file(aFile, patlst, cfg)
Expand source code
def split_xdt_file(aFile,patlst,cfg):
        """Split an xDT file into one file per patient record.

        Records start at lines with field ID 8000. A record is treated as a
        patient record if its third line carries the patient ID (field 3000)
        and its fourth line the last name (field 3101). The lines of such a
        record are hashed (md5); if that hash is not yet listed for the
        patient in patlst the record is written to a new file in the directory
        configured as "xdt-viewer" / "export-dir" and registered in patlst.

        Args:
                aFile: path of the xDT file to split
                patlst: patient list, used via get()/set()/getGroups()/store()
                cfg: configuration, used via cfg.get("xdt-viewer", "export-dir")

        Returns:
                1, always
        """
        # xDT line format: aaabbbbcccccccccccCRLF where aaa = length, bbbb = record type, cccc... = content
        record_start_lines = []
        total_lines = 0
        # find record starts
        try:
                for line in fileinput.input(aFile):
                        total_lines = fileinput.filelineno()
                        strippedline = line.replace('\015','').replace('\012','')
                        # do we care about this line ? (records start with 8000)
                        if strippedline[3:7] == '8000':
                                record_start_lines.append(total_lines)
        finally:
                # always release the module-global fileinput state, otherwise
                # the next fileinput.input() call fails with "already active"
                fileinput.close()

        # linecache does not notice by itself when a file changed on disk
        linecache.checkcache(aFile)

        # loop over patient records
        for idx, startline in enumerate(record_start_lines):
                # startline is the 1-based number of the 8000 line, so +2 is
                # the third line of the record, where the patient ID is
                # expected (NOTE(review): presumably after a record length
                # line -- confirm against the xDT specification)
                strippedline = linecache.getline(aFile, startline + 2).replace('\015','').replace('\012','')
                if strippedline[3:7] != '3000':
                        # not a patient record
                        continue
                # extract patient id
                ID = strippedline[7:]

                strippedline = linecache.getline(aFile, startline + 3).replace('\015','').replace('\012','')
                if strippedline[3:7] != '3101':
                        # without a name there is no patient identity; do not
                        # fall back to the name left over from a previous record
                        _log.warning('no name (3101) found for patient [%s] in record at line %s, skipping', ID, startline)
                        continue
                name = strippedline[7:]

                # the record extends up to (not including) the start of the
                # next record, or to the end of the file for the last record
                if idx + 1 < len(record_start_lines):
                        endline = record_start_lines[idx + 1]
                else:
                        endline = total_lines + 1
                _log.debug("reading from%s" %str(startline)+' '+str(endline) )
                content = []
                for lineno in range(startline, endline):
                        content.append(linecache.getline(aFile, lineno))
                        _log.debug("reading %s" % lineno )

                hashes = check_for_previous_records(ID, name, patlst)
                # is this new content ?
                digest = hashlib.md5(''.join(content).encode('utf8')).hexdigest()
                if digest not in hashes:
                        pat_dir = cfg.get("xdt-viewer", "export-dir")
                        new_file = write_xdt_pat_data(content, pat_dir)
                        # store the hex digest (not the hash object): this is
                        # what gets compared against on the next run
                        add_file_to_patlst(ID, name, patlst, new_file, digest)

        patlst.store()
        return 1
def write_xdt_pat_data(data, aDir)
Expand source code
def write_xdt_pat_data(data, aDir):
        """Write the record for this patient to a new file.

        Args:
                data: iterable of strings (the lines of the record), written
                        as they are, without adding line separators
                aDir: directory to create the file in

        Returns:
                the full path of the file written (aDir joined with a name
                obtained from get_rand_fname())
        """
        fname = os.path.join(aDir, get_rand_fname(aDir))
        with open(fname, mode = "wt", encoding = 'utf8') as pat_file:
                # map(pat_file.write, data) is lazy under Python 3 and would
                # never actually call write(), leaving the file empty
                pat_file.writelines(data)
        return fname

Write the record for this patient to a new file.

def xdt_get_pats(aFile)
Expand source code
def xdt_get_pats(aFile):
        """Collect the patients found in an xDT file.

        A line with field ID 3000 sets the current patient ID, a following
        line with field ID 3101 provides the name for that ID.

        Args:
                aFile: path of the xDT file

        Returns:
                dict mapping patient ID to patient name; if an ID shows up
                with several names the last one wins
        """
        seen_ids = set()
        pats = {}
        pat_id = None
        # xDT line format: aaabbbbcccccccccccCRLF where aaa = length, bbbb = record type, cccc... = content
        # read patient data
        try:
                for line in fileinput.input(aFile):
                        # remove trailing CR and/or LF
                        line = line.replace('\015','').replace('\012','')
                        # do we care about this line ?
                        field = line[3:7]
                        # yes, if type = patient id
                        if field == '3000':
                                pat_id = line[7:]
                                seen_ids.add(pat_id)
                                continue
                        # yes, if type = patient name
                        if field == '3101':
                                if pat_id is None:
                                        # name without any preceding ID: nothing to key it on
                                        continue
                                # key on the ID only: two different patients
                                # may well share the same name
                                pats[pat_id] = line[7:]
        finally:
                # always release the module-global fileinput state
                fileinput.close()

        _log.debug("patients found: %s" % len(seen_ids))
        return pats

Classes

class cDTO_xdt_person
Expand source code
class cDTO_xdt_person(gmPerson.cDTO_person):
        """Person DTO for xDT sources whose store() does nothing."""

        def store(self):
                """Do nothing and return None.

                NOTE(review): presumably this disables a store() inherited
                from gmPerson.cDTO_person -- confirm against the base class.
                """
                return None

Ancestors

Methods

def store(self)
Expand source code
def store(self):
        """Do nothing and return None."""
        return None

Inherited members

class cLDTFile (filename=None, encoding=None, override_encoding=False)
Expand source code
class cLDTFile(object):
        """Access to the header and tail part of an LDT file.

        Lines are interpreted in xDT fashion: characters 3..6 are the field
        ID, everything after that is the content. Records start with field
        8000; content 8202 marks the start of an "LG-Bericht", content 8221
        marks the start of the tail.
        """

        def __init__(self, filename=None, encoding=None, override_encoding=False):
                """Decide on the encoding to read the file with.

                Args:
                        filename: path of the LDT file
                        encoding: encoding to use if the file does not declare
                                one, or if override_encoding is true
                        override_encoding: if true, use encoding even if the
                                file declares one

                Raises:
                        ValueError: if the file declares no encoding and none
                                was passed in, or if overriding was requested
                                without passing in an encoding
                """
                file_encoding = determine_xdt_encoding(filename=filename)
                if file_encoding is None:
                        _log.warning('LDT file <%s> does not specify encoding', filename)
                        if encoding is None:
                                raise ValueError('no encoding specified in file <%s> or method call' % filename)

                if override_encoding:
                        if encoding is None:
                                raise ValueError('no encoding specified in method call for overriding encoding in file <%s>' % filename)
                        self.encoding = encoding
                else:
                        if file_encoding is None:
                                self.encoding = encoding
                        else:
                                self.encoding = file_encoding

                self.filename = filename

                # caches, filled on first access of .header / .tail
                self.__header = None
                self.__tail = None
        #----------------------------------------------------------
        def _get_header(self):
                """Return the lines before the first LG-Bericht record (cached).

                The lines are returned as read, including their line ends. If
                the file has no 8000/8202 line, all lines are returned.
                """
                if self.__header is not None:
                        return self.__header

                header = []
                with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                        for line in ldt_file:
                                field = line[3:7]
                                content = line[7:].replace('\015','').replace('\012','')
                                # loop until found first LG-Bericht
                                if field == '8000' and content == '8202':
                                        break
                                header.append(line)

                self.__header = header
                return self.__header

        header = property(_get_header)
        #----------------------------------------------------------
        def _get_tail(self):
                """Return the lines from the first 8000/8221 line to the end of the file (cached).

                The lines are returned as read, including their line ends. If
                the file has no 8000/8221 line, an empty list is returned.
                """
                if self.__tail is not None:
                        return self.__tail

                tail = []
                in_tail = False
                with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                        for line in ldt_file:
                                if in_tail:
                                        tail.append(line)
                                        continue

                                field = line[3:7]
                                content = line[7:].replace('\015','').replace('\012','')
                                # loop until found tail
                                if field == '8000' and content == '8221':
                                        in_tail = True
                                        tail.append(line)

                self.__tail = tail
                return self.__tail

        tail = property(_get_tail)
        #----------------------------------------------------------
        def split_by_patient(self, dir=None, file=None):
                """Unfinished: meant to split the file into one file per LG-Bericht.

                NOTE(review): the output file is never opened (the open() call
                is still a commented-out placeholder), so out_file stays None
                and the first 8000/8202 line raises AttributeError. Neither
                dir nor file is used. Also, once inside a patient record every
                further line is copied without looking for the next record
                start. The intended output file naming is not visible here, so
                the logic is left as found; only the input file is now closed
                reliably.
                """
                out_file = None
                in_patient = False

                # "with" closes the input even when the loop body raises
                with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                        for line in ldt_file:

                                if in_patient:
                                        out_file.write(line)
                                        continue

                                content = line[7:].replace('\015','').replace('\012','')
                                field = line[3:7]
                                # start of record
                                if field == '8000':
                                        # start of LG-Bericht
                                        if content == '8202':
                                                in_patient = True
                                                if out_file is not None:
                                                        out_file.write(''.join(self.tail))
                                                        out_file.close()
                                                #out_file = open(filename=filename_xxxx, mode=xxxx_'rU', encoding=self.encoding)
                                                out_file.write(''.join(self.header))
                                        else:
                                                in_patient = False
                                                if out_file is not None:
                                                        out_file.write(''.join(self.tail))
                                                        out_file.close()

                if out_file is not None:
                        if not out_file.closed:
                                out_file.write(''.join(self.tail))
                                out_file.close()

Instance variables

prop header
Expand source code
def _get_header(self):
        """Return the lines before the first LG-Bericht record (cached).

        Getter of the header property. The file is read with self.encoding;
        lines are collected, including their line ends, until a line with
        field 8000 and content 8202 is found. If there is no such line, all
        lines are returned. The result is kept in self.__header and reused
        on later calls.
        """
        if self.__header is not None:
                return self.__header

        header = []
        # "with" closes the file even if decoding a line raises
        with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                for line in ldt_file:
                        field = line[3:7]
                        content = line[7:].replace('\015','').replace('\012','')
                        # loop until found first LG-Bericht
                        if field == '8000' and content == '8202':
                                break
                        header.append(line)

        self.__header = header
        return self.__header
prop tail
Expand source code
def _get_tail(self):
        """Return the lines from the first 8000/8221 line to the end of the file (cached).

        Getter of the tail property. The file is read with self.encoding;
        lines are returned including their line ends. If there is no line
        with field 8000 and content 8221, an empty list is returned. The
        result is kept in self.__tail and reused on later calls.
        """
        if self.__tail is not None:
                return self.__tail

        tail = []
        in_tail = False
        # "with" closes the file even if decoding a line raises
        with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                for line in ldt_file:
                        if in_tail:
                                tail.append(line)
                                continue

                        field = line[3:7]
                        content = line[7:].replace('\015','').replace('\012','')
                        # loop until found tail
                        if field == '8000' and content == '8221':
                                in_tail = True
                                tail.append(line)

        self.__tail = tail
        return self.__tail

Methods

def split_by_patient(self, dir=None, file=None)
Expand source code
def split_by_patient(self, dir=None, file=None):
        """Unfinished: meant to split the file into one file per LG-Bericht.

        NOTE(review): the output file is never opened (the open() call is
        still a commented-out placeholder), so out_file stays None and the
        first 8000/8202 line raises AttributeError. Neither dir nor file is
        used. Also, once inside a patient record every further line is copied
        without looking for the next record start. The intended output file
        naming is not visible here, so the logic is left as found; only the
        input file is now closed reliably.
        """
        out_file = None
        in_patient = False

        # "with" closes the input even when the loop body raises
        with open(self.filename, mode = 'rt', encoding = self.encoding) as ldt_file:
                for line in ldt_file:

                        if in_patient:
                                out_file.write(line)
                                continue

                        content = line[7:].replace('\015','').replace('\012','')
                        field = line[3:7]
                        # start of record
                        if field == '8000':
                                # start of LG-Bericht
                                if content == '8202':
                                        in_patient = True
                                        if out_file is not None:
                                                out_file.write(''.join(self.tail))
                                                out_file.close()
                                        #out_file = open(filename=filename_xxxx, mode=xxxx_'rU', encoding=self.encoding)
                                        out_file.write(''.join(self.header))
                                else:
                                        in_patient = False
                                        if out_file is not None:
                                                out_file.write(''.join(self.tail))
                                                out_file.close()

        if out_file is not None:
                if not out_file.closed:
                        out_file.write(''.join(self.tail))
                        out_file.close()