Module Gnumed.pycommon.gmBackendListener

GNUmed database backend listener.

This module implements threaded listening for asynchronuous notifications from the database backend.

Classes

class gmBackendListener (conn=None, poll_interval: int = 3)
Expand source code
class gmBackendListener(gmBorg.cBorg):
        """The backend listener singleton class."""
        def __init__(self, conn=None, poll_interval:int=3):

                if hasattr(self, 'already_inited'):
                        return

                assert conn, '<conn> must be given'

                _log.info('setting up backend notifications listener')
                self.debug = False
                self.__notifications_received = 0
                self.__messages_sent = 0
                # the listener thread will regularly try to acquire
                # this lock, when it succeeds the thread will quit
                self._quit_lock = threading.Lock()
                # take the lock now so it cannot be taken by the worker
                # thread until it is released in shutdown()
                if not self._quit_lock.acquire(blocking = False):
                        _log.error('cannot acquire thread-quit lock, aborting')
                        raise EnvironmentError("cannot acquire thread-quit lock")

                self._conn = conn
                _log.debug('DB listener connection: %s', self._conn)
                self.backend_pid = self._conn.get_backend_pid()
                _log.debug('notification listener connection has backend PID [%s]', self.backend_pid)
                self._conn.set_isolation_level(0)               # autocommit mode = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
                self._cursor = self._conn.cursor()
                try:
                        self._conn_fd = self._conn.fileno()
                except AttributeError:
                        self._conn_fd = self._cursor.fileno()
                self._conn_lock = threading.Lock()              # lock for access to connection object
                self.__register_interests()

                # check for messages every 'poll_interval' seconds
                self._poll_interval = poll_interval
                self._listener_thread = None
                self.__start_thread()

                self.already_inited = True

        #-------------------------------
        # public API
        #-------------------------------
        def shutdown(self):
                """Cleanly shut down listening.

                Unregister notifications. Rejoin listener thread.
                """
                _log.debug('received %s notifications', self.__notifications_received)
                _log.debug('sent %s messages', self.__messages_sent)
                if self._listener_thread is None:
                        self.__shutdown_connection()
                        return

                _log.info('stopping backend notifications listener thread')
                self._quit_lock.release()
                try:
                        # give the worker thread time to terminate
                        self._listener_thread.join(self._poll_interval+2.0)
                        try:
                                if self._listener_thread.is_alive():
                                        _log.error('listener thread still alive after join()')
                                        _log.debug('active threads: %s' % threading.enumerate())
                        except Exception:
                                pass
                except Exception:
                        print(sys.exc_info())
                self._listener_thread = None
                try:
                        self.__unregister_unspecific_notifications()
                except Exception:
                        _log.exception('unable to unregister unspecific notifications')

                self.__shutdown_connection()
                return

        #-------------------------------
        # event handlers
        #-------------------------------
        # internal helpers
        #-------------------------------
        def __register_interests(self):
                # determine unspecific notifications
                self.unspecific_notifications = signals2listen4
                _log.info('configured unspecific notifications:')
                _log.info('%s' % self.unspecific_notifications)
                gmDispatcher.known_signals.extend(self.unspecific_notifications)
                # listen to unspecific notifications
                self.__register_unspecific_notifications()

        #-------------------------------
        def __register_unspecific_notifications(self):
                for sig in self.unspecific_notifications:
                        _log.info('starting to listen for [%s]' % sig)
                        cmd = 'LISTEN "%s"' % sig
                        self._conn_lock.acquire(blocking = True)
                        try:
                                self._cursor.execute(cmd)
                        finally:
                                self._conn_lock.release()

        #-------------------------------
        def __unregister_unspecific_notifications(self):
                for sig in self.unspecific_notifications:
                        _log.info('stopping to listen for [%s]' % sig)
                        cmd = 'UNLISTEN "%s"' % sig
                        self._conn_lock.acquire(1)
                        try:
                                self._cursor.execute(cmd)
                        finally:
                                self._conn_lock.release()

        #-------------------------------
        def __shutdown_connection(self):
                _log.debug('shutting down connection with backend PID [%s]', self.backend_pid)
                self._conn_lock.acquire(1)
                try:
                        self._conn.rollback()
                except Exception:
                        pass
                finally:
                        self._conn_lock.release()

        #-------------------------------
        def __start_thread(self):
                if self._conn is None:
                        raise ValueError("no connection to backend available, useless to start thread")

                self._listener_thread = threading.Thread (
                        target = self._process_notifications,
                        name = self.__class__.__name__,
                        daemon = True
                )
                _log.info('starting listener thread')
                self._listener_thread.start()

        #-------------------------------
        def __parse_notification(self, notification) -> dict:
                if self.debug:
                        print(notification)
                _log.debug('#%s: %s (first param: PID of sending backend; this backend: %s)', self.__notifications_received, notification, self.backend_pid)
                payload = notification.payload.split('::')
                data = {
                        'channel': notification.channel,
                        'notification_pid': notification.pid,
                        'operation': None,
                        'table': None,
                        'pk_column_name': None,
                        'pk_of_row': None,
                        'pk_identity': None
                }
                for item in payload:
                        if item.startswith('operation='):
                                data['operation'] = item.split('=')[1]
                        if item.startswith('table='):
                                data['table'] = item.split('=')[1]
                        if item.startswith('PK name='):
                                data['pk_column_name'] = item.split('=')[1]
                        if item.startswith('row PK='):
                                data['pk_of_row'] = int(item.split('=')[1])
                        if item.startswith('person PK='):
                                data['pk_identity'] = -1
                                tmp = item.split('=')[1]
                                if tmp != 'NULL':
                                        try:
                                                data['pk_identity'] = int(tmp)
                                        except ValueError:
                                                _log.error(payload)
                                                _log.exception('error in change notification trigger')
                return data

        #-------------------------------
        def __send_old_style_table_signal(self, data:dict):
                if data['table'] is None:
                        return

                self.__messages_sent += 1
                signal = '%s_mod_db' % data['table']
                _log.debug('emulating old-style table specific signal [%s]', signal)
                try:
                        gmDispatcher.send (
                                signal = signal,
                                originated_in_database = True,
                                listener_pid = self.backend_pid,
                                sending_backend_pid = data['notification_pid'],
                                pk_identity = data['pk_identity'],
                                operation = data['operation'],
                                table = data['table'],
                                pk_column_name = data['pk_column_name'],
                                pk_of_row = data['pk_of_row'],
                                message_index = self.__messages_sent,
                                notification_index = self.__notifications_received
                        )
                except Exception:
                        print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (signal, data['notification_pid']))
                        print(sys.exc_info())

        #-------------------------------
        def __send_generic_signal(self, data:dict):
                self.__messages_sent += 1
                try:
                        gmDispatcher.send (
                                signal = data['channel'],
                                originated_in_database = True,
                                listener_pid = self.backend_pid,
                                sending_backend_pid = data['notification_pid'],
                                pk_identity = data['pk_identity'],
                                operation = data['operation'],
                                table = data['table'],
                                pk_column_name = data['pk_column_name'],
                                pk_of_row = data['pk_of_row'],
                                message_index = self.__messages_sent,
                                notification_index = self.__notifications_received
                        )
                except Exception:
                        print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (data['channel'], data['notification_pid']))
                        print(sys.exc_info())

        #-------------------------------
        # the actual thread code
        #-------------------------------
        def _process_notifications(self):

                # loop until quitting
                _have_quit_lock = None
                while not _have_quit_lock:
                        # quitting ?
                        if self._quit_lock.acquire(0):
                                break

                        # wait at most self._poll_interval for new data
                        self._conn_lock.acquire(1)
                        try:
                                ready_input_sockets = select.select([self._conn_fd], [], [], self._poll_interval)[0]
                        finally:
                                self._conn_lock.release()
                        # any input available ?
                        if len(ready_input_sockets) == 0:
                                # no, select.select() timed out
                                # give others a chance to grab the conn lock (eg listen/unlisten)
                                time.sleep(0.3)
                                continue
                        # data available, wait for it to fully arrive
                        self._conn_lock.acquire(1)
                        try:
                                self._conn.poll()
                        finally:
                                self._conn_lock.release()
                        # any notifications ?
                        while len(self._conn.notifies) > 0:
                                # if self._quit_lock can be acquired we may be in
                                # __del__ in which case gmDispatcher is not
                                # guaranteed to exist anymore
                                if self._quit_lock.acquire(0):
                                        _have_quit_lock = 1
                                        break

                                self._conn_lock.acquire(1)
                                try:
                                        notification = self._conn.notifies.pop()
                                finally:
                                        self._conn_lock.release()
                                self.__notifications_received += 1
                                data = self.__parse_notification(notification)
                                # try sending intra-client signals:
                                self.__send_generic_signal(data)
                                self.__send_old_style_table_signal(data)
                                if self._quit_lock.acquire(0):
                                        # there may be more notifications pendings
                                        # but we don't care when quitting
                                        _have_quit_lock = 1
                                        break

                # exit thread activity
                return

The backend listener singleton class.

Ancestors

Methods

def shutdown(self)
Expand source code
def shutdown(self):
        """Cleanly shut down listening.

        Unregister notifications. Rejoin listener thread.
        """
        _log.debug('received %s notifications', self.__notifications_received)
        _log.debug('sent %s messages', self.__messages_sent)
        if self._listener_thread is None:
                self.__shutdown_connection()
                return

        _log.info('stopping backend notifications listener thread')
        self._quit_lock.release()
        try:
                # give the worker thread time to terminate
                self._listener_thread.join(self._poll_interval+2.0)
                try:
                        if self._listener_thread.is_alive():
                                _log.error('listener thread still alive after join()')
                                _log.debug('active threads: %s' % threading.enumerate())
                except Exception:
                        pass
        except Exception:
                print(sys.exc_info())
        self._listener_thread = None
        try:
                self.__unregister_unspecific_notifications()
        except Exception:
                _log.exception('unable to unregister unspecific notifications')

        self.__shutdown_connection()
        return

Cleanly shut down listening.

Unregister notifications. Rejoin listener thread.