Module Gnumed.business.gmDocuments
This module encapsulates a document stored in a GNUmed database.
Expand source code
"""This module encapsulates a document stored in a GNUmed database."""
#============================================================
__author__ = "Karsten Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = "GPL v2 or later"
import sys
import os
import time
import logging
if __name__ == '__main__':
sys.path.insert(0, '../../')
_ = lambda x:x
from Gnumed.pycommon import gmExceptions
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmMimeLib
from Gnumed.pycommon import gmWorkerThread
from Gnumed.business import gmOrganization
_log = logging.getLogger('gm.docs')
MUGSHOT=26
DOCUMENT_TYPE_VISUAL_PROGRESS_NOTE = 'visual progress note'
DOCUMENT_TYPE_PRESCRIPTION = 'prescription'
#============================================================
_SQL_get_document_part_fields = "SELECT * FROM blobs.v_obj4doc_no_data WHERE %s"
class cDocumentPart(gmBusinessDBObject.cBusinessDBObject):
"""Represents one part of a medical document."""
_cmd_fetch_payload = _SQL_get_document_part_fields % "pk_obj = %s"
_cmds_store_payload = [
"""UPDATE blobs.doc_obj SET
seq_idx = %(seq_idx)s,
comment = gm.nullify_empty_string(%(obj_comment)s),
filename = gm.nullify_empty_string(%(filename)s),
fk_intended_reviewer = %(pk_intended_reviewer)s
WHERE
pk = %(pk_obj)s
AND
xmin = %(xmin_doc_obj)s
RETURNING
xmin AS xmin_doc_obj"""
]
_updatable_fields = [
'seq_idx',
'obj_comment',
'pk_intended_reviewer',
'filename'
]
#--------------------------------------------------------
# retrieve data
#--------------------------------------------------------
def save_to_file(self, aChunkSize=0, filename=None, target_mime=None, target_extension=None, ignore_conversion_problems=False, directory=None, conn=None):
if filename is None:
filename = self.get_useful_filename(make_unique = True, directory = directory)
dl_fname = self.__download_to_file(filename = filename)
if dl_fname is None:
return None
if target_mime is None:
return gmMimeLib.adjust_extension_by_mimetype(dl_fname)
converted_fname = self.__convert_file_to (
filename = dl_fname,
target_mime = target_mime,
target_extension = target_extension
)
if converted_fname is None:
if ignore_conversion_problems:
return dl_fname
return None
gmTools.remove_file(dl_fname)
return converted_fname
#--------------------------------------------------------
def get_reviews(self):
SQL = """
SELECT
reviewer,
reviewed_when,
is_technically_abnormal,
clinically_relevant,
is_review_by_responsible_reviewer,
is_your_review,
coalesce(comment, '')
FROM blobs.v_reviewed_doc_objects
WHERE pk_doc_obj = %(pk_doc_obj)s
ORDER BY
is_your_review desc,
is_review_by_responsible_reviewer desc,
reviewed_when desc
"""
args = {'pk_doc_obj': self.pk_obj}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
return rows
#--------------------------------------------------------
def __get_containing_document(self):
return cDocument(aPK_obj = self._payload['pk_doc'])
containing_document = property(__get_containing_document)
#--------------------------------------------------------
# store data
#--------------------------------------------------------
def update_data_from_incoming(self, conn=None, pk_incoming:int=None):
SQL = """
UPDATE blobs.doc_obj
SET data = (SELECT data FROM clin.incoming_data WHERE pk = %(pk_incoming)s)
WHERE pk = %(pk_part)s
"""
args = {'pk_part': self.pk_obj, 'pk_incoming': pk_incoming}
gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = False)
# must update XMIN now ...
self.refetch_payload(link_obj = conn)
return True
#--------------------------------------------------------
def update_data_from_file(self, fname=None, link_obj=None):
# sanity check
if not (os.access(fname, os.R_OK) and os.path.isfile(fname)):
_log.error('[%s] is not a readable file' % fname)
return False
cmd = "UPDATE blobs.doc_obj SET data = %(data)s::BYTEA WHERE pk = %(pk)s RETURNING md5(data) AS md5"
args = {'pk': self.pk_obj}
md5 = gmTools.file2md5(filename = fname, return_hex = True)
if not gmPG2.file2bytea(conn = link_obj, query = cmd, filename = fname, args = args, file_md5 = md5):
return False
# must update XMIN now ...
self.refetch_payload(link_obj = link_obj)
return True
#--------------------------------------------------------
def set_reviewed(self, technically_abnormal:bool=None, clinically_relevant:bool=None):
# row already there ?
SQL = """
SELECT pk
FROM blobs.reviewed_doc_objs
WHERE
fk_reviewed_row = %(pk_obj)s and
fk_reviewer = (SELECT pk FROM dem.staff WHERE db_user = current_user)
"""
args = {'pk_obj': self.pk_obj}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
# INSERT needed
if not rows:
cols = [
"fk_reviewer",
"fk_reviewed_row",
"is_technically_abnormal",
"clinically_relevant"
]
vals = [
'%(fk_row)s',
'%(abnormal)s',
'%(relevant)s'
]
args = {
'fk_row': self.pk_obj,
'abnormal': technically_abnormal,
'relevant': clinically_relevant
}
cmd = """
insert into blobs.reviewed_doc_objs (
%s
) values (
(SELECT pk FROM dem.staff WHERE db_user=current_user),
%s
)""" % (', '.join(cols), ', '.join(vals))
# UPDATE needed
elif len(rows) == 1:
pk_review = rows[0][0]
args = {
'abnormal': technically_abnormal,
'relevant': clinically_relevant,
'pk_review': pk_review
}
cmd = """
UPDATE blobs.reviewed_doc_objs SET
is_technically_abnormal = %(abnormal)s,
clinically_relevant = %(relevant)s
WHERE
pk = %(pk_review)s
"""
else:
raise AssertionError('more than one review by one reviewer for one particular row')
rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return True
#--------------------------------------------------------
def set_as_active_photograph(self):
if self._payload['type'] != 'patient photograph':
return False
# set seq_idx to current max + 1
cmd = 'SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s'
rows = gmPG2.run_ro_queries (
queries = [{
'sql': cmd,
'args': {'doc_id': self._payload['pk_doc']}
}]
)
self._payload['seq_idx'] = rows[0][0]
self._is_modified = True
return self.save_payload()
#--------------------------------------------------------
def reattach(self, pk_doc=None):
if pk_doc == self._payload['pk_doc']:
return True
cmd = """
UPDATE blobs.doc_obj SET
fk_doc = %(pk_doc_target)s,
-- coalesce needed for no-parts target docs
seq_idx = (SELECT coalesce(max(seq_idx) + 1, 1) FROM blobs.doc_obj WHERE fk_doc = %(pk_doc_target)s)
WHERE
EXISTS(SELECT 1 FROM blobs.doc_med WHERE pk = %(pk_doc_target)s)
AND
pk = %(pk_obj)s
AND
xmin = %(xmin_doc_obj)s
RETURNING fk_doc
"""
args = {
'pk_doc_target': pk_doc,
'pk_obj': self.pk_obj,
'xmin_doc_obj': self._payload['xmin_doc_obj']
}
rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True)
if len(rows) == 0:
return False
# The following should never hold true because the target
# fk_doc is returned from the query and it is checked for
# equality before the UPDATE already. Assuming the update
# failed to update a row because the target fk_doc did
# not exist we would not get *any* rows in return - for
# which condition we also already checked
if rows[0]['fk_doc'] == self._payload['pk_doc']:
return False
self.refetch_payload()
return True
#--------------------------------------------------------
def display_via_mime(self, chunksize=0, block=None):
fname = self.save_to_file(aChunkSize = chunksize)
if fname is None:
return False, ''
success, msg = gmMimeLib.call_viewer_on_file(fname, block = block)
if not success:
return False, msg
return True, ''
#--------------------------------------------------------
def format_single_line(self):
f_ext = ''
if self._payload['filename'] is not None:
f_ext = os.path.splitext(self._payload['filename'])[1].strip('.').strip()
if f_ext != '':
f_ext = ' .' + f_ext.upper()
txt = _('part %s, %s%s%s of document %s from %s%s') % (
self._payload['seq_idx'],
gmTools.size2str(self._payload['size']),
f_ext,
gmTools.coalesce(self._payload['obj_comment'], '', ' ("%s")'),
self._payload['l10n_type'],
self._payload['date_generated'].strftime('%Y %b %d'),
gmTools.coalesce(self._payload['doc_comment'], '', ' ("%s")')
)
return txt
#--------------------------------------------------------
def format(self, single_line=False):
if single_line:
return self.format_single_line()
txt = _('%s document part [#%s]\n') % (
gmTools.bool2str (
boolean = self._payload['reviewed'],
true_str = _('Reviewed'),
false_str = _('Unreviewed')
),
self._payload['pk_obj']
)
f_ext = ''
if self._payload['filename'] is not None:
f_ext = os.path.splitext(self._payload['filename'])[1].strip('.').strip()
if f_ext != '':
f_ext = '.' + f_ext.upper() + ' '
txt += _(' Part %s: %s %s(%s Bytes)\n') % (
self._payload['seq_idx'],
gmTools.size2str(self._payload['size']),
f_ext,
self._payload['size']
)
if self._payload['filename'] is not None:
path, fname = os.path.split(self._payload['filename'])
if not path.endswith(os.path.sep):
if path != '':
path += os.path.sep
if path != '':
path = ' (%s)' % path
txt += _(' Filename: %s%s\n') % (fname, path)
if self._payload['obj_comment'] is not None:
txt += '\n%s\n' % self._payload['obj_comment']
return txt
#--------------------------------------------------------
def format_metainfo(self, callback=None):
"""If <callback> is not None it will receive a tuple (status, description, pk_obj)."""
if callback is None:
return self.__run_metainfo_formatter()
gmWorkerThread.execute_in_worker_thread (
payload_function = self.__run_metainfo_formatter,
completion_callback = callback,
worker_name = 'doc_part-metainfo_formatter-'
)
#--------------------------------------------------------
def get_useful_filename(self, patient=None, make_unique=False, directory=None, include_gnumed_tag=True, date_before_type=False, name_first=True):
patient_part = ''
if patient:
if name_first:
patient_part = '%s-' % patient.subdir_name
else:
patient_part = '-%s' % patient.subdir_name
# preserve original filename extension if available
suffix = ''
if self._payload['filename'] is not None:
tmp, suffix = os.path.splitext (
gmTools.fname_sanitize(self._payload['filename']).casefold()
)
if not suffix:
suffix = '.dat'
fname_template = '%%s-part_%s' % self._payload['seq_idx']
if include_gnumed_tag:
fname_template += '-gm_doc'
if date_before_type:
date_type_part = '%s-%s' % (
self._payload['date_generated'].strftime('%Y-%m-%d'),
self._payload['l10n_type'].replace(' ', '_').replace('-', '_'),
)
else:
date_type_part = '%s-%s' % (
self._payload['l10n_type'].replace(' ', '_').replace('-', '_'),
self._payload['date_generated'].strftime('%Y-%m-%d')
)
if name_first:
date_type_name_part = patient_part + date_type_part
else:
date_type_name_part = date_type_part + patient_part
fname = fname_template % date_type_name_part
if make_unique:
fname = gmTools.get_unique_filename (
prefix = '%s-' % gmTools.fname_sanitize(fname),
suffix = suffix,
tmp_dir = directory
)
else:
fname = gmTools.fname_sanitize(os.path.join(gmTools.coalesce(directory, gmTools.gmPaths().tmp_dir), fname + suffix))
return fname
useful_filename = property(get_useful_filename)
#--------------------------------------------------------
# internal helpers
#--------------------------------------------------------
def __download_to_file(self, filename=None, aChunkSize=0, conn=None):
if self._payload['size'] == 0:
_log.debug('part size 0, nothing to download')
return None
if filename is None:
filename = gmTools.get_unique_filename()
success = gmPG2.bytea2file (
data_query = {
'sql': 'SELECT substring(data FROM %(start)s for %(size)s) FROM blobs.doc_obj WHERE pk=%(pk)s',
'args': {'pk': self.pk_obj}
},
filename = filename,
chunk_size = aChunkSize,
data_size = self._payload['size'],
conn = conn
)
if not success:
return None
return filename
#--------------------------------------------------------
def __convert_file_to(self, filename=None, target_mime=None, target_extension=None):
assert (filename is not None), '<filename> must not be None'
assert (target_mime is not None), '<target_mime> must not be None'
if target_extension is None:
target_extension = gmMimeLib.guess_ext_by_mimetype(mimetype = target_mime)
src_path, src_name = os.path.split(filename)
src_stem, src_ext = os.path.splitext(src_name)
conversion_tmp_name = gmTools.get_unique_filename (
prefix = '%s.conv2.' % src_stem,
suffix = target_extension
)
_log.debug('attempting conversion: [%s] -> [<%s>:%s]', filename, target_mime, conversion_tmp_name)
converted_fname = gmMimeLib.convert_file (
filename = filename,
target_mime = target_mime,
target_filename = conversion_tmp_name
)
if converted_fname is None:
_log.warning('conversion failed')
return None
tmp_path, conv_name = os.path.split(converted_fname)
conv_name_in_src_path = os.path.join(src_path, conv_name)
try:
os.replace(converted_fname, conv_name_in_src_path)
except OSError:
_log.exception('cannot os.replace(%s, %s)', converted_fname, conv_name_in_src_path)
return None
return gmMimeLib.adjust_extension_by_mimetype(conv_name_in_src_path)
#--------------------------------------------------------
def __run_metainfo_formatter(self):
filename = self.__download_to_file()
if filename is None:
_log.error('problem downloading part')
return (False, _('problem downloading document part'))
status, desc, cookie = gmMimeLib.describe_file(filename)
return (status, desc, self.pk_obj)
#------------------------------------------------------------
def delete_document_part(part_pk=None, encounter_pk=None):
cmd = """
SELECT blobs.delete_document_part(%(pk)s, %(enc)s)
WHERE NOT EXISTS
(SELECT 1 FROM clin.export_item WHERE fk_doc_obj = %(pk)s)
"""
args = {'pk': part_pk, 'enc': encounter_pk}
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return
#============================================================
_SQL_get_document_fields = "SELECT * FROM blobs.v_doc_med b_vdm WHERE %s"
class cDocument(gmBusinessDBObject.cBusinessDBObject):
"""Represents one medical document."""
_cmd_fetch_payload = _SQL_get_document_fields % "pk_doc = %s"
_cmds_store_payload = [
"""UPDATE blobs.doc_med SET
fk_type = %(pk_type)s,
fk_episode = %(pk_episode)s,
fk_encounter = %(pk_encounter)s,
fk_org_unit = %(pk_org_unit)s,
unit_is_receiver = %(unit_is_receiver)s,
clin_when = %(clin_when)s,
comment = gm.nullify_empty_string(%(comment)s),
ext_ref = gm.nullify_empty_string(%(ext_ref)s),
fk_hospital_stay = %(pk_hospital_stay)s
WHERE
pk = %(pk_doc)s and
xmin = %(xmin_doc_med)s
RETURNING
xmin AS xmin_doc_med"""
]
_updatable_fields = [
'pk_type',
'comment',
'clin_when',
'ext_ref',
'pk_episode',
'pk_encounter', # mainly useful when moving visual progress notes to their respective encounters
'pk_org_unit',
'unit_is_receiver',
'pk_hospital_stay'
]
#--------------------------------------------------------
def refetch_payload(self, ignore_changes=False, link_obj=None):
try: del self.__has_unreviewed_parts
except AttributeError: pass
return super().refetch_payload(ignore_changes = ignore_changes, link_obj = link_obj)
#--------------------------------------------------------
def get_descriptions(self, max_lng:int=250):
"""Get document descriptions.
- will return a list of rows
"""
if max_lng is None:
SQL = "SELECT pk, text FROM blobs.doc_desc WHERE fk_doc = %(pk_doc)s"
else:
SQL = "SELECT pk, substring(text FROM 1 for %s) FROM blobs.doc_desc WHERE fk_doc = %%(pk_doc)s" % max_lng
args = {'pk_doc': self.pk_obj}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
return rows
#--------------------------------------------------------
def add_description(self, description=None):
SQL = 'INSERT INTO blobs.doc_desc (fk_doc, text) values (%(pk_doc)s, %(desc)s)'
args = {'pk_doc': self.pk_obj, 'desc': description}
gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
return True
#--------------------------------------------------------
def update_description(self, pk=None, description=None):
cmd = "update blobs.doc_desc set text = %(desc)s WHERE fk_doc = %(doc)s and pk = %(pk_desc)s"
gmPG2.run_rw_queries(queries = [
{'sql': cmd, 'args': {'doc': self.pk_obj, 'pk_desc': pk, 'desc': description}}
])
return True
#--------------------------------------------------------
def delete_description(self, pk=None):
cmd = "DELETE FROM blobs.doc_desc WHERE fk_doc = %(doc)s and pk = %(desc)s"
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'doc': self.pk_obj, 'desc': pk}}])
return True
#--------------------------------------------------------
def _get_parts(self):
SQL = _SQL_get_document_part_fields % 'pk_doc = %(pk_doc)s ORDER BY seq_idx'
args = {'pk_doc': self.pk_obj}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
return [ cDocumentPart(row = {'pk_field': 'pk_obj', 'data': r}) for r in rows ]
parts = property(_get_parts)
#--------------------------------------------------------
def add_part(self, file=None, link_obj=None):
"""Add a part to the document."""
# create dummy part
cmd = """
INSERT INTO blobs.doc_obj (
fk_doc, data, seq_idx
) VALUES (
%(doc_id)s,
''::bytea,
(SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s)
) RETURNING pk"""
rows = gmPG2.run_rw_queries (
link_obj = link_obj,
queries = [{'sql': cmd, 'args': {'doc_id': self.pk_obj}}],
return_data = True
)
# init document part instance
pk_part = rows[0][0]
new_part = cDocumentPart(aPK_obj = pk_part, link_obj = link_obj)
if not new_part.update_data_from_file(link_obj = link_obj, fname = file):
_log.error('cannot import binary data from [%s] into document part' % file)
SQL = 'DELETE FROM blobs.doc_obj WHERE pk = %(pk_part)s'
args = {'pk_part': pk_part}
gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL, 'args': args}])
return None
new_part['filename'] = file
new_part.save_payload(conn = link_obj)
return new_part
#--------------------------------------------------------
def add_parts_from_files(self, files=None, reviewer=None):
new_parts = []
for filename in files:
new_part = self.add_part(file = filename)
if new_part is None:
msg = 'cannot instantiate document part object from [%s]' % filename
_log.error(msg)
return (False, msg, filename)
new_parts.append(new_part)
if reviewer is not None:
new_part['pk_intended_reviewer'] = reviewer # None == Null
success, data = new_part.save_payload()
if not success:
msg = 'cannot set reviewer to [%s] on [%s]' % (reviewer, filename)
_log.error(msg)
_log.error(str(data))
return (False, msg, filename)
return (True, '', new_parts)
#--------------------------------------------------------
def save_parts_to_files(self, export_dir=None, chunksize=0, conn=None):
fnames = []
for part in self.parts:
fname = part.save_to_file(aChunkSize = chunksize, directory = export_dir, conn = conn)
if fname is None:
_log.error('cannot export document part [%s]', part)
continue
fnames.append(fname)
return fnames
#--------------------------------------------------------
def add_part_from_incoming(self, pk_incoming:int=None, pk_reviewer:int=None, conn=None):
SQL = """
INSERT INTO blobs.doc_obj (
fk_doc, data, seq_idx
) VALUES (
%(doc_id)s,
''::bytea,
(SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s)
) RETURNING pk
"""
args = {'doc_id': self.pk_obj}
rows = gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = True)
new_part = cDocumentPart(aPK_obj = rows[0]['pk'], link_obj = conn)
if not new_part.update_data_from_incoming(conn = conn, pk_incoming = pk_incoming):
return None
if not pk_reviewer:
return new_part
new_part['pk_intended_reviewer'] = pk_reviewer
success, data = new_part.save(conn = conn)
if success:
return new_part
msg = 'cannot set reviewer to [%s] on [%s]' % (pk_reviewer, pk_incoming)
_log.error(msg)
_log.error(str(data))
return None
#--------------------------------------------------------
def add_parts_from_incoming(self, incoming_data:list=None, pk_reviewer:int=None, conn=None):
"""Add parts to document from gmIncomingData.cIncomingData instances.
Returns:
A tuple (success state, data). On failure data is an error message. On success data is the list of new parts.
"""
new_parts = []
for incoming in incoming_data:
part = self.add_part_from_incoming (
pk_incoming = incoming['pk_incoming_data'],
pk_reviewer = pk_reviewer,
conn = conn
)
if part:
new_parts.append(part)
continue
msg = 'cannot instantiate document part from incoming [%s]' % incoming['pk_incoming_data']
_log.error(msg)
return False, msg
return True, new_parts
#--------------------------------------------------------
def _get_has_unreviewed_parts(self):
try:
return self.__has_unreviewed_parts # pylint: disable=access-member-before-definition
except AttributeError:
pass
cmd = "SELECT EXISTS(SELECT 1 FROM blobs.v_obj4doc_no_data WHERE pk_doc = %(pk)s AND reviewed IS FALSE)"
args = {'pk': self.pk_obj}
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
self.__has_unreviewed_parts = rows[0][0]
return self.__has_unreviewed_parts
has_unreviewed_parts = property(_get_has_unreviewed_parts)
#--------------------------------------------------------
def set_reviewed(self, technically_abnormal=None, clinically_relevant=None):
# FIXME: this is probably inefficient
for part in self.parts:
if not part.set_reviewed(technically_abnormal, clinically_relevant):
return False
return True
#--------------------------------------------------------
def set_primary_reviewer(self, reviewer=None):
for part in self.parts:
part['pk_intended_reviewer'] = reviewer
success, data = part.save_payload()
if not success:
_log.error('cannot set reviewer to [%s]' % reviewer)
_log.error(str(data))
return False
return True
#--------------------------------------------------------
def format_single_line(self):
part_count = len(self._payload['seq_idx_list'])
if part_count == 0:
parts = _('no parts')
elif part_count == 1:
parts = _('1 part')
else:
parts = _('%s parts') % part_count
detail = ''
if self._payload['ext_ref'] is not None:
detail = self._payload['ext_ref']
if self._payload['unit'] is not None:
template = _('%s of %s')
if detail == '':
detail = _('%s of %s') % (
self._payload['unit'],
self._payload['organization']
)
else:
detail += (' @ ' + template % (
self._payload['unit'],
self._payload['organization']
))
if detail != '':
detail = ' (%s)' % detail
return '%s %s (%s):%s%s' % (
self._payload['clin_when'].strftime('%Y %b %d'),
self._payload['l10n_type'],
parts,
gmTools.coalesce(self._payload['comment'], '', ' "%s"'),
detail
)
#--------------------------------------------------------
def format(self, single_line=False):
if single_line:
return self.format_single_line()
part_count = len(self._payload['seq_idx_list'])
if part_count == 0:
parts = _('no parts')
elif part_count == 1:
parts = _('1 part')
else:
parts = _('%s parts') % part_count
org = ''
if self._payload['unit'] is not None:
if self._payload['unit_is_receiver']:
org = _(' Receiver: %s @ %s\n') % (
self._payload['unit'],
self._payload['organization']
)
else:
org = _(' Sender: %s @ %s\n') % (
self._payload['unit'],
self._payload['organization']
)
stay = ''
if self._payload['pk_hospital_stay'] is not None:
stay = _('Hospital stay') + ': %s\n' % self.hospital_stay.format (
left_margin = 0,
include_procedures = False,
include_docs = False,
include_episode = False
)
txt = _(
'%s (%s) #%s\n'
' Created: %s\n'
' Episode: %s\n'
'%s'
'%s'
'%s'
'%s'
'%s'
) % (
self._payload['l10n_type'],
parts,
self._payload['pk_doc'],
self._payload['clin_when'].strftime('%Y %b %d'),
self._payload['episode'],
gmTools.coalesce(self._payload['health_issue'], '', _(' Health issue: %s\n')),
gmTools.coalesce(self._payload['ext_ref'], '', _(' External reference: %s\n')),
org,
stay,
gmTools.coalesce(self._payload['comment'], '', ' %s')
)
return txt
#--------------------------------------------------------
def format_for_failsafe_output(self, max_width:int=80) -> list[str]:
lines = [ '%s: %s' % (self._payload['clin_when'].strftime('%Y %B %d'), self._payload['l10n_type']) ]
if self._payload['unit'] and not self._payload['unit_is_receiver']:
lines.append(' ' + _('From: %s @ %s') % (self._payload['unit'], self._payload['organization']))
return lines
#--------------------------------------------------------
def _get_hospital_stay(self):
if self._payload['pk_hospital_stay'] is None:
return None
from Gnumed.business import gmEMRStructItems
return gmEMRStructItems.cHospitalStay(self._payload['pk_hospital_stay'])
hospital_stay = property(_get_hospital_stay)
#--------------------------------------------------------
def _get_org_unit(self):
if self._payload['pk_org_unit'] is None:
return None
return gmOrganization.cOrgUnit(self._payload['pk_org_unit'])
org_unit = property(_get_org_unit)
#--------------------------------------------------------
def _get_procedures(self):
from Gnumed.business.gmEMRStructItems import get_procedures4document
return get_procedures4document(pk_document = self.pk_obj)
procedures = property(_get_procedures)
#--------------------------------------------------------
def _get_bills(self):
from Gnumed.business.gmBilling import get_bills4document
return get_bills4document(pk_document = self.pk_obj)
bills = property(_get_bills)
#------------------------------------------------------------
def create_document(document_type=None, encounter=None, episode=None, link_obj=None):
"""Returns new document instance or raises an exception."""
try:
int(document_type)
SQL = "INSERT INTO blobs.doc_med (fk_type, fk_encounter, fk_episode) VALUES (%(type)s, %(pk_enc)s, %(pk_epi)s) RETURNING pk"
except ValueError:
create_document_type(document_type = document_type)
SQL = """
INSERT INTO blobs.doc_med (
fk_type,
fk_encounter,
fk_episode
) VALUES (
coalesce (
(SELECT pk FROM blobs.doc_type bdt WHERE bdt.name = %(type)s),
(SELECT pk FROM blobs.doc_type bdt WHERE _(bdt.name) = %(type)s)
),
%(pk_enc)s,
%(pk_epi)s
) RETURNING pk"""
args = {'type': document_type}
try: args['pk_enc'] = int(encounter)
except (TypeError, ValueError): args['pk_enc'] = encounter['pk_encounter']
try: args['pk_epi'] = int(episode)
except (TypeError, ValueError): args['pk_epi'] = episode['pk_episode']
rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL, 'args': args}], return_data = True)
doc = cDocument(aPK_obj = rows[0][0], link_obj = link_obj)
return doc
#------------------------------------------------------------
def search_for_documents(patient_id=None, type_id=None, external_reference=None, pk_episode=None, pk_types=None):
"""Searches for documents with the given patient and type ID."""
if (patient_id is None) and (pk_episode is None):
raise ValueError('need patient_id or pk_episode to search for document')
where_parts = []
args = {
'pat_id': patient_id,
'type_id': type_id,
'ref': external_reference,
'pk_epi': pk_episode
}
if patient_id is not None:
where_parts.append('pk_patient = %(pat_id)s')
if type_id is not None:
where_parts.append('pk_type = %(type_id)s')
if external_reference is not None:
where_parts.append('ext_ref = %(ref)s')
if pk_episode is not None:
where_parts.append('pk_episode = %(pk_epi)s')
if pk_types is not None:
where_parts.append('pk_type = ANY(%(pk_types)s)')
args['pk_types'] = pk_types
cmd = _SQL_get_document_fields % ' AND '.join(where_parts)
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return [ cDocument(row = {'data': r, 'pk_field': 'pk_doc'}) for r in rows ]
#------------------------------------------------------------
def delete_document(document_id=None, encounter_id=None):
# cascades to doc_obj and doc_desc but not bill.bill
cmd = "SELECT blobs.delete_document(%(pk)s, %(enc)s)"
args = {'pk': document_id, 'enc': encounter_id}
rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True)
if not rows[0][0]:
_log.error('cannot delete document [%s]', document_id)
return False
return True
#------------------------------------------------------------
def reclassify_documents_by_type(original_type=None, target_type=None):
_log.debug('reclassifying documents by type')
_log.debug('original: %s', original_type)
_log.debug('target: %s', target_type)
if target_type['pk_doc_type'] == original_type['pk_doc_type']:
return True
cmd = """
update blobs.doc_med set
fk_type = %(new_type)s
WHERE
fk_type = %(old_type)s
"""
args = {'new_type': target_type['pk_doc_type'], 'old_type': original_type['pk_doc_type']}
gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}])
return True
#------------------------------------------------------------
def generate_failsafe_document_list_entries(pk_patient:int=None, max_width:int=80, eol:str=None) -> str|list:
lines = []
doc_folder = cDocumentFolder(aPKey = pk_patient)
docs = doc_folder.get_documents(order_by = 'clin_when DESC, l10n_type')
for doc in docs:
lines.append('')
lines.extend(doc.format_for_failsafe_output(max_width = max_width))
if not eol:
return lines
return eol.join(lines)
#============================================================
class cDocumentFolder:
"""Represents a folder with medical documents for a single patient."""
def __init__(self, aPKey = None):
"""Fails if
- patient referenced by aPKey does not exist
"""
self.pk_patient = aPKey # == identity.pk == primary key
if not self._pkey_exists():
raise gmExceptions.ConstructorError("No patient with PK [%s] in database." % aPKey)
# register backend notification interests
# (keep this last so we won't hang on threads when
# failing this constructor for other reasons ...)
# if not self._register_interests():
# raise gmExceptions.ConstructorError, "cannot register signal interests"
_log.debug('instantiated document folder for patient [%s]' % self.pk_patient)
#--------------------------------------------------------
def cleanup(self):
pass
#--------------------------------------------------------
# internal helper
#--------------------------------------------------------
def _pkey_exists(self) -> bool:
"""Does this primary key (= patient) exist ?
- true/false/None
"""
# patient in demographic database ?
SQL = 'SELECT exists(SELECT 1 FROM dem.identity WHERE pk = %(pk_pat)s)'
args = {'pk_pat': self.pk_patient}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
if not rows[0][0]:
_log.error("patient [%s] not in demographic database" % self.pk_patient)
return None
return True
#--------------------------------------------------------
# API
#--------------------------------------------------------
def get_latest_freediams_prescription(self):
cmd = """
SELECT pk_doc
FROM blobs.v_doc_med
WHERE
pk_patient = %(pat)s
AND
type = %(typ)s
AND
ext_ref = %(ref)s
ORDER BY
clin_when DESC
LIMIT 1
"""
args = {
'pat': self.pk_patient,
'typ': DOCUMENT_TYPE_PRESCRIPTION,
'ref': 'FreeDiams'
}
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
if len(rows) == 0:
_log.info('no FreeDiams prescription available for patient [%s]' % self.pk_patient)
return None
prescription = cDocument(aPK_obj = rows[0][0])
return prescription
#--------------------------------------------------------
def get_latest_mugshot(self):
SQL = "SELECT pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s"
args = {'pk_pat': self.pk_patient}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
if rows:
return cDocumentPart(aPK_obj = rows[0][0])
_log.info('no mugshots available for patient [%s]' % self.pk_patient)
return None
latest_mugshot = property(get_latest_mugshot)
#--------------------------------------------------------
def get_mugshot_list(self, latest_only=True):
if latest_only:
SQL = "SELECT pk_doc, pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s"
else:
SQL = """
SELECT
vdm.pk_doc as pk_doc,
dobj.pk as pk_obj
FROM
blobs.v_doc_med vdm
blobs.doc_obj dobj
WHERE
vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = 'patient photograph')
and vdm.pk_patient = %(pk_pat)s
and dobj.fk_doc = vdm.pk_doc
"""
args = {'pk_pat': self.pk_patient}
rows = gmPG2.run_ro_queries(queries = [{'SQL': SQL, 'args': args}])
return rows
#--------------------------------------------------------
def get_doc_list(self, doc_type=None):
"""return flat list of document IDs"""
args = {
'ID': self.pk_patient,
'TYP': doc_type
}
cmd = """
SELECT vdm.pk_doc
FROM blobs.v_doc_med vdm
WHERE
vdm.pk_patient = %%(ID)s
%s
order by vdm.clin_when"""
if doc_type is None:
cmd = cmd % ''
else:
try:
int(doc_type)
cmd = cmd % 'and vdm.pk_type = %(TYP)s'
except (TypeError, ValueError):
cmd = cmd % 'and vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(TYP)s)'
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
doc_ids = []
for row in rows:
doc_ids.append(row[0])
return doc_ids
#--------------------------------------------------------
def get_visual_progress_notes(self, episodes=None, encounter=None):
return self.get_documents (
doc_type = DOCUMENT_TYPE_VISUAL_PROGRESS_NOTE,
pk_episodes = episodes,
encounter = encounter
)
#--------------------------------------------------------
def get_unsigned_documents(self):
args = {'pat': self.pk_patient}
cmd = _SQL_get_document_fields % """
pk_doc = ANY (
SELECT DISTINCT ON (b_vo.pk_doc) b_vo.pk_doc
FROM blobs.v_obj4doc_no_data b_vo
WHERE
pk_patient = %(pat)s
AND
reviewed IS FALSE
)
ORDER BY clin_when DESC"""
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ]
#--------------------------------------------------------
def get_documents(self, doc_type:str|int=None, pk_episodes:list[int]=None, encounter:int=None, order_by:str=None, exclude_unsigned:bool=False, pk_types:list[int]=None) -> list[cDocument]:
"""Return list of documents."""
args = {
'pat': self.pk_patient,
'type': doc_type,
'enc': encounter
}
where_parts = ['pk_patient = %(pat)s']
if doc_type is not None:
try:
int(doc_type)
where_parts.append('pk_type = %(type)s')
except (TypeError, ValueError):
where_parts.append('pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(type)s)')
if pk_types:
where_parts.append('pk_type = ANY(%(pk_types)s)')
args['pk_types'] = pk_types
if pk_episodes:
where_parts.append('pk_episode = ANY(%(epis)s)')
args['epis'] = pk_episodes
if encounter is not None:
where_parts.append('pk_encounter = %(enc)s')
if exclude_unsigned:
where_parts.append('pk_doc = ANY(SELECT b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE b_vo.pk_patient = %(pat)s AND b_vo.reviewed IS TRUE)')
if not order_by:
order_by = 'clin_when'
order_by_clause = 'ORDER BY %s' % order_by
cmd = "%s\n%s" % (_SQL_get_document_fields % ' AND '.join(where_parts), order_by_clause)
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ]
documents = property(get_documents)
#--------------------------------------------------------
def add_document(self, document_type=None, encounter=None, episode=None, link_obj=None):
return create_document(link_obj = link_obj, document_type = document_type, encounter = encounter, episode = episode)
#--------------------------------------------------------
def add_prescription(self, encounter=None, episode=None, link_obj=None):
return self.add_document (
link_obj = link_obj,
document_type = create_document_type (
document_type = DOCUMENT_TYPE_PRESCRIPTION
)['pk_doc_type'],
encounter = encounter,
episode = episode
)
#--------------------------------------------------------
def _get_all_document_org_units(self):
cmd = gmOrganization._SQL_get_org_unit % (
'pk_org_unit IN (SELECT DISTINCT ON (pk_org_unit) pk_org_unit FROM blobs.v_doc_med WHERE pk_patient = %(pat)s)'
)
args = {'pat': self.pk_patient}
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return [ gmOrganization.cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]
all_document_org_units = property(_get_all_document_org_units)
#============================================================
class cDocumentType(gmBusinessDBObject.cBusinessDBObject):
"""Represents a document type."""
_cmd_fetch_payload = """SELECT * FROM blobs.v_doc_type WHERE pk_doc_type=%s"""
_cmds_store_payload = [
"""update blobs.doc_type set
name = %(type)s
WHERE
pk=%(pk_obj)s and
xmin=%(xmin_doc_type)s""",
"""SELECT xmin_doc_type FROM blobs.v_doc_type WHERE pk_doc_type = %(pk_obj)s"""
]
_updatable_fields = ['type']
#--------------------------------------------------------
def set_translation(self, translation=None):
if translation.strip() == '':
return False
if translation.strip() == self._payload['l10n_type'].strip():
return True
args = {
'orig': self._payload['type'],
'tx': translation
}
queries = [{
'sql': 'SELECT i18n.i18n(%(orig)s)',
'args': args
}, {
'sql': 'SELECT i18n.upd_tx((SELECT i18n.get_curr_lang()), %(orig)s, %(tx)s)',
'args': args
}]
rows = gmPG2.run_rw_queries(queries = queries, return_data = True)
if not rows[0][0]:
_log.error('cannot set translation to [%s]' % translation)
return False
return self.refetch_payload()
#------------------------------------------------------------
def get_document_types():
rows = gmPG2.run_ro_queries (
queries = [{'sql': "SELECT * FROM blobs.v_doc_type"}]
)
doc_types = []
for row in rows:
row_def = {'pk_field': 'pk_doc_type', 'data': row}
doc_types.append(cDocumentType(row = row_def))
return doc_types
#------------------------------------------------------------
def get_document_type_pk(document_type=None):
args = {'typ': document_type.strip()}
cmd = 'SELECT pk FROM blobs.doc_type WHERE name = %(typ)s'
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
if len(rows) == 0:
cmd = 'SELECT pk FROM blobs.doc_type WHERE _(name) = %(typ)s'
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
if len(rows) == 0:
return None
return rows[0]['pk']
#------------------------------------------------------------
def map_types2pk(document_types=None):
args = {'types': document_types}
cmd = 'SELECT pk_doc_type, coalesce(l10n_type, type) as desc FROM blobs.v_doc_type WHERE l10n_type = ANY(%(types)s) OR type = ANY(%(types)s)'
rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}])
return rows
#------------------------------------------------------------
def create_document_type(document_type:str=None) -> cDocumentType:
# check for potential dupes:
SQL = 'SELECT pk FROM blobs.doc_type WHERE name = %(doc_type)s'
args = {'doc_type': document_type}
rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}])
if rows:
return cDocumentType(aPK_obj = rows[0][0])
_log.debug('creating document type [%s]', document_type)
SQL = "INSERT INTO blobs.doc_type (name) VALUES (%(doc_type)s) RETURNING pk"
args = {'doc_type': document_type}
rows = gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}], return_data = True)
return cDocumentType(aPK_obj = rows[0][0])
#------------------------------------------------------------
def delete_document_type(document_type=None):
if document_type['is_in_use']:
return False
SQL = 'DELETE FROM blobs.doc_type WHERE pk = %(pk_doc_type)s'
args = {'pk_doc_type': document_type['pk_doc_type']}
gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}])
return True
#------------------------------------------------------------
def get_ext_ref():
"""This needs *considerably* more smarts."""
dirname = gmTools.get_unique_filename (
prefix = '',
suffix = time.strftime(".%Y%m%d-%H%M%S", time.localtime())
)
# extract name for dir
path, doc_ID = os.path.split(dirname)
return doc_ID
#============================================================
def check_mimetypes_in_archive():
mimetypes = {}
cmd = 'SELECT pk FROM blobs.doc_med'
doc_pks = gmPG2.run_ro_queries(queries = [{'sql': cmd}])
print('Detecting mimetypes in document archive ...')
doc_idx = 0
part_count = 0
for pk_row in doc_pks:
doc_idx += 1
print('\n#%s - document %s of %s: ' % (pk_row['pk'], doc_idx, len(doc_pks)), end = '')
doc = cDocument(aPK_obj = pk_row['pk'])
for part in doc.parts:
part_count += 1
print('#%s:%s bytes, ' % (part['pk_obj'], part['size']), end = '')
part_fname = part.save_to_file()
mimetype = gmMimeLib.guess_mimetype(part_fname)
try:
mimetypes[mimetype]['count'] += 1
except KeyError:
mimetypes[mimetype] = {
'count': 1,
'viewer': gmMimeLib.get_viewer_cmd(mimetype),
'editor': gmMimeLib.get_editor_cmd(mimetype),
'extension': gmMimeLib.guess_ext_by_mimetype(mimetype)
}
print('')
print('')
print('Number of documents :', len(doc_pks))
print('Number of parts :', part_count)
print('Number of mime types:', len(mimetypes))
for mimetype in mimetypes:
print('')
print('<%s>' % mimetype)
print(' Extension:', mimetypes[mimetype]['extension'])
print(' Use count:', mimetypes[mimetype]['count'])
print(' Viewer:', mimetypes[mimetype]['viewer'])
print(' Editor:', mimetypes[mimetype]['editor'])
return 0
#============================================================
# main
#------------------------------------------------------------
if __name__ == '__main__':
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
#--------------------------------------------------------
def test_doc_types():
print("----------------------")
print("listing document types")
print("----------------------")
for dt in get_document_types():
print(dt)
print("------------------------------")
print("testing document type handling")
print("------------------------------")
dt = create_document_type(document_type = 'dummy doc type for unit test 1')
print("created:", dt)
dt['type'] = 'dummy doc type for unit test 2'
dt.save_payload()
print("changed base name:", dt)
dt.set_translation(translation = 'Dummy-Dokumenten-Typ fuer Unit-Test')
print("translated:", dt)
print("deleted:", delete_document_type(document_type = dt))
return
#--------------------------------------------------------
def test_adding_doc_part():
print("-----------------------")
print("testing document import")
print("-----------------------")
docs = search_for_documents(patient_id=12)
doc = docs[0]
print("adding to doc:", doc)
fname = sys.argv[1]
print("adding from file:", fname)
part = doc.add_part(file=fname)
print("new part:", part)
return
#--------------------------------------------------------
def test_get_documents():
doc_folder = cDocumentFolder(aPKey=12)
#photo = doc_folder.get_latest_mugshot()
#print type(photo), photo
docs = doc_folder.get_documents()#pk_types = [16])
for doc in docs:
#print type(doc), doc
#print doc.parts
#print doc.format_single_line()
print('--------------------------')
#print(doc.format(single_line = True))
#print(doc.format())
print(doc.format_for_failsafe_output())
#print(doc['pk_type'])
#--------------------------------------------------------
def test_save_to_file():
doc_folder = cDocumentFolder(aPKey=12)
docs = doc_folder.get_documents()
for doc in docs:
for part in doc.parts:
print(part.save_to_file(target_mime = 'application/pdf', ignore_conversion_problems = True))
#--------------------------------------------------------
def test_get_useful_filename():
pk = 12
from Gnumed.business.gmPerson import cPatient
pat = cPatient(pk)
doc_folder = cDocumentFolder(aPKey = pk)
for doc in doc_folder.documents:
for part in doc.parts:
print(part.get_useful_filename (
patient = pat,
make_unique = True,
directory = None,
include_gnumed_tag = False,
date_before_type = True,
name_first = False
))
#--------------------------------------------------------
def test_check_mimetypes_in_archive():
check_mimetypes_in_archive()
#--------------------------------------------------------
def test_part_metainfo_formatter():
#--------------------------------
def desc_printer(worker_result):
status, desc = worker_result
print('printer callback:')
print(status)
print(desc)
print('<hit key> for next')
return
#--------------------------------
pk = 12
# from Gnumed.business.gmPerson import cPatient
# pat = cPatient(pk)
doc_folder = cDocumentFolder(aPKey = pk)
for doc in doc_folder.documents:
for part in doc.parts:
part.format_metainfo(callback = desc_printer)
input('waiting ...')
# success, desc = part.format_metainfo()
# print(success)
# print(desc)
# input('next')
# print(part.get_useful_filename (
# patient = pat,
# make_unique = True,
# directory = None,
# include_gnumed_tag = False,
# date_before_type = True,
# name_first = False
# ))
#--------------------------------------------------------
from Gnumed.pycommon import gmI18N
gmI18N.activate_locale()
gmI18N.install_domain()
gmPG2.request_login_params(setup_pool = True)
#test_doc_types()
#test_adding_doc_part()
test_get_documents()
#test_get_useful_filename()
#test_part_metainfo_formatter()
#test_check_mimetypes_in_archive()
#test_save_to_file()
# print get_ext_ref()
Functions
def check_mimetypes_in_archive()
-
Expand source code
def check_mimetypes_in_archive(): mimetypes = {} cmd = 'SELECT pk FROM blobs.doc_med' doc_pks = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) print('Detecting mimetypes in document archive ...') doc_idx = 0 part_count = 0 for pk_row in doc_pks: doc_idx += 1 print('\n#%s - document %s of %s: ' % (pk_row['pk'], doc_idx, len(doc_pks)), end = '') doc = cDocument(aPK_obj = pk_row['pk']) for part in doc.parts: part_count += 1 print('#%s:%s bytes, ' % (part['pk_obj'], part['size']), end = '') part_fname = part.save_to_file() mimetype = gmMimeLib.guess_mimetype(part_fname) try: mimetypes[mimetype]['count'] += 1 except KeyError: mimetypes[mimetype] = { 'count': 1, 'viewer': gmMimeLib.get_viewer_cmd(mimetype), 'editor': gmMimeLib.get_editor_cmd(mimetype), 'extension': gmMimeLib.guess_ext_by_mimetype(mimetype) } print('') print('') print('Number of documents :', len(doc_pks)) print('Number of parts :', part_count) print('Number of mime types:', len(mimetypes)) for mimetype in mimetypes: print('') print('<%s>' % mimetype) print(' Extension:', mimetypes[mimetype]['extension']) print(' Use count:', mimetypes[mimetype]['count']) print(' Viewer:', mimetypes[mimetype]['viewer']) print(' Editor:', mimetypes[mimetype]['editor']) return 0
def create_document(document_type=None, encounter=None, episode=None, link_obj=None)
-
Returns new document instance or raises an exception.
Expand source code
def create_document(document_type=None, encounter=None, episode=None, link_obj=None): """Returns new document instance or raises an exception.""" try: int(document_type) SQL = "INSERT INTO blobs.doc_med (fk_type, fk_encounter, fk_episode) VALUES (%(type)s, %(pk_enc)s, %(pk_epi)s) RETURNING pk" except ValueError: create_document_type(document_type = document_type) SQL = """ INSERT INTO blobs.doc_med ( fk_type, fk_encounter, fk_episode ) VALUES ( coalesce ( (SELECT pk FROM blobs.doc_type bdt WHERE bdt.name = %(type)s), (SELECT pk FROM blobs.doc_type bdt WHERE _(bdt.name) = %(type)s) ), %(pk_enc)s, %(pk_epi)s ) RETURNING pk""" args = {'type': document_type} try: args['pk_enc'] = int(encounter) except (TypeError, ValueError): args['pk_enc'] = encounter['pk_encounter'] try: args['pk_epi'] = int(episode) except (TypeError, ValueError): args['pk_epi'] = episode['pk_episode'] rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL, 'args': args}], return_data = True) doc = cDocument(aPK_obj = rows[0][0], link_obj = link_obj) return doc
def create_document_type(document_type: str = None) ‑> cDocumentType
-
Expand source code
def create_document_type(document_type:str=None) -> cDocumentType: # check for potential dupes: SQL = 'SELECT pk FROM blobs.doc_type WHERE name = %(doc_type)s' args = {'doc_type': document_type} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) if rows: return cDocumentType(aPK_obj = rows[0][0]) _log.debug('creating document type [%s]', document_type) SQL = "INSERT INTO blobs.doc_type (name) VALUES (%(doc_type)s) RETURNING pk" args = {'doc_type': document_type} rows = gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}], return_data = True) return cDocumentType(aPK_obj = rows[0][0])
def delete_document(document_id=None, encounter_id=None)
-
Expand source code
def delete_document(document_id=None, encounter_id=None): # cascades to doc_obj and doc_desc but not bill.bill cmd = "SELECT blobs.delete_document(%(pk)s, %(enc)s)" args = {'pk': document_id, 'enc': encounter_id} rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) if not rows[0][0]: _log.error('cannot delete document [%s]', document_id) return False return True
def delete_document_part(part_pk=None, encounter_pk=None)
-
Expand source code
def delete_document_part(part_pk=None, encounter_pk=None): cmd = """ SELECT blobs.delete_document_part(%(pk)s, %(enc)s) WHERE NOT EXISTS (SELECT 1 FROM clin.export_item WHERE fk_doc_obj = %(pk)s) """ args = {'pk': part_pk, 'enc': encounter_pk} gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return
def delete_document_type(document_type=None)
-
Expand source code
def delete_document_type(document_type=None): if document_type['is_in_use']: return False SQL = 'DELETE FROM blobs.doc_type WHERE pk = %(pk_doc_type)s' args = {'pk_doc_type': document_type['pk_doc_type']} gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}]) return True
def generate_failsafe_document_list_entries(pk_patient: int = None, max_width: int = 80, eol: str = None) ‑> str | list
-
Expand source code
def generate_failsafe_document_list_entries(pk_patient:int=None, max_width:int=80, eol:str=None) -> str|list: lines = [] doc_folder = cDocumentFolder(aPKey = pk_patient) docs = doc_folder.get_documents(order_by = 'clin_when DESC, l10n_type') for doc in docs: lines.append('') lines.extend(doc.format_for_failsafe_output(max_width = max_width)) if not eol: return lines return eol.join(lines)
def get_document_type_pk(document_type=None)
-
Expand source code
def get_document_type_pk(document_type=None): args = {'typ': document_type.strip()} cmd = 'SELECT pk FROM blobs.doc_type WHERE name = %(typ)s' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: cmd = 'SELECT pk FROM blobs.doc_type WHERE _(name) = %(typ)s' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return rows[0]['pk']
def get_document_types()
-
Expand source code
def get_document_types(): rows = gmPG2.run_ro_queries ( queries = [{'sql': "SELECT * FROM blobs.v_doc_type"}] ) doc_types = [] for row in rows: row_def = {'pk_field': 'pk_doc_type', 'data': row} doc_types.append(cDocumentType(row = row_def)) return doc_types
def get_ext_ref()
-
This needs considerably more smarts.
Expand source code
def get_ext_ref(): """This needs *considerably* more smarts.""" dirname = gmTools.get_unique_filename ( prefix = '', suffix = time.strftime(".%Y%m%d-%H%M%S", time.localtime()) ) # extract name for dir path, doc_ID = os.path.split(dirname) return doc_ID
def map_types2pk(document_types=None)
-
Expand source code
def map_types2pk(document_types=None): args = {'types': document_types} cmd = 'SELECT pk_doc_type, coalesce(l10n_type, type) as desc FROM blobs.v_doc_type WHERE l10n_type = ANY(%(types)s) OR type = ANY(%(types)s)' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return rows
def reclassify_documents_by_type(original_type=None, target_type=None)
-
Expand source code
def reclassify_documents_by_type(original_type=None, target_type=None): _log.debug('reclassifying documents by type') _log.debug('original: %s', original_type) _log.debug('target: %s', target_type) if target_type['pk_doc_type'] == original_type['pk_doc_type']: return True cmd = """ update blobs.doc_med set fk_type = %(new_type)s WHERE fk_type = %(old_type)s """ args = {'new_type': target_type['pk_doc_type'], 'old_type': original_type['pk_doc_type']} gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
def search_for_documents(patient_id=None, type_id=None, external_reference=None, pk_episode=None, pk_types=None)
-
Searches for documents with the given patient and type ID.
Expand source code
def search_for_documents(patient_id=None, type_id=None, external_reference=None, pk_episode=None, pk_types=None): """Searches for documents with the given patient and type ID.""" if (patient_id is None) and (pk_episode is None): raise ValueError('need patient_id or pk_episode to search for document') where_parts = [] args = { 'pat_id': patient_id, 'type_id': type_id, 'ref': external_reference, 'pk_epi': pk_episode } if patient_id is not None: where_parts.append('pk_patient = %(pat_id)s') if type_id is not None: where_parts.append('pk_type = %(type_id)s') if external_reference is not None: where_parts.append('ext_ref = %(ref)s') if pk_episode is not None: where_parts.append('pk_episode = %(pk_epi)s') if pk_types is not None: where_parts.append('pk_type = ANY(%(pk_types)s)') args['pk_types'] = pk_types cmd = _SQL_get_document_fields % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'data': r, 'pk_field': 'pk_doc'}) for r in rows ]
Classes
class cDocument (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents one medical document.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cDocument(gmBusinessDBObject.cBusinessDBObject): """Represents one medical document.""" _cmd_fetch_payload = _SQL_get_document_fields % "pk_doc = %s" _cmds_store_payload = [ """UPDATE blobs.doc_med SET fk_type = %(pk_type)s, fk_episode = %(pk_episode)s, fk_encounter = %(pk_encounter)s, fk_org_unit = %(pk_org_unit)s, unit_is_receiver = %(unit_is_receiver)s, clin_when = %(clin_when)s, comment = gm.nullify_empty_string(%(comment)s), ext_ref = gm.nullify_empty_string(%(ext_ref)s), fk_hospital_stay = %(pk_hospital_stay)s WHERE pk = %(pk_doc)s and xmin = %(xmin_doc_med)s RETURNING xmin AS xmin_doc_med""" ] _updatable_fields = [ 'pk_type', 'comment', 'clin_when', 'ext_ref', 'pk_episode', 'pk_encounter', # mainly useful when moving visual progress notes to their respective encounters 'pk_org_unit', 'unit_is_receiver', 'pk_hospital_stay' ] #-------------------------------------------------------- def refetch_payload(self, ignore_changes=False, link_obj=None): try: del self.__has_unreviewed_parts except AttributeError: pass return super().refetch_payload(ignore_changes = ignore_changes, link_obj = link_obj) #-------------------------------------------------------- def get_descriptions(self, max_lng:int=250): """Get document descriptions. - will return a list of rows """ if max_lng is None: SQL = "SELECT pk, text FROM blobs.doc_desc WHERE fk_doc = %(pk_doc)s" else: SQL = "SELECT pk, substring(text FROM 1 for %s) FROM blobs.doc_desc WHERE fk_doc = %%(pk_doc)s" % max_lng args = {'pk_doc': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return rows #-------------------------------------------------------- def add_description(self, description=None): SQL = 'INSERT INTO blobs.doc_desc (fk_doc, text) values (%(pk_doc)s, %(desc)s)' args = {'pk_doc': self.pk_obj, 'desc': description} gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}]) return True #-------------------------------------------------------- def update_description(self, pk=None, description=None): cmd = "update blobs.doc_desc set text = %(desc)s WHERE fk_doc = %(doc)s and pk = %(pk_desc)s" gmPG2.run_rw_queries(queries = [ {'sql': cmd, 'args': {'doc': self.pk_obj, 'pk_desc': pk, 'desc': description}} ]) return True #-------------------------------------------------------- def delete_description(self, pk=None): cmd = "DELETE FROM blobs.doc_desc WHERE fk_doc = %(doc)s and pk = %(desc)s" gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'doc': self.pk_obj, 'desc': pk}}]) return True #-------------------------------------------------------- def _get_parts(self): SQL = _SQL_get_document_part_fields % 'pk_doc = %(pk_doc)s ORDER BY seq_idx' args = {'pk_doc': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return [ cDocumentPart(row = {'pk_field': 'pk_obj', 'data': r}) for r in rows ] parts = property(_get_parts) #-------------------------------------------------------- def add_part(self, file=None, link_obj=None): """Add a part to the document.""" # create dummy part cmd = """ INSERT INTO blobs.doc_obj ( fk_doc, data, seq_idx ) VALUES ( %(doc_id)s, ''::bytea, (SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s) ) RETURNING pk""" rows = gmPG2.run_rw_queries ( link_obj = link_obj, queries = [{'sql': cmd, 'args': {'doc_id': self.pk_obj}}], return_data = True ) # init document part instance pk_part = rows[0][0] new_part = cDocumentPart(aPK_obj = pk_part, link_obj = link_obj) if not new_part.update_data_from_file(link_obj = link_obj, fname = file): _log.error('cannot import binary data from [%s] into document part' % file) SQL = 'DELETE FROM blobs.doc_obj WHERE pk = %(pk_part)s' args = {'pk_part': pk_part} gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL, 'args': args}]) return None new_part['filename'] = file new_part.save_payload(conn = link_obj) return new_part #-------------------------------------------------------- def add_parts_from_files(self, files=None, reviewer=None): new_parts = [] for filename in files: new_part = self.add_part(file = filename) if new_part is None: msg = 'cannot instantiate document part object from [%s]' % filename _log.error(msg) return (False, msg, filename) new_parts.append(new_part) if reviewer is not None: new_part['pk_intended_reviewer'] = reviewer # None == Null success, data = new_part.save_payload() if not success: msg = 'cannot set reviewer to [%s] on [%s]' % (reviewer, filename) _log.error(msg) _log.error(str(data)) return (False, msg, filename) return (True, '', new_parts) #-------------------------------------------------------- def save_parts_to_files(self, export_dir=None, chunksize=0, conn=None): fnames = [] for part in self.parts: fname = part.save_to_file(aChunkSize = chunksize, directory = export_dir, conn = conn) if fname is None: _log.error('cannot export document part [%s]', part) continue fnames.append(fname) return fnames #-------------------------------------------------------- def add_part_from_incoming(self, pk_incoming:int=None, pk_reviewer:int=None, conn=None): SQL = """ INSERT INTO blobs.doc_obj ( fk_doc, data, seq_idx ) VALUES ( %(doc_id)s, ''::bytea, (SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s) ) RETURNING pk """ args = {'doc_id': self.pk_obj} rows = gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = True) new_part = cDocumentPart(aPK_obj = rows[0]['pk'], link_obj = conn) if not new_part.update_data_from_incoming(conn = conn, pk_incoming = pk_incoming): return None if not pk_reviewer: return new_part new_part['pk_intended_reviewer'] = pk_reviewer success, data = new_part.save(conn = conn) if success: return new_part msg = 'cannot set reviewer to [%s] on [%s]' % (pk_reviewer, pk_incoming) _log.error(msg) _log.error(str(data)) return None #-------------------------------------------------------- def add_parts_from_incoming(self, incoming_data:list=None, pk_reviewer:int=None, conn=None): """Add parts to document from gmIncomingData.cIncomingData instances. Returns: A tuple (success state, data). On failure data is an error message. On success data is the list of new parts. """ new_parts = [] for incoming in incoming_data: part = self.add_part_from_incoming ( pk_incoming = incoming['pk_incoming_data'], pk_reviewer = pk_reviewer, conn = conn ) if part: new_parts.append(part) continue msg = 'cannot instantiate document part from incoming [%s]' % incoming['pk_incoming_data'] _log.error(msg) return False, msg return True, new_parts #-------------------------------------------------------- def _get_has_unreviewed_parts(self): try: return self.__has_unreviewed_parts # pylint: disable=access-member-before-definition except AttributeError: pass cmd = "SELECT EXISTS(SELECT 1 FROM blobs.v_obj4doc_no_data WHERE pk_doc = %(pk)s AND reviewed IS FALSE)" args = {'pk': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) self.__has_unreviewed_parts = rows[0][0] return self.__has_unreviewed_parts has_unreviewed_parts = property(_get_has_unreviewed_parts) #-------------------------------------------------------- def set_reviewed(self, technically_abnormal=None, clinically_relevant=None): # FIXME: this is probably inefficient for part in self.parts: if not part.set_reviewed(technically_abnormal, clinically_relevant): return False return True #-------------------------------------------------------- def set_primary_reviewer(self, reviewer=None): for part in self.parts: part['pk_intended_reviewer'] = reviewer success, data = part.save_payload() if not success: _log.error('cannot set reviewer to [%s]' % reviewer) _log.error(str(data)) return False return True #-------------------------------------------------------- def format_single_line(self): part_count = len(self._payload['seq_idx_list']) if part_count == 0: parts = _('no parts') elif part_count == 1: parts = _('1 part') else: parts = _('%s parts') % part_count detail = '' if self._payload['ext_ref'] is not None: detail = self._payload['ext_ref'] if self._payload['unit'] is not None: template = _('%s of %s') if detail == '': detail = _('%s of %s') % ( self._payload['unit'], self._payload['organization'] ) else: detail += (' @ ' + template % ( self._payload['unit'], self._payload['organization'] )) if detail != '': detail = ' (%s)' % detail return '%s %s (%s):%s%s' % ( self._payload['clin_when'].strftime('%Y %b %d'), self._payload['l10n_type'], parts, gmTools.coalesce(self._payload['comment'], '', ' "%s"'), detail ) #-------------------------------------------------------- def format(self, single_line=False): if single_line: return self.format_single_line() part_count = len(self._payload['seq_idx_list']) if part_count == 0: parts = _('no parts') elif part_count == 1: parts = _('1 part') else: parts = _('%s parts') % part_count org = '' if self._payload['unit'] is not None: if self._payload['unit_is_receiver']: org = _(' Receiver: %s @ %s\n') % ( self._payload['unit'], self._payload['organization'] ) else: org = _(' Sender: %s @ %s\n') % ( self._payload['unit'], self._payload['organization'] ) stay = '' if self._payload['pk_hospital_stay'] is not None: stay = _('Hospital stay') + ': %s\n' % self.hospital_stay.format ( left_margin = 0, include_procedures = False, include_docs = False, include_episode = False ) txt = _( '%s (%s) #%s\n' ' Created: %s\n' ' Episode: %s\n' '%s' '%s' '%s' '%s' '%s' ) % ( self._payload['l10n_type'], parts, self._payload['pk_doc'], self._payload['clin_when'].strftime('%Y %b %d'), self._payload['episode'], gmTools.coalesce(self._payload['health_issue'], '', _(' Health issue: %s\n')), gmTools.coalesce(self._payload['ext_ref'], '', _(' External reference: %s\n')), org, stay, gmTools.coalesce(self._payload['comment'], '', ' %s') ) return txt #-------------------------------------------------------- def format_for_failsafe_output(self, max_width:int=80) -> list[str]: lines = [ '%s: %s' % (self._payload['clin_when'].strftime('%Y %B %d'), self._payload['l10n_type']) ] if self._payload['unit'] and not self._payload['unit_is_receiver']: lines.append(' ' + _('From: %s @ %s') % (self._payload['unit'], self._payload['organization'])) return lines #-------------------------------------------------------- def _get_hospital_stay(self): if self._payload['pk_hospital_stay'] is None: return None from Gnumed.business import gmEMRStructItems return gmEMRStructItems.cHospitalStay(self._payload['pk_hospital_stay']) hospital_stay = property(_get_hospital_stay) #-------------------------------------------------------- def _get_org_unit(self): if self._payload['pk_org_unit'] is None: return None return gmOrganization.cOrgUnit(self._payload['pk_org_unit']) org_unit = property(_get_org_unit) #-------------------------------------------------------- def _get_procedures(self): from Gnumed.business.gmEMRStructItems import get_procedures4document return get_procedures4document(pk_document = self.pk_obj) procedures = property(_get_procedures) #-------------------------------------------------------- def _get_bills(self): from Gnumed.business.gmBilling import get_bills4document return get_bills4document(pk_document = self.pk_obj) bills = property(_get_bills)
Ancestors
Instance variables
var bills
-
Expand source code
def _get_bills(self): from Gnumed.business.gmBilling import get_bills4document return get_bills4document(pk_document = self.pk_obj)
var has_unreviewed_parts
-
Expand source code
def _get_has_unreviewed_parts(self): try: return self.__has_unreviewed_parts # pylint: disable=access-member-before-definition except AttributeError: pass cmd = "SELECT EXISTS(SELECT 1 FROM blobs.v_obj4doc_no_data WHERE pk_doc = %(pk)s AND reviewed IS FALSE)" args = {'pk': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) self.__has_unreviewed_parts = rows[0][0] return self.__has_unreviewed_parts
var hospital_stay
-
Expand source code
def _get_hospital_stay(self): if self._payload['pk_hospital_stay'] is None: return None from Gnumed.business import gmEMRStructItems return gmEMRStructItems.cHospitalStay(self._payload['pk_hospital_stay'])
var org_unit
-
Expand source code
def _get_org_unit(self): if self._payload['pk_org_unit'] is None: return None return gmOrganization.cOrgUnit(self._payload['pk_org_unit'])
var parts
-
Expand source code
def _get_parts(self): SQL = _SQL_get_document_part_fields % 'pk_doc = %(pk_doc)s ORDER BY seq_idx' args = {'pk_doc': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return [ cDocumentPart(row = {'pk_field': 'pk_obj', 'data': r}) for r in rows ]
var procedures
-
Expand source code
def _get_procedures(self): from Gnumed.business.gmEMRStructItems import get_procedures4document return get_procedures4document(pk_document = self.pk_obj)
Methods
def add_description(self, description=None)
-
Expand source code
def add_description(self, description=None): SQL = 'INSERT INTO blobs.doc_desc (fk_doc, text) values (%(pk_doc)s, %(desc)s)' args = {'pk_doc': self.pk_obj, 'desc': description} gmPG2.run_rw_queries(queries = [{'sql': SQL, 'args': args}]) return True
def add_part(self, file=None, link_obj=None)
-
Add a part to the document.
Expand source code
def add_part(self, file=None, link_obj=None): """Add a part to the document.""" # create dummy part cmd = """ INSERT INTO blobs.doc_obj ( fk_doc, data, seq_idx ) VALUES ( %(doc_id)s, ''::bytea, (SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s) ) RETURNING pk""" rows = gmPG2.run_rw_queries ( link_obj = link_obj, queries = [{'sql': cmd, 'args': {'doc_id': self.pk_obj}}], return_data = True ) # init document part instance pk_part = rows[0][0] new_part = cDocumentPart(aPK_obj = pk_part, link_obj = link_obj) if not new_part.update_data_from_file(link_obj = link_obj, fname = file): _log.error('cannot import binary data from [%s] into document part' % file) SQL = 'DELETE FROM blobs.doc_obj WHERE pk = %(pk_part)s' args = {'pk_part': pk_part} gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'sql': SQL, 'args': args}]) return None new_part['filename'] = file new_part.save_payload(conn = link_obj) return new_part
def add_part_from_incoming(self, pk_incoming: int = None, pk_reviewer: int = None, conn=None)
-
Expand source code
def add_part_from_incoming(self, pk_incoming:int=None, pk_reviewer:int=None, conn=None): SQL = """ INSERT INTO blobs.doc_obj ( fk_doc, data, seq_idx ) VALUES ( %(doc_id)s, ''::bytea, (SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s) ) RETURNING pk """ args = {'doc_id': self.pk_obj} rows = gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = True) new_part = cDocumentPart(aPK_obj = rows[0]['pk'], link_obj = conn) if not new_part.update_data_from_incoming(conn = conn, pk_incoming = pk_incoming): return None if not pk_reviewer: return new_part new_part['pk_intended_reviewer'] = pk_reviewer success, data = new_part.save(conn = conn) if success: return new_part msg = 'cannot set reviewer to [%s] on [%s]' % (pk_reviewer, pk_incoming) _log.error(msg) _log.error(str(data)) return None
def add_parts_from_files(self, files=None, reviewer=None)
-
Expand source code
def add_parts_from_files(self, files=None, reviewer=None): new_parts = [] for filename in files: new_part = self.add_part(file = filename) if new_part is None: msg = 'cannot instantiate document part object from [%s]' % filename _log.error(msg) return (False, msg, filename) new_parts.append(new_part) if reviewer is not None: new_part['pk_intended_reviewer'] = reviewer # None == Null success, data = new_part.save_payload() if not success: msg = 'cannot set reviewer to [%s] on [%s]' % (reviewer, filename) _log.error(msg) _log.error(str(data)) return (False, msg, filename) return (True, '', new_parts)
def add_parts_from_incoming(self, incoming_data: list = None, pk_reviewer: int = None, conn=None)
-
Add parts to document from gmIncomingData.cIncomingData instances.
Returns
A tuple (success state, data). On failure data is an error message. On success data is the list of new parts.
Expand source code
def add_parts_from_incoming(self, incoming_data:list=None, pk_reviewer:int=None, conn=None): """Add parts to document from gmIncomingData.cIncomingData instances. Returns: A tuple (success state, data). On failure data is an error message. On success data is the list of new parts. """ new_parts = [] for incoming in incoming_data: part = self.add_part_from_incoming ( pk_incoming = incoming['pk_incoming_data'], pk_reviewer = pk_reviewer, conn = conn ) if part: new_parts.append(part) continue msg = 'cannot instantiate document part from incoming [%s]' % incoming['pk_incoming_data'] _log.error(msg) return False, msg return True, new_parts
def delete_description(self, pk=None)
-
Expand source code
def delete_description(self, pk=None): cmd = "DELETE FROM blobs.doc_desc WHERE fk_doc = %(doc)s and pk = %(desc)s" gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': {'doc': self.pk_obj, 'desc': pk}}]) return True
def format_for_failsafe_output(self, max_width: int = 80) ‑> list[str]
-
Expand source code
def format_for_failsafe_output(self, max_width:int=80) -> list[str]: lines = [ '%s: %s' % (self._payload['clin_when'].strftime('%Y %B %d'), self._payload['l10n_type']) ] if self._payload['unit'] and not self._payload['unit_is_receiver']: lines.append(' ' + _('From: %s @ %s') % (self._payload['unit'], self._payload['organization'])) return lines
def format_single_line(self)
-
Expand source code
def format_single_line(self): part_count = len(self._payload['seq_idx_list']) if part_count == 0: parts = _('no parts') elif part_count == 1: parts = _('1 part') else: parts = _('%s parts') % part_count detail = '' if self._payload['ext_ref'] is not None: detail = self._payload['ext_ref'] if self._payload['unit'] is not None: template = _('%s of %s') if detail == '': detail = _('%s of %s') % ( self._payload['unit'], self._payload['organization'] ) else: detail += (' @ ' + template % ( self._payload['unit'], self._payload['organization'] )) if detail != '': detail = ' (%s)' % detail return '%s %s (%s):%s%s' % ( self._payload['clin_when'].strftime('%Y %b %d'), self._payload['l10n_type'], parts, gmTools.coalesce(self._payload['comment'], '', ' "%s"'), detail )
def get_descriptions(self, max_lng: int = 250)
-
Get document descriptions.
- will return a list of rows
Expand source code
def get_descriptions(self, max_lng:int=250): """Get document descriptions. - will return a list of rows """ if max_lng is None: SQL = "SELECT pk, text FROM blobs.doc_desc WHERE fk_doc = %(pk_doc)s" else: SQL = "SELECT pk, substring(text FROM 1 for %s) FROM blobs.doc_desc WHERE fk_doc = %%(pk_doc)s" % max_lng args = {'pk_doc': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return rows
def save_parts_to_files(self, export_dir=None, chunksize=0, conn=None)
-
Expand source code
def save_parts_to_files(self, export_dir=None, chunksize=0, conn=None): fnames = [] for part in self.parts: fname = part.save_to_file(aChunkSize = chunksize, directory = export_dir, conn = conn) if fname is None: _log.error('cannot export document part [%s]', part) continue fnames.append(fname) return fnames
def set_primary_reviewer(self, reviewer=None)
-
Expand source code
def set_primary_reviewer(self, reviewer=None): for part in self.parts: part['pk_intended_reviewer'] = reviewer success, data = part.save_payload() if not success: _log.error('cannot set reviewer to [%s]' % reviewer) _log.error(str(data)) return False return True
def set_reviewed(self, technically_abnormal=None, clinically_relevant=None)
-
Expand source code
def set_reviewed(self, technically_abnormal=None, clinically_relevant=None): # FIXME: this is probably inefficient for part in self.parts: if not part.set_reviewed(technically_abnormal, clinically_relevant): return False return True
def update_description(self, pk=None, description=None)
-
Expand source code
def update_description(self, pk=None, description=None): cmd = "update blobs.doc_desc set text = %(desc)s WHERE fk_doc = %(doc)s and pk = %(pk_desc)s" gmPG2.run_rw_queries(queries = [ {'sql': cmd, 'args': {'doc': self.pk_obj, 'pk_desc': pk, 'desc': description}} ]) return True
Inherited members
class cDocumentFolder (aPKey=None)
-
Represents a folder with medical documents for a single patient.
Fails if
- patient referenced by aPKey does not exist
Expand source code
class cDocumentFolder: """Represents a folder with medical documents for a single patient.""" def __init__(self, aPKey = None): """Fails if - patient referenced by aPKey does not exist """ self.pk_patient = aPKey # == identity.pk == primary key if not self._pkey_exists(): raise gmExceptions.ConstructorError("No patient with PK [%s] in database." % aPKey) # register backend notification interests # (keep this last so we won't hang on threads when # failing this constructor for other reasons ...) # if not self._register_interests(): # raise gmExceptions.ConstructorError, "cannot register signal interests" _log.debug('instantiated document folder for patient [%s]' % self.pk_patient) #-------------------------------------------------------- def cleanup(self): pass #-------------------------------------------------------- # internal helper #-------------------------------------------------------- def _pkey_exists(self) -> bool: """Does this primary key (= patient) exist ? - true/false/None """ # patient in demographic database ? SQL = 'SELECT exists(SELECT 1 FROM dem.identity WHERE pk = %(pk_pat)s)' args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) if not rows[0][0]: _log.error("patient [%s] not in demographic database" % self.pk_patient) return None return True #-------------------------------------------------------- # API #-------------------------------------------------------- def get_latest_freediams_prescription(self): cmd = """ SELECT pk_doc FROM blobs.v_doc_med WHERE pk_patient = %(pat)s AND type = %(typ)s AND ext_ref = %(ref)s ORDER BY clin_when DESC LIMIT 1 """ args = { 'pat': self.pk_patient, 'typ': DOCUMENT_TYPE_PRESCRIPTION, 'ref': 'FreeDiams' } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: _log.info('no FreeDiams prescription available for patient [%s]' % self.pk_patient) return None prescription = cDocument(aPK_obj = rows[0][0]) return prescription #-------------------------------------------------------- def get_latest_mugshot(self): SQL = "SELECT pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s" args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) if rows: return cDocumentPart(aPK_obj = rows[0][0]) _log.info('no mugshots available for patient [%s]' % self.pk_patient) return None latest_mugshot = property(get_latest_mugshot) #-------------------------------------------------------- def get_mugshot_list(self, latest_only=True): if latest_only: SQL = "SELECT pk_doc, pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s" else: SQL = """ SELECT vdm.pk_doc as pk_doc, dobj.pk as pk_obj FROM blobs.v_doc_med vdm blobs.doc_obj dobj WHERE vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = 'patient photograph') and vdm.pk_patient = %(pk_pat)s and dobj.fk_doc = vdm.pk_doc """ args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'SQL': SQL, 'args': args}]) return rows #-------------------------------------------------------- def get_doc_list(self, doc_type=None): """return flat list of document IDs""" args = { 'ID': self.pk_patient, 'TYP': doc_type } cmd = """ SELECT vdm.pk_doc FROM blobs.v_doc_med vdm WHERE vdm.pk_patient = %%(ID)s %s order by vdm.clin_when""" if doc_type is None: cmd = cmd % '' else: try: int(doc_type) cmd = cmd % 'and vdm.pk_type = %(TYP)s' except (TypeError, ValueError): cmd = cmd % 'and vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(TYP)s)' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) doc_ids = [] for row in rows: doc_ids.append(row[0]) return doc_ids #-------------------------------------------------------- def get_visual_progress_notes(self, episodes=None, encounter=None): return self.get_documents ( doc_type = DOCUMENT_TYPE_VISUAL_PROGRESS_NOTE, pk_episodes = episodes, encounter = encounter ) #-------------------------------------------------------- def get_unsigned_documents(self): args = {'pat': self.pk_patient} cmd = _SQL_get_document_fields % """ pk_doc = ANY ( SELECT DISTINCT ON (b_vo.pk_doc) b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE pk_patient = %(pat)s AND reviewed IS FALSE ) ORDER BY clin_when DESC""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ] #-------------------------------------------------------- def get_documents(self, doc_type:str|int=None, pk_episodes:list[int]=None, encounter:int=None, order_by:str=None, exclude_unsigned:bool=False, pk_types:list[int]=None) -> list[cDocument]: """Return list of documents.""" args = { 'pat': self.pk_patient, 'type': doc_type, 'enc': encounter } where_parts = ['pk_patient = %(pat)s'] if doc_type is not None: try: int(doc_type) where_parts.append('pk_type = %(type)s') except (TypeError, ValueError): where_parts.append('pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(type)s)') if pk_types: where_parts.append('pk_type = ANY(%(pk_types)s)') args['pk_types'] = pk_types if pk_episodes: where_parts.append('pk_episode = ANY(%(epis)s)') args['epis'] = pk_episodes if encounter is not None: where_parts.append('pk_encounter = %(enc)s') if exclude_unsigned: where_parts.append('pk_doc = ANY(SELECT b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE b_vo.pk_patient = %(pat)s AND b_vo.reviewed IS TRUE)') if not order_by: order_by = 'clin_when' order_by_clause = 'ORDER BY %s' % order_by cmd = "%s\n%s" % (_SQL_get_document_fields % ' AND '.join(where_parts), order_by_clause) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ] documents = property(get_documents) #-------------------------------------------------------- def add_document(self, document_type=None, encounter=None, episode=None, link_obj=None): return create_document(link_obj = link_obj, document_type = document_type, encounter = encounter, episode = episode) #-------------------------------------------------------- def add_prescription(self, encounter=None, episode=None, link_obj=None): return self.add_document ( link_obj = link_obj, document_type = create_document_type ( document_type = DOCUMENT_TYPE_PRESCRIPTION )['pk_doc_type'], encounter = encounter, episode = episode ) #-------------------------------------------------------- def _get_all_document_org_units(self): cmd = gmOrganization._SQL_get_org_unit % ( 'pk_org_unit IN (SELECT DISTINCT ON (pk_org_unit) pk_org_unit FROM blobs.v_doc_med WHERE pk_patient = %(pat)s)' ) args = {'pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmOrganization.cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ] all_document_org_units = property(_get_all_document_org_units)
Instance variables
var all_document_org_units
-
Expand source code
def _get_all_document_org_units(self): cmd = gmOrganization._SQL_get_org_unit % ( 'pk_org_unit IN (SELECT DISTINCT ON (pk_org_unit) pk_org_unit FROM blobs.v_doc_med WHERE pk_patient = %(pat)s)' ) args = {'pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ gmOrganization.cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]
var documents : list[cDocument]
-
Return list of documents.
Expand source code
def get_documents(self, doc_type:str|int=None, pk_episodes:list[int]=None, encounter:int=None, order_by:str=None, exclude_unsigned:bool=False, pk_types:list[int]=None) -> list[cDocument]: """Return list of documents.""" args = { 'pat': self.pk_patient, 'type': doc_type, 'enc': encounter } where_parts = ['pk_patient = %(pat)s'] if doc_type is not None: try: int(doc_type) where_parts.append('pk_type = %(type)s') except (TypeError, ValueError): where_parts.append('pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(type)s)') if pk_types: where_parts.append('pk_type = ANY(%(pk_types)s)') args['pk_types'] = pk_types if pk_episodes: where_parts.append('pk_episode = ANY(%(epis)s)') args['epis'] = pk_episodes if encounter is not None: where_parts.append('pk_encounter = %(enc)s') if exclude_unsigned: where_parts.append('pk_doc = ANY(SELECT b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE b_vo.pk_patient = %(pat)s AND b_vo.reviewed IS TRUE)') if not order_by: order_by = 'clin_when' order_by_clause = 'ORDER BY %s' % order_by cmd = "%s\n%s" % (_SQL_get_document_fields % ' AND '.join(where_parts), order_by_clause) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ]
var latest_mugshot
-
Expand source code
def get_latest_mugshot(self): SQL = "SELECT pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s" args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) if rows: return cDocumentPart(aPK_obj = rows[0][0]) _log.info('no mugshots available for patient [%s]' % self.pk_patient) return None
Methods
def add_document(self, document_type=None, encounter=None, episode=None, link_obj=None)
-
Expand source code
def add_document(self, document_type=None, encounter=None, episode=None, link_obj=None): return create_document(link_obj = link_obj, document_type = document_type, encounter = encounter, episode = episode)
def add_prescription(self, encounter=None, episode=None, link_obj=None)
-
Expand source code
def add_prescription(self, encounter=None, episode=None, link_obj=None): return self.add_document ( link_obj = link_obj, document_type = create_document_type ( document_type = DOCUMENT_TYPE_PRESCRIPTION )['pk_doc_type'], encounter = encounter, episode = episode )
def cleanup(self)
-
Expand source code
def cleanup(self): pass
def get_doc_list(self, doc_type=None)
-
return flat list of document IDs
Expand source code
def get_doc_list(self, doc_type=None): """return flat list of document IDs""" args = { 'ID': self.pk_patient, 'TYP': doc_type } cmd = """ SELECT vdm.pk_doc FROM blobs.v_doc_med vdm WHERE vdm.pk_patient = %%(ID)s %s order by vdm.clin_when""" if doc_type is None: cmd = cmd % '' else: try: int(doc_type) cmd = cmd % 'and vdm.pk_type = %(TYP)s' except (TypeError, ValueError): cmd = cmd % 'and vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(TYP)s)' rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) doc_ids = [] for row in rows: doc_ids.append(row[0]) return doc_ids
def get_documents(self, doc_type: str | int = None, pk_episodes: list[int] = None, encounter: int = None, order_by: str = None, exclude_unsigned: bool = False, pk_types: list[int] = None) ‑> list[cDocument]
-
Return list of documents.
Expand source code
def get_documents(self, doc_type:str|int=None, pk_episodes:list[int]=None, encounter:int=None, order_by:str=None, exclude_unsigned:bool=False, pk_types:list[int]=None) -> list[cDocument]: """Return list of documents.""" args = { 'pat': self.pk_patient, 'type': doc_type, 'enc': encounter } where_parts = ['pk_patient = %(pat)s'] if doc_type is not None: try: int(doc_type) where_parts.append('pk_type = %(type)s') except (TypeError, ValueError): where_parts.append('pk_type = (SELECT pk FROM blobs.doc_type WHERE name = %(type)s)') if pk_types: where_parts.append('pk_type = ANY(%(pk_types)s)') args['pk_types'] = pk_types if pk_episodes: where_parts.append('pk_episode = ANY(%(epis)s)') args['epis'] = pk_episodes if encounter is not None: where_parts.append('pk_encounter = %(enc)s') if exclude_unsigned: where_parts.append('pk_doc = ANY(SELECT b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE b_vo.pk_patient = %(pat)s AND b_vo.reviewed IS TRUE)') if not order_by: order_by = 'clin_when' order_by_clause = 'ORDER BY %s' % order_by cmd = "%s\n%s" % (_SQL_get_document_fields % ' AND '.join(where_parts), order_by_clause) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ]
def get_latest_freediams_prescription(self)
-
Expand source code
def get_latest_freediams_prescription(self): cmd = """ SELECT pk_doc FROM blobs.v_doc_med WHERE pk_patient = %(pat)s AND type = %(typ)s AND ext_ref = %(ref)s ORDER BY clin_when DESC LIMIT 1 """ args = { 'pat': self.pk_patient, 'typ': DOCUMENT_TYPE_PRESCRIPTION, 'ref': 'FreeDiams' } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: _log.info('no FreeDiams prescription available for patient [%s]' % self.pk_patient) return None prescription = cDocument(aPK_obj = rows[0][0]) return prescription
def get_latest_mugshot(self)
-
Expand source code
def get_latest_mugshot(self): SQL = "SELECT pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s" args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) if rows: return cDocumentPart(aPK_obj = rows[0][0]) _log.info('no mugshots available for patient [%s]' % self.pk_patient) return None
def get_mugshot_list(self, latest_only=True)
-
Expand source code
def get_mugshot_list(self, latest_only=True): if latest_only: SQL = "SELECT pk_doc, pk_obj FROM blobs.v_latest_mugshot WHERE pk_patient = %(pk_pat)s" else: SQL = """ SELECT vdm.pk_doc as pk_doc, dobj.pk as pk_obj FROM blobs.v_doc_med vdm blobs.doc_obj dobj WHERE vdm.pk_type = (SELECT pk FROM blobs.doc_type WHERE name = 'patient photograph') and vdm.pk_patient = %(pk_pat)s and dobj.fk_doc = vdm.pk_doc """ args = {'pk_pat': self.pk_patient} rows = gmPG2.run_ro_queries(queries = [{'SQL': SQL, 'args': args}]) return rows
def get_unsigned_documents(self)
-
Expand source code
def get_unsigned_documents(self): args = {'pat': self.pk_patient} cmd = _SQL_get_document_fields % """ pk_doc = ANY ( SELECT DISTINCT ON (b_vo.pk_doc) b_vo.pk_doc FROM blobs.v_obj4doc_no_data b_vo WHERE pk_patient = %(pat)s AND reviewed IS FALSE ) ORDER BY clin_when DESC""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) return [ cDocument(row = {'pk_field': 'pk_doc', 'data': r}) for r in rows ]
def get_visual_progress_notes(self, episodes=None, encounter=None)
-
Expand source code
def get_visual_progress_notes(self, episodes=None, encounter=None): return self.get_documents ( doc_type = DOCUMENT_TYPE_VISUAL_PROGRESS_NOTE, pk_episodes = episodes, encounter = encounter )
class cDocumentPart (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents one part of a medical document.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cDocumentPart(gmBusinessDBObject.cBusinessDBObject): """Represents one part of a medical document.""" _cmd_fetch_payload = _SQL_get_document_part_fields % "pk_obj = %s" _cmds_store_payload = [ """UPDATE blobs.doc_obj SET seq_idx = %(seq_idx)s, comment = gm.nullify_empty_string(%(obj_comment)s), filename = gm.nullify_empty_string(%(filename)s), fk_intended_reviewer = %(pk_intended_reviewer)s WHERE pk = %(pk_obj)s AND xmin = %(xmin_doc_obj)s RETURNING xmin AS xmin_doc_obj""" ] _updatable_fields = [ 'seq_idx', 'obj_comment', 'pk_intended_reviewer', 'filename' ] #-------------------------------------------------------- # retrieve data #-------------------------------------------------------- def save_to_file(self, aChunkSize=0, filename=None, target_mime=None, target_extension=None, ignore_conversion_problems=False, directory=None, conn=None): if filename is None: filename = self.get_useful_filename(make_unique = True, directory = directory) dl_fname = self.__download_to_file(filename = filename) if dl_fname is None: return None if target_mime is None: return gmMimeLib.adjust_extension_by_mimetype(dl_fname) converted_fname = self.__convert_file_to ( filename = dl_fname, target_mime = target_mime, target_extension = target_extension ) if converted_fname is None: if ignore_conversion_problems: return dl_fname return None gmTools.remove_file(dl_fname) return converted_fname #-------------------------------------------------------- def get_reviews(self): SQL = """ SELECT reviewer, reviewed_when, is_technically_abnormal, clinically_relevant, is_review_by_responsible_reviewer, is_your_review, coalesce(comment, '') FROM blobs.v_reviewed_doc_objects WHERE pk_doc_obj = %(pk_doc_obj)s ORDER BY is_your_review desc, is_review_by_responsible_reviewer desc, reviewed_when desc """ args = {'pk_doc_obj': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return rows #-------------------------------------------------------- def __get_containing_document(self): return cDocument(aPK_obj = self._payload['pk_doc']) containing_document = property(__get_containing_document) #-------------------------------------------------------- # store data #-------------------------------------------------------- def update_data_from_incoming(self, conn=None, pk_incoming:int=None): SQL = """ UPDATE blobs.doc_obj SET data = (SELECT data FROM clin.incoming_data WHERE pk = %(pk_incoming)s) WHERE pk = %(pk_part)s """ args = {'pk_part': self.pk_obj, 'pk_incoming': pk_incoming} gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = False) # must update XMIN now ... self.refetch_payload(link_obj = conn) return True #-------------------------------------------------------- def update_data_from_file(self, fname=None, link_obj=None): # sanity check if not (os.access(fname, os.R_OK) and os.path.isfile(fname)): _log.error('[%s] is not a readable file' % fname) return False cmd = "UPDATE blobs.doc_obj SET data = %(data)s::BYTEA WHERE pk = %(pk)s RETURNING md5(data) AS md5" args = {'pk': self.pk_obj} md5 = gmTools.file2md5(filename = fname, return_hex = True) if not gmPG2.file2bytea(conn = link_obj, query = cmd, filename = fname, args = args, file_md5 = md5): return False # must update XMIN now ... self.refetch_payload(link_obj = link_obj) return True #-------------------------------------------------------- def set_reviewed(self, technically_abnormal:bool=None, clinically_relevant:bool=None): # row already there ? SQL = """ SELECT pk FROM blobs.reviewed_doc_objs WHERE fk_reviewed_row = %(pk_obj)s and fk_reviewer = (SELECT pk FROM dem.staff WHERE db_user = current_user) """ args = {'pk_obj': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) # INSERT needed if not rows: cols = [ "fk_reviewer", "fk_reviewed_row", "is_technically_abnormal", "clinically_relevant" ] vals = [ '%(fk_row)s', '%(abnormal)s', '%(relevant)s' ] args = { 'fk_row': self.pk_obj, 'abnormal': technically_abnormal, 'relevant': clinically_relevant } cmd = """ insert into blobs.reviewed_doc_objs ( %s ) values ( (SELECT pk FROM dem.staff WHERE db_user=current_user), %s )""" % (', '.join(cols), ', '.join(vals)) # UPDATE needed elif len(rows) == 1: pk_review = rows[0][0] args = { 'abnormal': technically_abnormal, 'relevant': clinically_relevant, 'pk_review': pk_review } cmd = """ UPDATE blobs.reviewed_doc_objs SET is_technically_abnormal = %(abnormal)s, clinically_relevant = %(relevant)s WHERE pk = %(pk_review)s """ else: raise AssertionError('more than one review by one reviewer for one particular row') rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True #-------------------------------------------------------- def set_as_active_photograph(self): if self._payload['type'] != 'patient photograph': return False # set seq_idx to current max + 1 cmd = 'SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s' rows = gmPG2.run_ro_queries ( queries = [{ 'sql': cmd, 'args': {'doc_id': self._payload['pk_doc']} }] ) self._payload['seq_idx'] = rows[0][0] self._is_modified = True return self.save_payload() #-------------------------------------------------------- def reattach(self, pk_doc=None): if pk_doc == self._payload['pk_doc']: return True cmd = """ UPDATE blobs.doc_obj SET fk_doc = %(pk_doc_target)s, -- coalesce needed for no-parts target docs seq_idx = (SELECT coalesce(max(seq_idx) + 1, 1) FROM blobs.doc_obj WHERE fk_doc = %(pk_doc_target)s) WHERE EXISTS(SELECT 1 FROM blobs.doc_med WHERE pk = %(pk_doc_target)s) AND pk = %(pk_obj)s AND xmin = %(xmin_doc_obj)s RETURNING fk_doc """ args = { 'pk_doc_target': pk_doc, 'pk_obj': self.pk_obj, 'xmin_doc_obj': self._payload['xmin_doc_obj'] } rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) if len(rows) == 0: return False # The following should never hold true because the target # fk_doc is returned from the query and it is checked for # equality before the UPDATE already. Assuming the update # failed to update a row because the target fk_doc did # not exist we would not get *any* rows in return - for # which condition we also already checked if rows[0]['fk_doc'] == self._payload['pk_doc']: return False self.refetch_payload() return True #-------------------------------------------------------- def display_via_mime(self, chunksize=0, block=None): fname = self.save_to_file(aChunkSize = chunksize) if fname is None: return False, '' success, msg = gmMimeLib.call_viewer_on_file(fname, block = block) if not success: return False, msg return True, '' #-------------------------------------------------------- def format_single_line(self): f_ext = '' if self._payload['filename'] is not None: f_ext = os.path.splitext(self._payload['filename'])[1].strip('.').strip() if f_ext != '': f_ext = ' .' + f_ext.upper() txt = _('part %s, %s%s%s of document %s from %s%s') % ( self._payload['seq_idx'], gmTools.size2str(self._payload['size']), f_ext, gmTools.coalesce(self._payload['obj_comment'], '', ' ("%s")'), self._payload['l10n_type'], self._payload['date_generated'].strftime('%Y %b %d'), gmTools.coalesce(self._payload['doc_comment'], '', ' ("%s")') ) return txt #-------------------------------------------------------- def format(self, single_line=False): if single_line: return self.format_single_line() txt = _('%s document part [#%s]\n') % ( gmTools.bool2str ( boolean = self._payload['reviewed'], true_str = _('Reviewed'), false_str = _('Unreviewed') ), self._payload['pk_obj'] ) f_ext = '' if self._payload['filename'] is not None: f_ext = os.path.splitext(self._payload['filename'])[1].strip('.').strip() if f_ext != '': f_ext = '.' + f_ext.upper() + ' ' txt += _(' Part %s: %s %s(%s Bytes)\n') % ( self._payload['seq_idx'], gmTools.size2str(self._payload['size']), f_ext, self._payload['size'] ) if self._payload['filename'] is not None: path, fname = os.path.split(self._payload['filename']) if not path.endswith(os.path.sep): if path != '': path += os.path.sep if path != '': path = ' (%s)' % path txt += _(' Filename: %s%s\n') % (fname, path) if self._payload['obj_comment'] is not None: txt += '\n%s\n' % self._payload['obj_comment'] return txt #-------------------------------------------------------- def format_metainfo(self, callback=None): """If <callback> is not None it will receive a tuple (status, description, pk_obj).""" if callback is None: return self.__run_metainfo_formatter() gmWorkerThread.execute_in_worker_thread ( payload_function = self.__run_metainfo_formatter, completion_callback = callback, worker_name = 'doc_part-metainfo_formatter-' ) #-------------------------------------------------------- def get_useful_filename(self, patient=None, make_unique=False, directory=None, include_gnumed_tag=True, date_before_type=False, name_first=True): patient_part = '' if patient: if name_first: patient_part = '%s-' % patient.subdir_name else: patient_part = '-%s' % patient.subdir_name # preserve original filename extension if available suffix = '' if self._payload['filename'] is not None: tmp, suffix = os.path.splitext ( gmTools.fname_sanitize(self._payload['filename']).casefold() ) if not suffix: suffix = '.dat' fname_template = '%%s-part_%s' % self._payload['seq_idx'] if include_gnumed_tag: fname_template += '-gm_doc' if date_before_type: date_type_part = '%s-%s' % ( self._payload['date_generated'].strftime('%Y-%m-%d'), self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), ) else: date_type_part = '%s-%s' % ( self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), self._payload['date_generated'].strftime('%Y-%m-%d') ) if name_first: date_type_name_part = patient_part + date_type_part else: date_type_name_part = date_type_part + patient_part fname = fname_template % date_type_name_part if make_unique: fname = gmTools.get_unique_filename ( prefix = '%s-' % gmTools.fname_sanitize(fname), suffix = suffix, tmp_dir = directory ) else: fname = gmTools.fname_sanitize(os.path.join(gmTools.coalesce(directory, gmTools.gmPaths().tmp_dir), fname + suffix)) return fname useful_filename = property(get_useful_filename) #-------------------------------------------------------- # internal helpers #-------------------------------------------------------- def __download_to_file(self, filename=None, aChunkSize=0, conn=None): if self._payload['size'] == 0: _log.debug('part size 0, nothing to download') return None if filename is None: filename = gmTools.get_unique_filename() success = gmPG2.bytea2file ( data_query = { 'sql': 'SELECT substring(data FROM %(start)s for %(size)s) FROM blobs.doc_obj WHERE pk=%(pk)s', 'args': {'pk': self.pk_obj} }, filename = filename, chunk_size = aChunkSize, data_size = self._payload['size'], conn = conn ) if not success: return None return filename #-------------------------------------------------------- def __convert_file_to(self, filename=None, target_mime=None, target_extension=None): assert (filename is not None), '<filename> must not be None' assert (target_mime is not None), '<target_mime> must not be None' if target_extension is None: target_extension = gmMimeLib.guess_ext_by_mimetype(mimetype = target_mime) src_path, src_name = os.path.split(filename) src_stem, src_ext = os.path.splitext(src_name) conversion_tmp_name = gmTools.get_unique_filename ( prefix = '%s.conv2.' % src_stem, suffix = target_extension ) _log.debug('attempting conversion: [%s] -> [<%s>:%s]', filename, target_mime, conversion_tmp_name) converted_fname = gmMimeLib.convert_file ( filename = filename, target_mime = target_mime, target_filename = conversion_tmp_name ) if converted_fname is None: _log.warning('conversion failed') return None tmp_path, conv_name = os.path.split(converted_fname) conv_name_in_src_path = os.path.join(src_path, conv_name) try: os.replace(converted_fname, conv_name_in_src_path) except OSError: _log.exception('cannot os.replace(%s, %s)', converted_fname, conv_name_in_src_path) return None return gmMimeLib.adjust_extension_by_mimetype(conv_name_in_src_path) #-------------------------------------------------------- def __run_metainfo_formatter(self): filename = self.__download_to_file() if filename is None: _log.error('problem downloading part') return (False, _('problem downloading document part')) status, desc, cookie = gmMimeLib.describe_file(filename) return (status, desc, self.pk_obj)
Ancestors
Instance variables
var containing_document
-
Expand source code
def __get_containing_document(self): return cDocument(aPK_obj = self._payload['pk_doc'])
var useful_filename
-
Expand source code
def get_useful_filename(self, patient=None, make_unique=False, directory=None, include_gnumed_tag=True, date_before_type=False, name_first=True): patient_part = '' if patient: if name_first: patient_part = '%s-' % patient.subdir_name else: patient_part = '-%s' % patient.subdir_name # preserve original filename extension if available suffix = '' if self._payload['filename'] is not None: tmp, suffix = os.path.splitext ( gmTools.fname_sanitize(self._payload['filename']).casefold() ) if not suffix: suffix = '.dat' fname_template = '%%s-part_%s' % self._payload['seq_idx'] if include_gnumed_tag: fname_template += '-gm_doc' if date_before_type: date_type_part = '%s-%s' % ( self._payload['date_generated'].strftime('%Y-%m-%d'), self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), ) else: date_type_part = '%s-%s' % ( self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), self._payload['date_generated'].strftime('%Y-%m-%d') ) if name_first: date_type_name_part = patient_part + date_type_part else: date_type_name_part = date_type_part + patient_part fname = fname_template % date_type_name_part if make_unique: fname = gmTools.get_unique_filename ( prefix = '%s-' % gmTools.fname_sanitize(fname), suffix = suffix, tmp_dir = directory ) else: fname = gmTools.fname_sanitize(os.path.join(gmTools.coalesce(directory, gmTools.gmPaths().tmp_dir), fname + suffix)) return fname
Methods
def display_via_mime(self, chunksize=0, block=None)
-
Expand source code
def display_via_mime(self, chunksize=0, block=None): fname = self.save_to_file(aChunkSize = chunksize) if fname is None: return False, '' success, msg = gmMimeLib.call_viewer_on_file(fname, block = block) if not success: return False, msg return True, ''
def format_metainfo(self, callback=None)
-
If
is not None it will receive a tuple (status, description, pk_obj). Expand source code
def format_metainfo(self, callback=None): """If <callback> is not None it will receive a tuple (status, description, pk_obj).""" if callback is None: return self.__run_metainfo_formatter() gmWorkerThread.execute_in_worker_thread ( payload_function = self.__run_metainfo_formatter, completion_callback = callback, worker_name = 'doc_part-metainfo_formatter-' )
def format_single_line(self)
-
Expand source code
def format_single_line(self): f_ext = '' if self._payload['filename'] is not None: f_ext = os.path.splitext(self._payload['filename'])[1].strip('.').strip() if f_ext != '': f_ext = ' .' + f_ext.upper() txt = _('part %s, %s%s%s of document %s from %s%s') % ( self._payload['seq_idx'], gmTools.size2str(self._payload['size']), f_ext, gmTools.coalesce(self._payload['obj_comment'], '', ' ("%s")'), self._payload['l10n_type'], self._payload['date_generated'].strftime('%Y %b %d'), gmTools.coalesce(self._payload['doc_comment'], '', ' ("%s")') ) return txt
def get_reviews(self)
-
Expand source code
def get_reviews(self): SQL = """ SELECT reviewer, reviewed_when, is_technically_abnormal, clinically_relevant, is_review_by_responsible_reviewer, is_your_review, coalesce(comment, '') FROM blobs.v_reviewed_doc_objects WHERE pk_doc_obj = %(pk_doc_obj)s ORDER BY is_your_review desc, is_review_by_responsible_reviewer desc, reviewed_when desc """ args = {'pk_doc_obj': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) return rows
def get_useful_filename(self, patient=None, make_unique=False, directory=None, include_gnumed_tag=True, date_before_type=False, name_first=True)
-
Expand source code
def get_useful_filename(self, patient=None, make_unique=False, directory=None, include_gnumed_tag=True, date_before_type=False, name_first=True): patient_part = '' if patient: if name_first: patient_part = '%s-' % patient.subdir_name else: patient_part = '-%s' % patient.subdir_name # preserve original filename extension if available suffix = '' if self._payload['filename'] is not None: tmp, suffix = os.path.splitext ( gmTools.fname_sanitize(self._payload['filename']).casefold() ) if not suffix: suffix = '.dat' fname_template = '%%s-part_%s' % self._payload['seq_idx'] if include_gnumed_tag: fname_template += '-gm_doc' if date_before_type: date_type_part = '%s-%s' % ( self._payload['date_generated'].strftime('%Y-%m-%d'), self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), ) else: date_type_part = '%s-%s' % ( self._payload['l10n_type'].replace(' ', '_').replace('-', '_'), self._payload['date_generated'].strftime('%Y-%m-%d') ) if name_first: date_type_name_part = patient_part + date_type_part else: date_type_name_part = date_type_part + patient_part fname = fname_template % date_type_name_part if make_unique: fname = gmTools.get_unique_filename ( prefix = '%s-' % gmTools.fname_sanitize(fname), suffix = suffix, tmp_dir = directory ) else: fname = gmTools.fname_sanitize(os.path.join(gmTools.coalesce(directory, gmTools.gmPaths().tmp_dir), fname + suffix)) return fname
def reattach(self, pk_doc=None)
-
Expand source code
def reattach(self, pk_doc=None): if pk_doc == self._payload['pk_doc']: return True cmd = """ UPDATE blobs.doc_obj SET fk_doc = %(pk_doc_target)s, -- coalesce needed for no-parts target docs seq_idx = (SELECT coalesce(max(seq_idx) + 1, 1) FROM blobs.doc_obj WHERE fk_doc = %(pk_doc_target)s) WHERE EXISTS(SELECT 1 FROM blobs.doc_med WHERE pk = %(pk_doc_target)s) AND pk = %(pk_obj)s AND xmin = %(xmin_doc_obj)s RETURNING fk_doc """ args = { 'pk_doc_target': pk_doc, 'pk_obj': self.pk_obj, 'xmin_doc_obj': self._payload['xmin_doc_obj'] } rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) if len(rows) == 0: return False # The following should never hold true because the target # fk_doc is returned from the query and it is checked for # equality before the UPDATE already. Assuming the update # failed to update a row because the target fk_doc did # not exist we would not get *any* rows in return - for # which condition we also already checked if rows[0]['fk_doc'] == self._payload['pk_doc']: return False self.refetch_payload() return True
def save_to_file(self, aChunkSize=0, filename=None, target_mime=None, target_extension=None, ignore_conversion_problems=False, directory=None, conn=None)
-
Expand source code
def save_to_file(self, aChunkSize=0, filename=None, target_mime=None, target_extension=None, ignore_conversion_problems=False, directory=None, conn=None): if filename is None: filename = self.get_useful_filename(make_unique = True, directory = directory) dl_fname = self.__download_to_file(filename = filename) if dl_fname is None: return None if target_mime is None: return gmMimeLib.adjust_extension_by_mimetype(dl_fname) converted_fname = self.__convert_file_to ( filename = dl_fname, target_mime = target_mime, target_extension = target_extension ) if converted_fname is None: if ignore_conversion_problems: return dl_fname return None gmTools.remove_file(dl_fname) return converted_fname
def set_as_active_photograph(self)
-
Expand source code
def set_as_active_photograph(self): if self._payload['type'] != 'patient photograph': return False # set seq_idx to current max + 1 cmd = 'SELECT coalesce(max(seq_idx)+1, 1) FROM blobs.doc_obj WHERE fk_doc = %(doc_id)s' rows = gmPG2.run_ro_queries ( queries = [{ 'sql': cmd, 'args': {'doc_id': self._payload['pk_doc']} }] ) self._payload['seq_idx'] = rows[0][0] self._is_modified = True return self.save_payload()
def set_reviewed(self, technically_abnormal: bool = None, clinically_relevant: bool = None)
-
Expand source code
def set_reviewed(self, technically_abnormal:bool=None, clinically_relevant:bool=None): # row already there ? SQL = """ SELECT pk FROM blobs.reviewed_doc_objs WHERE fk_reviewed_row = %(pk_obj)s and fk_reviewer = (SELECT pk FROM dem.staff WHERE db_user = current_user) """ args = {'pk_obj': self.pk_obj} rows = gmPG2.run_ro_queries(queries = [{'sql': SQL, 'args': args}]) # INSERT needed if not rows: cols = [ "fk_reviewer", "fk_reviewed_row", "is_technically_abnormal", "clinically_relevant" ] vals = [ '%(fk_row)s', '%(abnormal)s', '%(relevant)s' ] args = { 'fk_row': self.pk_obj, 'abnormal': technically_abnormal, 'relevant': clinically_relevant } cmd = """ insert into blobs.reviewed_doc_objs ( %s ) values ( (SELECT pk FROM dem.staff WHERE db_user=current_user), %s )""" % (', '.join(cols), ', '.join(vals)) # UPDATE needed elif len(rows) == 1: pk_review = rows[0][0] args = { 'abnormal': technically_abnormal, 'relevant': clinically_relevant, 'pk_review': pk_review } cmd = """ UPDATE blobs.reviewed_doc_objs SET is_technically_abnormal = %(abnormal)s, clinically_relevant = %(relevant)s WHERE pk = %(pk_review)s """ else: raise AssertionError('more than one review by one reviewer for one particular row') rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) return True
def update_data_from_file(self, fname=None, link_obj=None)
-
Expand source code
def update_data_from_file(self, fname=None, link_obj=None): # sanity check if not (os.access(fname, os.R_OK) and os.path.isfile(fname)): _log.error('[%s] is not a readable file' % fname) return False cmd = "UPDATE blobs.doc_obj SET data = %(data)s::BYTEA WHERE pk = %(pk)s RETURNING md5(data) AS md5" args = {'pk': self.pk_obj} md5 = gmTools.file2md5(filename = fname, return_hex = True) if not gmPG2.file2bytea(conn = link_obj, query = cmd, filename = fname, args = args, file_md5 = md5): return False # must update XMIN now ... self.refetch_payload(link_obj = link_obj) return True
def update_data_from_incoming(self, conn=None, pk_incoming: int = None)
-
Expand source code
def update_data_from_incoming(self, conn=None, pk_incoming:int=None): SQL = """ UPDATE blobs.doc_obj SET data = (SELECT data FROM clin.incoming_data WHERE pk = %(pk_incoming)s) WHERE pk = %(pk_part)s """ args = {'pk_part': self.pk_obj, 'pk_incoming': pk_incoming} gmPG2.run_rw_queries(link_obj = conn, queries = [{'sql': SQL, 'args': args}], return_data = False) # must update XMIN now ... self.refetch_payload(link_obj = conn) return True
Inherited members
class cDocumentType (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents a document type.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cDocumentType(gmBusinessDBObject.cBusinessDBObject): """Represents a document type.""" _cmd_fetch_payload = """SELECT * FROM blobs.v_doc_type WHERE pk_doc_type=%s""" _cmds_store_payload = [ """update blobs.doc_type set name = %(type)s WHERE pk=%(pk_obj)s and xmin=%(xmin_doc_type)s""", """SELECT xmin_doc_type FROM blobs.v_doc_type WHERE pk_doc_type = %(pk_obj)s""" ] _updatable_fields = ['type'] #-------------------------------------------------------- def set_translation(self, translation=None): if translation.strip() == '': return False if translation.strip() == self._payload['l10n_type'].strip(): return True args = { 'orig': self._payload['type'], 'tx': translation } queries = [{ 'sql': 'SELECT i18n.i18n(%(orig)s)', 'args': args }, { 'sql': 'SELECT i18n.upd_tx((SELECT i18n.get_curr_lang()), %(orig)s, %(tx)s)', 'args': args }] rows = gmPG2.run_rw_queries(queries = queries, return_data = True) if not rows[0][0]: _log.error('cannot set translation to [%s]' % translation) return False return self.refetch_payload()
Ancestors
Methods
def set_translation(self, translation=None)
-
Expand source code
def set_translation(self, translation=None): if translation.strip() == '': return False if translation.strip() == self._payload['l10n_type'].strip(): return True args = { 'orig': self._payload['type'], 'tx': translation } queries = [{ 'sql': 'SELECT i18n.i18n(%(orig)s)', 'args': args }, { 'sql': 'SELECT i18n.upd_tx((SELECT i18n.get_curr_lang()), %(orig)s, %(tx)s)', 'args': args }] rows = gmPG2.run_rw_queries(queries = queries, return_data = True) if not rows[0][0]: _log.error('cannot set translation to [%s]' % translation) return False return self.refetch_payload()
Inherited members