Module Gnumed.business.gmStaff

GNUmed staff objects.

Expand source code
# -*- coding: utf-8 -*-
"""GNUmed staff objects."""
#============================================================
__author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = "GPL"

# std lib
import sys
import logging

# GNUmed
if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmNull
from Gnumed.pycommon import gmBorg
from Gnumed.pycommon import gmLog2
from Gnumed.pycommon import gmCfgDB
from Gnumed.pycommon import gmTools


_log = logging.getLogger('gm.staff')

_map_gm_role2pg_group = {
        'public access': 'gm-public',
        'non-clinical access': 'gm-staff',
        'full clinical access': 'gm-doctors'
}

#============================================================
_SQL_get_staff_fields = 'SELECT *, _(role) AS l10n_role FROM dem.v_staff WHERE %s'

class cStaff(gmBusinessDBObject.cBusinessDBObject):
        _cmd_fetch_payload = _SQL_get_staff_fields % "pk_staff = %s"
        _cmds_store_payload = [
                """UPDATE dem.staff SET
                                short_alias = %(short_alias)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                is_active = %(is_active)s,
                                db_user = %(db_user)s,
                                public_key = %(public_key)s
                        WHERE
                                pk = %(pk_staff)s
                                        AND
                                xmin = %(xmin_staff)s
                        RETURNING
                                xmin AS xmin_staff"""
        ]
        _updatable_fields = ['short_alias', 'comment', 'is_active', 'db_user', 'public_key']

        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, row=None):
                # by default get staff corresponding to CURRENT_USER
                if (aPK_obj is None) and (row is None):
                        cmd = _SQL_get_staff_fields % "db_user = CURRENT_USER"
                        try:
                                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx=True)
                        except Exception:
                                _log.exception('cannot instantiate staff instance')
                                gmLog2.log_stack_trace()
                                raise ValueError('cannot instantiate staff instance for database account CURRENT_USER')
                        if len(rows) == 0:
                                raise ValueError('no staff record for database account CURRENT_USER')
                        row = {
                                'pk_field': 'pk_staff',
                                'idx': idx,
                                'data': rows[0]
                        }
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, row = row)
                else:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj = aPK_obj, row = row)

                # are we SELF ?
                self.__is_current_user = (gmPG2.get_current_user() == self._payload[self._idx['db_user']])

                self.__inbox = None

        #--------------------------------------------------------
        def __setitem__(self, attribute, value):
                if attribute == 'db_user':
                        if self.__is_current_user:
                                _log.debug('will not modify database account association of CURRENT_USER staff member')
                                return

                gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value)

        #--------------------------------------------------------
        def _get_db_lang(self):
                rows, idx = gmPG2.run_ro_queries (
                        queries = [{
                                'cmd': 'select i18n.get_curr_lang(%(usr)s)',
                                'args': {'usr': self._payload[self._idx['db_user']]}
                        }]
                )
                return rows[0][0]

        def _set_db_lang(self, language):
                if not gmPG2.set_user_language(language = language):
                        raise ValueError (
                                'Cannot set database language to [%s] for user [%s].' % (language, self._payload[self._idx['db_user']])
                        )
                return

        database_language = property(_get_db_lang, _set_db_lang)

        #--------------------------------------------------------
        def _get_inbox(self):
                if self.__inbox is None:
                        from Gnumed.business import gmProviderInbox
                        self.__inbox = gmProviderInbox.cProviderInbox(provider_id = self._payload[self._idx['pk_staff']])
                return self.__inbox

        inbox = property(_get_inbox)

        #--------------------------------------------------------
        def _get_identity(self):
                from Gnumed.business.gmPerson import cPerson
                return cPerson(self._payload[self._idx['pk_identity']])

        identity = property(_get_identity)

        #--------------------------------------------------------
        def set_role(self, conn=None, role=None):
                if role.strip() == self._payload[self._idx['role']]:
                        return True

                cmd = 'SELECT gm.add_user_to_permission_group(%(usr)s::name, %(grp)s::name)'
                args = {
                        'usr': self._payload[self._idx['db_user']],
                        'grp': _map_gm_role2pg_group[role.strip()]
                }
                rows, idx = gmPG2.run_rw_queries (
                        link_obj = conn,
                        queries = [{'cmd': cmd, 'args': args}],
                        get_col_idx = False,
                        return_data = True,
                        end_tx = True
                )
                if not rows[0][0]:
                        return False

                self.refetch_payload()
                return True

        role = property(lambda x:x, set_role)

        #--------------------------------------------------------
        def _get_public_key_file(self):
                if not self._payload['public_key']:
                        return None

                pk_fname = gmTools.get_unique_filename()
                with open(pk_fname, mode = 'wt', encoding = 'utf8') as pk_f:
                        pk_f.write(self._payload['public_key'])
                return pk_fname

        def _set_public_key_file(self, pk_fname):
                with open(pk_fname, mode = 'r', encoding = 'utf8') as pk_f:
                        self['public_key'] = pk_f.read()
                self.save()

        public_key_file = property(_get_public_key_file, _set_public_key_file)

