Module Gnumed.business.gmKeywordExpansion
GNUmed keyword snippet expansions
Copyright: authors
Functions
def create_keyword_expansion(keyword=None, text=None, data_file=None, public=True)
-
Expand source code
def create_keyword_expansion(keyword=None, text=None, data_file=None, public=True): if text is not None: if text.strip() == '': text = None if None not in [text, data_file]: raise ValueError('either <text> or <data_file> must be non-NULL') # already exists ? cmd = "SELECT 1 FROM ref.v_your_keyword_expansions WHERE public_expansion IS %(public)s AND keyword = %(kwd)s" args = {'kwd': keyword, 'public': public} rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) != 0: # can't create duplicate return False if data_file is not None: text = 'fake data' args = {'kwd': keyword, 'txt': text} if public: cmd = """ INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff) VALUES ( gm.nullify_empty_string(%(kwd)s), gm.nullify_empty_string(%(txt)s), null ) RETURNING pk """ else: cmd = """ INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff) VALUES ( gm.nullify_empty_string(%(kwd)s), gm.nullify_empty_string(%(txt)s), (SELECT pk FROM dem.staff WHERE db_user = current_user) ) RETURNING pk """ rows = gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}], return_data = True) expansion = cKeywordExpansion(aPK_obj = rows[0]['pk']) if data_file is not None: expansion.update_data_from_file(filename = data_file) global __textual_expansion_keywords __textual_expansion_keywords = None global __keyword_expansions __keyword_expansions = None return expansion
def delete_keyword_expansion(pk=None)
-
Expand source code
def delete_keyword_expansion(pk=None): args = {'pk': pk} cmd = "DELETE FROM ref.keyword_expansion WHERE pk = %(pk)s" gmPG2.run_rw_queries(queries = [{'sql': cmd, 'args': args}]) global __textual_expansion_keywords __textual_expansion_keywords = None global __keyword_expansions __keyword_expansions = None return True
def expand_keyword(keyword=None)
-
Expand source code
def expand_keyword(keyword = None): # Easter Egg ;-) if keyword == '$$steffi': return 'Hai, play ! Versucht das ! (Keks dazu ?) :-)' cmd = """SELECT expansion FROM ref.v_your_keyword_expansions WHERE keyword = %(kwd)s""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': {'kwd': keyword}}]) if len(rows) == 0: return None return rows[0]['expansion']
def get_expansion(keyword=None, textual_only=True, binary_only=False)
-
Expand source code
def get_expansion(keyword=None, textual_only=True, binary_only=False): if False not in [textual_only, binary_only]: raise ValueError('one of <textual_only> and <binary_only> must be False') where_parts = ['keyword = %(kwd)s'] args = {'kwd': keyword} if textual_only: where_parts.append('is_textual IS TRUE') cmd = _SQL_get_keyword_expansions % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) if len(rows) == 0: return None return cKeywordExpansion(row = {'data': rows[0], 'pk_field': 'pk_expansion'})
def get_keyword_expansions(order_by=None, force_reload=False, return_pks=False)
-
Expand source code
def get_keyword_expansions(order_by=None, force_reload=False, return_pks=False): global __keyword_expansions if not force_reload: if __keyword_expansions is not None: return __keyword_expansions if order_by is None: order_by = 'true' else: order_by = 'true ORDER BY %s' % order_by cmd = _SQL_get_keyword_expansions % order_by rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) if return_pks: return [ r['pk_expansion'] for r in rows ] __keyword_expansions = [ cKeywordExpansion(row = {'data': r, 'pk_field': 'pk_expansion'}) for r in rows ] return __keyword_expansions
def get_matching_textual_keywords(fragment=None)
-
Expand source code
def get_matching_textual_keywords(fragment=None): if fragment is None: return [] return [ kwd['keyword'] for kwd in get_textual_expansion_keywords() if kwd['keyword'].startswith(fragment) ]
def get_textual_expansion_keywords()
-
Expand source code
def get_textual_expansion_keywords(): global __textual_expansion_keywords if __textual_expansion_keywords is not None: return __textual_expansion_keywords cmd = """SELECT keyword, public_expansion, private_expansion, owner FROM ref.v_keyword_expansions WHERE is_textual IS TRUE""" rows = gmPG2.run_ro_queries(queries = [{'sql': cmd}]) __textual_expansion_keywords = rows _log.info('retrieved %s textual expansion keywords', len(__textual_expansion_keywords)) return __textual_expansion_keywords
Classes
class cKeywordExpansion (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Expand source code
class cKeywordExpansion(gmBusinessDBObject.cBusinessDBObject): """Keyword indexed text snippets or chunks of data. Used as text macros or to put into documents.""" _cmd_fetch_payload = _SQL_get_keyword_expansions % "pk_expansion = %s" _cmds_store_payload = [ """ UPDATE ref.keyword_expansion SET keyword = gm.nullify_empty_string(%(keyword)s), textual_data = CASE WHEN gm.nullify_empty_string(%(expansion)s) IS NULL THEN CASE WHEN binary_data IS NULL THEN '---fake_data---' ELSE NULL END ELSE gm.nullify_empty_string(%(expansion)s) END, binary_data = CASE WHEN gm.nullify_empty_string(%(expansion)s) IS NULL THEN binary_data ELSE NULL END, encrypted = %(is_encrypted)s WHERE pk = %(pk_expansion)s AND xmin = %(xmin_expansion)s RETURNING xmin as xmin_expansion """ ] _updatable_fields = [ 'keyword', 'expansion', 'is_encrypted' ] #-------------------------------------------------------- def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False): if self._payload['is_textual']: return None if self._payload['data_size'] == 0: return None exported_fname = gmTools.get_unique_filename(prefix = 'gm-data_snippet-') success = gmPG2.bytea2file ( data_query = { 'sql': 'SELECT substring(binary_data from %(start)s for %(size)s) FROM ref.keyword_expansion WHERE pk = %(pk)s', 'args': {'pk': self.pk_obj} }, filename = exported_fname, chunk_size = aChunkSize, data_size = self._payload['data_size'] ) if not success: return None if target_mime is None: return exported_fname converted_fname = gmMimeLib.convert_file(filename = exported_fname, target_mime = target_mime) if converted_fname is not None: return converted_fname _log.warning('conversion failed') if ignore_conversion_problems: _log.warning('programmed to ignore conversion problems, hoping receiver can handle [%s]', exported_fname) return exported_fname return None #-------------------------------------------------------- def update_data_from_file(self, filename=None): if not (os.access(filename, os.R_OK) and os.path.isfile(filename)): _log.error('[%s] is not a readable file' % filename) return False gmPG2.file2bytea ( query = "UPDATE ref.keyword_expansion SET binary_data = %(data)s::bytea, textual_data = NULL WHERE pk = %(pk)s", filename = filename, args = {'pk': self.pk_obj} ) # must update XMIN now ... self.refetch_payload() global __textual_expansion_keywords __textual_expansion_keywords = None global __keyword_expansions __keyword_expansions = None return True #-------------------------------------------------------- def format(self): txt = '%s #%s\n' % ( gmTools.bool2subst ( self._payload['is_textual'], _('Textual keyword expansion'), _('Binary keyword expansion') ), self._payload['pk_expansion'] ) txt += ' %s%s\n' % ( gmTools.bool2subst ( self._payload['private_expansion'], _('private'), _('public') ), gmTools.bool2subst ( self._payload['is_encrypted'], ', %s' % _('encrypted'), '' ) ) txt += _(' Keyword: %s\n') % self._payload['keyword'] txt += _(' Owner: %s\n') % self._payload['owner'] if self._payload['is_textual']: txt += '\n%s' % self._payload['expansion'] else: txt += ' Data: %s (%s Bytes)' % (gmTools.size2str(self._payload['data_size']), self._payload['data_size']) return txt
Keyword indexed text snippets or chunks of data. Used as text macros or to put into documents.
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'sql': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Ancestors
Methods
def save_to_file(self,
aChunkSize=0,
target_mime=None,
target_extension=None,
ignore_conversion_problems=False)-
Expand source code
def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False): if self._payload['is_textual']: return None if self._payload['data_size'] == 0: return None exported_fname = gmTools.get_unique_filename(prefix = 'gm-data_snippet-') success = gmPG2.bytea2file ( data_query = { 'sql': 'SELECT substring(binary_data from %(start)s for %(size)s) FROM ref.keyword_expansion WHERE pk = %(pk)s', 'args': {'pk': self.pk_obj} }, filename = exported_fname, chunk_size = aChunkSize, data_size = self._payload['data_size'] ) if not success: return None if target_mime is None: return exported_fname converted_fname = gmMimeLib.convert_file(filename = exported_fname, target_mime = target_mime) if converted_fname is not None: return converted_fname _log.warning('conversion failed') if ignore_conversion_problems: _log.warning('programmed to ignore conversion problems, hoping receiver can handle [%s]', exported_fname) return exported_fname return None
def update_data_from_file(self, filename=None)
-
Expand source code
def update_data_from_file(self, filename=None): if not (os.access(filename, os.R_OK) and os.path.isfile(filename)): _log.error('[%s] is not a readable file' % filename) return False gmPG2.file2bytea ( query = "UPDATE ref.keyword_expansion SET binary_data = %(data)s::bytea, textual_data = NULL WHERE pk = %(pk)s", filename = filename, args = {'pk': self.pk_obj} ) # must update XMIN now ... self.refetch_payload() global __textual_expansion_keywords __textual_expansion_keywords = None global __keyword_expansions __keyword_expansions = None return True
Inherited members