#============================================================
def get_staff_list(active_only=False):
        if active_only:
                cmd = _SQL_get_staff_fields % 'is_active ORDER BY can_login DESC, short_alias ASC'
        else:
                cmd = _SQL_get_staff_fields % 'TRUE ORDER BY can_login DESC, is_active DESC, short_alias ASC'
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx=True)
        staff_list = []
        for row in rows:
                obj_row = {
                        'idx': idx,
                        'data': row,
                        'pk_field': 'pk_staff'
                }
                staff_list.append(cStaff(row=obj_row))
        return staff_list

#------------------------------------------------------------
def create_staff(conn=None, db_account=None, password=None, identity=None, short_alias=None):
        args = {
                'pg_usr': db_account,
                'pwd': password,
                'person_id': identity,
                'sig': short_alias
        }

        queries = [
                {'cmd': 'SELECT gm.create_user(%(pg_usr)s, %(pwd)s)', 'args': args},
                {'cmd': """
                        INSERT INTO dem.staff
                                (fk_identity, db_user, short_alias)
                        VALUES (
                                %(person_id)s,
                                %(pg_usr)s,
                                %(sig)s
                        )""",
                 'args': args
                }
        ]

        created = False
        try:
                rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = queries, end_tx = True)
                created = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.UNIQUE_VIOLATION:
                        raise

        if created:
                return True, None

        msg = _(
                'Cannot add GNUmed user.\n'
                '\n'
                'The database account [%s] is already listed as a\n'
                'GNUmed user. There can only be one GNUmed user\n'
                'for each database account.\n'
        ) % db_account
        return False, msg

#------------------------------------------------------------
def delete_staff(conn=None, pk_staff=None):
        deleted = False
        queries = [{'cmd': 'DELETE FROM dem.staff WHERE pk = %(pk)s', 'args': {'pk': pk_staff}}]
        try:
                rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = queries, end_tx = True)
                deleted = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.FOREIGN_KEY_VIOLATION:              # 23503  foreign_key_violation
                        raise

        if deleted:
                return True, None

        deactivate_staff(conn = conn, pk_staff = pk_staff)
        msg = _(
                'Cannot delete GNUmed staff member because the\n'
                'database still contains data linked to it.\n'
                '\n'
                'The account was deactivated instead.'
        )
        return False, msg

#------------------------------------------------------------
def activate_staff(conn=None, pk_staff=None):
        # 1) activate staff entry
        staff = cStaff(aPK_obj = pk_staff)
        staff['is_active'] = True
        staff.save_payload(conn=conn)                           # FIXME: error handling
        # 2) enable database account login
        rowx, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                # password does not matter because PG account must already exist
                queries = [{'cmd': 'select gm.create_user(%s, %s)', 'args': [staff['db_user'], 'flying wombat']}],
                end_tx = True
        )
        return True

#------------------------------------------------------------
def deactivate_staff(conn=None, pk_staff=None):

        # 1) inactivate staff entry
        staff = cStaff(aPK_obj = pk_staff)
        staff['is_active'] = False
        staff.save_payload(conn = conn)                         # FIXME: error handling
        # 2) disable database account login
        rows, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                queries = [{'cmd': 'select gm.disable_user(%s)', 'args': [staff['db_user']]}],
                end_tx = True
        )
        return True

#------------------------------------------------------------
def get_public_keys_of_passphrase_trustees(as_files:bool=False) -> list[str]:
        """Retrieve the public keys of passphrase trustees.

        Returns:
                List of keys or list of keyfile names, depending on 'as_files'.
        """
        pub_keys = gmCfgDB.get4site (
                option = 'horstspace.export.public_keys_of_passphrase_trustees',
                default = []
        )
        if not as_files or not pub_keys:
                return pub_keys

        fnames = []
        for pub_key in pub_keys:
                pubk_fname = gmTools.get_unique_filename()
                with open(pubk_fname, mode = 'wt', encoding = 'utf8') as pubk_f:
                        pubk_f.write(pub_key)
                fnames.append(pubk_fname)
        return fnames

#============================================================
#============================================================
def set_current_provider_to_logged_on_user():
        gmCurrentProvider(provider = cStaff())

#============================================================
class gmCurrentProvider(gmBorg.cBorg):
        """Staff member Borg to hold currently logged on provider.

        There may be many instances of this but they all share state.
        """
        def __init__(self, provider=None):
                """Change or get currently logged on provider.

                provider:
                * None: get copy of current instance
                * cStaff instance: change logged on provider (role)
                """
                # make sure we do have a provider pointer
                try:
                        self.provider
                except AttributeError:
                        self.provider = gmNull.cNull()

                # user wants copy of currently logged on provider
                if provider is None:
                        return None

                # must be cStaff instance, then
                if not isinstance(provider, cStaff):
                        raise ValueError('cannot set logged on provider to [%s], must be either None or cStaff instance' % str(provider))

                # first invocation
                if isinstance(self.provider, gmNull.cNull):
                        self.provider = provider
                        return None

                # same ID, no change needed
                if self.provider['pk_staff'] == provider['pk_staff']:
                        return None

                # user wants different provider
                raise ValueError('provider change [%s] -> [%s] not yet supported' % (self.provider['pk_staff'], provider['pk_staff']))

        #--------------------------------------------------------
        def get_staff(self):
                return self.provider

        #--------------------------------------------------------
        # __getitem__ handling
        #--------------------------------------------------------
        def __getitem__(self, aVar):
                """Return any attribute if known how to retrieve it by proxy.
                """
                return self.provider[aVar]

        #--------------------------------------------------------
        # __s/getattr__ handling
        #--------------------------------------------------------
        def __getattr__(self, attribute):
                if attribute == 'provider':                     # so we can __init__ ourselves
                        raise AttributeError
                if not isinstance(self.provider, gmNull.cNull):
                        return getattr(self.provider, attribute)
#               raise AttributeError

#============================================================
# main/testing
#============================================================
if __name__ == '__main__':

        if len(sys.argv) == 1:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        from Gnumed.pycommon import gmI18N
        from Gnumed.pycommon import gmDateTime

        gmI18N.activate_locale()
        gmI18N.install_domain()
        gmDateTime.init()

        #--------------------------------------------------------
        def test_staff():
                staff = cStaff()
                print(staff)
                #print(staff.inbox)
                #print(staff.inbox.messages)
                print(staff.public_key_file)

        #--------------------------------------------------------
        def test_current_provider():
                staff = cStaff()
                provider = gmCurrentProvider(provider = staff)
                print(provider)
                #print(provider.inbox)
                #print(provider.inbox.messages)
                print(provider.database_language)
                #tmp = provider.database_language
                #provider.database_language = None
                #print(provider.database_language)
                #provider.database_language = tmp
                #print(provider.database_language)
                print(provider.public_key_file)

        #--------------------------------------------------------
        def test_set_pubkey():
                staff = cStaff(aPK_obj = 1)
                print(staff)
                staff.public_key_file = sys.argv[2]
                print(staff.public_key_file)

        #--------------------------------------------------------
        def test_trustee_pkeys():
                print("passphrase trustees' public keys:")
                print('keys:', get_public_keys_of_passphrase_trustees(as_files = False))
                print('key files:', get_public_keys_of_passphrase_trustees(as_files = True))

        #--------------------------------------------------------
        gmPG2.request_login_params(setup_pool = True, force_tui = True)

        #test_staff()
        #test_current_provider()
        #test_trustee_pkeys()
        test_set_pubkey()

#============================================================

Functions

def activate_staff(conn=None, pk_staff=None)
Expand source code
def activate_staff(conn=None, pk_staff=None):
        # 1) activate staff entry
        staff = cStaff(aPK_obj = pk_staff)
        staff['is_active'] = True
        staff.save_payload(conn=conn)                           # FIXME: error handling
        # 2) enable database account login
        rowx, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                # password does not matter because PG account must already exist
                queries = [{'cmd': 'select gm.create_user(%s, %s)', 'args': [staff['db_user'], 'flying wombat']}],
                end_tx = True
        )
        return True
def create_staff(conn=None, db_account=None, password=None, identity=None, short_alias=None)
Expand source code
def create_staff(conn=None, db_account=None, password=None, identity=None, short_alias=None):
        args = {
                'pg_usr': db_account,
                'pwd': password,
                'person_id': identity,
                'sig': short_alias
        }

        queries = [
                {'cmd': 'SELECT gm.create_user(%(pg_usr)s, %(pwd)s)', 'args': args},
                {'cmd': """
                        INSERT INTO dem.staff
                                (fk_identity, db_user, short_alias)
                        VALUES (
                                %(person_id)s,
                                %(pg_usr)s,
                                %(sig)s
                        )""",
                 'args': args
                }
        ]

        created = False
        try:
                rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = queries, end_tx = True)
                created = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.UNIQUE_VIOLATION:
                        raise

        if created:
                return True, None

        msg = _(
                'Cannot add GNUmed user.\n'
                '\n'
                'The database account [%s] is already listed as a\n'
                'GNUmed user. There can only be one GNUmed user\n'
                'for each database account.\n'
        ) % db_account
        return False, msg
def deactivate_staff(conn=None, pk_staff=None)
Expand source code
def deactivate_staff(conn=None, pk_staff=None):

        # 1) inactivate staff entry
        staff = cStaff(aPK_obj = pk_staff)
        staff['is_active'] = False
        staff.save_payload(conn = conn)                         # FIXME: error handling
        # 2) disable database account login
        rows, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                queries = [{'cmd': 'select gm.disable_user(%s)', 'args': [staff['db_user']]}],
                end_tx = True
        )
        return True
def delete_staff(conn=None, pk_staff=None)
Expand source code
def delete_staff(conn=None, pk_staff=None):
        deleted = False
        queries = [{'cmd': 'DELETE FROM dem.staff WHERE pk = %(pk)s', 'args': {'pk': pk_staff}}]
        try:
                rows, idx = gmPG2.run_rw_queries(link_obj = conn, queries = queries, end_tx = True)
                deleted = True
        except gmPG2.dbapi.IntegrityError as e:
                if e.pgcode != gmPG2.PG_error_codes.FOREIGN_KEY_VIOLATION:              # 23503  foreign_key_violation
                        raise

        if deleted:
                return True, None

        deactivate_staff(conn = conn, pk_staff = pk_staff)
        msg = _(
                'Cannot delete GNUmed staff member because the\n'
                'database still contains data linked to it.\n'
                '\n'
                'The account was deactivated instead.'
        )
        return False, msg
def get_public_keys_of_passphrase_trustees(as_files: bool = False) ‑> list[str]

Retrieve the public keys of passphrase trustees.

Returns

List of keys or list of keyfile names, depending on 'as_files'.

Expand source code
def get_public_keys_of_passphrase_trustees(as_files:bool=False) -> list[str]:
        """Retrieve the public keys of passphrase trustees.

        Returns:
                List of keys or list of keyfile names, depending on 'as_files'.
        """
        pub_keys = gmCfgDB.get4site (
                option = 'horstspace.export.public_keys_of_passphrase_trustees',
                default = []
        )
        if not as_files or not pub_keys:
                return pub_keys

        fnames = []
        for pub_key in pub_keys:
                pubk_fname = gmTools.get_unique_filename()
                with open(pubk_fname, mode = 'wt', encoding = 'utf8') as pubk_f:
                        pubk_f.write(pub_key)
                fnames.append(pubk_fname)
        return fnames
def get_staff_list(active_only=False)
Expand source code
def get_staff_list(active_only=False):
        if active_only:
                cmd = _SQL_get_staff_fields % 'is_active ORDER BY can_login DESC, short_alias ASC'
        else:
                cmd = _SQL_get_staff_fields % 'TRUE ORDER BY can_login DESC, is_active DESC, short_alias ASC'
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx=True)
        staff_list = []
        for row in rows:
                obj_row = {
                        'idx': idx,
                        'data': row,
                        'pk_field': 'pk_staff'
                }
                staff_list.append(cStaff(row=obj_row))
        return staff_list
def set_current_provider_to_logged_on_user()
Expand source code
def set_current_provider_to_logged_on_user():
        gmCurrentProvider(provider = cStaff())

Classes

class cStaff (aPK_obj=None, row=None)

Represents business objects in the database.

Rules

  • instances ARE ASSUMED TO EXIST in the database
  • PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
  • Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
  • does NOT verify FK target existence
  • does NOT create new entries in the database
  • does NOT lazy-fetch fields on access

Class scope SQL commands and variables:

_cmd_fetch_payload:

  • must return exactly one row
  • WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
  • must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables

_cmds_store_payload:

  • one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
  • the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
  • the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
  • when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)

_updatable_fields:

  • a list of fields available for update via object['field']

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • a simple value the primary key WHERE condition must be a simple column
  • a dictionary of values the primary key WHERE condition must be a subselect consuming the dict and producing the single-value primary key
row
must hold the fields
  • idx: a dict mapping field names to position
  • data: the field values in a list (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key field OR
  • pk_obj: a dictionary suitable for passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'idx': idx,
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
    objects = [ cChildClass(row = {'data': r, 'idx': idx, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cStaff(gmBusinessDBObject.cBusinessDBObject):
        _cmd_fetch_payload = _SQL_get_staff_fields % "pk_staff = %s"
        _cmds_store_payload = [
                """UPDATE dem.staff SET
                                short_alias = %(short_alias)s,
                                comment = gm.nullify_empty_string(%(comment)s),
                                is_active = %(is_active)s,
                                db_user = %(db_user)s,
                                public_key = %(public_key)s
                        WHERE
                                pk = %(pk_staff)s
                                        AND
                                xmin = %(xmin_staff)s
                        RETURNING
                                xmin AS xmin_staff"""
        ]
        _updatable_fields = ['short_alias', 'comment', 'is_active', 'db_user', 'public_key']

        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, row=None):
                # by default get staff corresponding to CURRENT_USER
                if (aPK_obj is None) and (row is None):
                        cmd = _SQL_get_staff_fields % "db_user = CURRENT_USER"
                        try:
                                rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx=True)
                        except Exception:
                                _log.exception('cannot instantiate staff instance')
                                gmLog2.log_stack_trace()
                                raise ValueError('cannot instantiate staff instance for database account CURRENT_USER')
                        if len(rows) == 0:
                                raise ValueError('no staff record for database account CURRENT_USER')
                        row = {
                                'pk_field': 'pk_staff',
                                'idx': idx,
                                'data': rows[0]
                        }
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, row = row)
                else:
                        gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj = aPK_obj, row = row)

                # are we SELF ?
                self.__is_current_user = (gmPG2.get_current_user() == self._payload[self._idx['db_user']])

                self.__inbox = None

        #--------------------------------------------------------
        def __setitem__(self, attribute, value):
                if attribute == 'db_user':
                        if self.__is_current_user:
                                _log.debug('will not modify database account association of CURRENT_USER staff member')
                                return

                gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value)

        #--------------------------------------------------------
        def _get_db_lang(self):
                rows, idx = gmPG2.run_ro_queries (
                        queries = [{
                                'cmd': 'select i18n.get_curr_lang(%(usr)s)',
                                'args': {'usr': self._payload[self._idx['db_user']]}
                        }]
                )
                return rows[0][0]

        def _set_db_lang(self, language):
                if not gmPG2.set_user_language(language = language):
                        raise ValueError (
                                'Cannot set database language to [%s] for user [%s].' % (language, self._payload[self._idx['db_user']])
                        )
                return

        database_language = property(_get_db_lang, _set_db_lang)

        #--------------------------------------------------------
        def _get_inbox(self):
                if self.__inbox is None:
                        from Gnumed.business import gmProviderInbox
                        self.__inbox = gmProviderInbox.cProviderInbox(provider_id = self._payload[self._idx['pk_staff']])
                return self.__inbox

        inbox = property(_get_inbox)

        #--------------------------------------------------------
        def _get_identity(self):
                from Gnumed.business.gmPerson import cPerson
                return cPerson(self._payload[self._idx['pk_identity']])

        identity = property(_get_identity)

        #--------------------------------------------------------
        def set_role(self, conn=None, role=None):
                if role.strip() == self._payload[self._idx['role']]:
                        return True

                cmd = 'SELECT gm.add_user_to_permission_group(%(usr)s::name, %(grp)s::name)'
                args = {
                        'usr': self._payload[self._idx['db_user']],
                        'grp': _map_gm_role2pg_group[role.strip()]
                }
                rows, idx = gmPG2.run_rw_queries (
                        link_obj = conn,
                        queries = [{'cmd': cmd, 'args': args}],
                        get_col_idx = False,
                        return_data = True,
                        end_tx = True
                )
                if not rows[0][0]:
                        return False

                self.refetch_payload()
                return True

        role = property(lambda x:x, set_role)

        #--------------------------------------------------------
        def _get_public_key_file(self):
                if not self._payload['public_key']:
                        return None

                pk_fname = gmTools.get_unique_filename()
                with open(pk_fname, mode = 'wt', encoding = 'utf8') as pk_f:
                        pk_f.write(self._payload['public_key'])
                return pk_fname

        def _set_public_key_file(self, pk_fname):
                with open(pk_fname, mode = 'r', encoding = 'utf8') as pk_f:
                        self['public_key'] = pk_f.read()
                self.save()

        public_key_file = property(_get_public_key_file, _set_public_key_file)

Ancestors

Instance variables

var database_language
Expand source code
def _get_db_lang(self):
        rows, idx = gmPG2.run_ro_queries (
                queries = [{
                        'cmd': 'select i18n.get_curr_lang(%(usr)s)',
                        'args': {'usr': self._payload[self._idx['db_user']]}
                }]
        )
        return rows[0][0]
var identity
Expand source code
def _get_identity(self):
        from Gnumed.business.gmPerson import cPerson
        return cPerson(self._payload[self._idx['pk_identity']])
var inbox
Expand source code
def _get_inbox(self):
        if self.__inbox is None:
                from Gnumed.business import gmProviderInbox
                self.__inbox = gmProviderInbox.cProviderInbox(provider_id = self._payload[self._idx['pk_staff']])
        return self.__inbox
var public_key_file
Expand source code
def _get_public_key_file(self):
        if not self._payload['public_key']:
                return None

        pk_fname = gmTools.get_unique_filename()
        with open(pk_fname, mode = 'wt', encoding = 'utf8') as pk_f:
                pk_f.write(self._payload['public_key'])
        return pk_fname
var role
Expand source code
role = property(lambda x:x, set_role)

Methods

def set_role(self, conn=None, role=None)
Expand source code
def set_role(self, conn=None, role=None):
        if role.strip() == self._payload[self._idx['role']]:
                return True

        cmd = 'SELECT gm.add_user_to_permission_group(%(usr)s::name, %(grp)s::name)'
        args = {
                'usr': self._payload[self._idx['db_user']],
                'grp': _map_gm_role2pg_group[role.strip()]
        }
        rows, idx = gmPG2.run_rw_queries (
                link_obj = conn,
                queries = [{'cmd': cmd, 'args': args}],
                get_col_idx = False,
                return_data = True,
                end_tx = True
        )
        if not rows[0][0]:
                return False

        self.refetch_payload()
        return True

Inherited members

class gmCurrentProvider (provider=None)

Staff member Borg to hold currently logged on provider.

There may be many instances of this but they all share state.

Change or get currently logged on provider.

provider: * None: get copy of current instance * cStaff instance: change logged on provider (role)

Expand source code
class gmCurrentProvider(gmBorg.cBorg):
        """Staff member Borg to hold currently logged on provider.

        There may be many instances of this but they all share state.
        """
        def __init__(self, provider=None):
                """Change or get currently logged on provider.

                provider:
                * None: get copy of current instance
                * cStaff instance: change logged on provider (role)
                """
                # make sure we do have a provider pointer
                try:
                        self.provider
                except AttributeError:
                        self.provider = gmNull.cNull()

                # user wants copy of currently logged on provider
                if provider is None:
                        return None

                # must be cStaff instance, then
                if not isinstance(provider, cStaff):
                        raise ValueError('cannot set logged on provider to [%s], must be either None or cStaff instance' % str(provider))

                # first invocation
                if isinstance(self.provider, gmNull.cNull):
                        self.provider = provider
                        return None

                # same ID, no change needed
                if self.provider['pk_staff'] == provider['pk_staff']:
                        return None

                # user wants different provider
                raise ValueError('provider change [%s] -> [%s] not yet supported' % (self.provider['pk_staff'], provider['pk_staff']))

        #--------------------------------------------------------
        def get_staff(self):
                return self.provider

        #--------------------------------------------------------
        # __getitem__ handling
        #--------------------------------------------------------
        def __getitem__(self, aVar):
                """Return any attribute if known how to retrieve it by proxy.
                """
                return self.provider[aVar]

        #--------------------------------------------------------
        # __s/getattr__ handling
        #--------------------------------------------------------
        def __getattr__(self, attribute):
                if attribute == 'provider':                     # so we can __init__ ourselves
                        raise AttributeError
                if not isinstance(self.provider, gmNull.cNull):
                        return getattr(self.provider, attribute)

Ancestors

Methods

def get_staff(self)
Expand source code
def get_staff(self):
        return self.provider