Module Gnumed.pycommon.gmBackendListener
GNUmed database backend listener.
This module implements threaded listening for asynchronuous notifications from the database backend.
Classes
class gmBackendListener (conn=None, poll_interval: int = 3)
-
Expand source code
class gmBackendListener(gmBorg.cBorg): """The backend listener singleton class.""" def __init__(self, conn=None, poll_interval:int=3): if hasattr(self, 'already_inited'): return assert conn, '<conn> must be given' _log.info('setting up backend notifications listener') self.debug = False self.__notifications_received = 0 self.__messages_sent = 0 # the listener thread will regularly try to acquire # this lock, when it succeeds the thread will quit self._quit_lock = threading.Lock() # take the lock now so it cannot be taken by the worker # thread until it is released in shutdown() if not self._quit_lock.acquire(blocking = False): _log.error('cannot acquire thread-quit lock, aborting') raise EnvironmentError("cannot acquire thread-quit lock") self._conn = conn _log.debug('DB listener connection: %s', self._conn) self.backend_pid = self._conn.get_backend_pid() _log.debug('notification listener connection has backend PID [%s]', self.backend_pid) self._conn.set_isolation_level(0) # autocommit mode = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT self._cursor = self._conn.cursor() try: self._conn_fd = self._conn.fileno() except AttributeError: self._conn_fd = self._cursor.fileno() self._conn_lock = threading.Lock() # lock for access to connection object self.__register_interests() # check for messages every 'poll_interval' seconds self._poll_interval = poll_interval self._listener_thread = None self.__start_thread() self.already_inited = True #------------------------------- # public API #------------------------------- def shutdown(self): """Cleanly shut down listening. Unregister notifications. Rejoin listener thread. """ _log.debug('received %s notifications', self.__notifications_received) _log.debug('sent %s messages', self.__messages_sent) if self._listener_thread is None: self.__shutdown_connection() return _log.info('stopping backend notifications listener thread') self._quit_lock.release() try: # give the worker thread time to terminate self._listener_thread.join(self._poll_interval+2.0) try: if self._listener_thread.is_alive(): _log.error('listener thread still alive after join()') _log.debug('active threads: %s' % threading.enumerate()) except Exception: pass except Exception: print(sys.exc_info()) self._listener_thread = None try: self.__unregister_unspecific_notifications() except Exception: _log.exception('unable to unregister unspecific notifications') self.__shutdown_connection() return #------------------------------- # event handlers #------------------------------- # internal helpers #------------------------------- def __register_interests(self): # determine unspecific notifications self.unspecific_notifications = signals2listen4 _log.info('configured unspecific notifications:') _log.info('%s' % self.unspecific_notifications) gmDispatcher.known_signals.extend(self.unspecific_notifications) # listen to unspecific notifications self.__register_unspecific_notifications() #------------------------------- def __register_unspecific_notifications(self): for sig in self.unspecific_notifications: _log.info('starting to listen for [%s]' % sig) cmd = 'LISTEN "%s"' % sig self._conn_lock.acquire(blocking = True) try: self._cursor.execute(cmd) finally: self._conn_lock.release() #------------------------------- def __unregister_unspecific_notifications(self): for sig in self.unspecific_notifications: _log.info('stopping to listen for [%s]' % sig) cmd = 'UNLISTEN "%s"' % sig self._conn_lock.acquire(1) try: self._cursor.execute(cmd) finally: self._conn_lock.release() #------------------------------- def __shutdown_connection(self): _log.debug('shutting down connection with backend PID [%s]', self.backend_pid) self._conn_lock.acquire(1) try: self._conn.rollback() except Exception: pass finally: self._conn_lock.release() #------------------------------- def __start_thread(self): if self._conn is None: raise ValueError("no connection to backend available, useless to start thread") self._listener_thread = threading.Thread ( target = self._process_notifications, name = self.__class__.__name__, daemon = True ) _log.info('starting listener thread') self._listener_thread.start() #------------------------------- def __parse_notification(self, notification) -> dict: if self.debug: print(notification) _log.debug('#%s: %s (first param: PID of sending backend; this backend: %s)', self.__notifications_received, notification, self.backend_pid) payload = notification.payload.split('::') data = { 'channel': notification.channel, 'notification_pid': notification.pid, 'operation': None, 'table': None, 'pk_column_name': None, 'pk_of_row': None, 'pk_identity': None } for item in payload: if item.startswith('operation='): data['operation'] = item.split('=')[1] if item.startswith('table='): data['table'] = item.split('=')[1] if item.startswith('PK name='): data['pk_column_name'] = item.split('=')[1] if item.startswith('row PK='): data['pk_of_row'] = int(item.split('=')[1]) if item.startswith('person PK='): data['pk_identity'] = -1 tmp = item.split('=')[1] if tmp != 'NULL': try: data['pk_identity'] = int(tmp) except ValueError: _log.error(payload) _log.exception('error in change notification trigger') return data #------------------------------- def __send_old_style_table_signal(self, data:dict): if data['table'] is None: return self.__messages_sent += 1 signal = '%s_mod_db' % data['table'] _log.debug('emulating old-style table specific signal [%s]', signal) try: gmDispatcher.send ( signal = signal, originated_in_database = True, listener_pid = self.backend_pid, sending_backend_pid = data['notification_pid'], pk_identity = data['pk_identity'], operation = data['operation'], table = data['table'], pk_column_name = data['pk_column_name'], pk_of_row = data['pk_of_row'], message_index = self.__messages_sent, notification_index = self.__notifications_received ) except Exception: print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (signal, data['notification_pid'])) print(sys.exc_info()) #------------------------------- def __send_generic_signal(self, data:dict): self.__messages_sent += 1 try: gmDispatcher.send ( signal = data['channel'], originated_in_database = True, listener_pid = self.backend_pid, sending_backend_pid = data['notification_pid'], pk_identity = data['pk_identity'], operation = data['operation'], table = data['table'], pk_column_name = data['pk_column_name'], pk_of_row = data['pk_of_row'], message_index = self.__messages_sent, notification_index = self.__notifications_received ) except Exception: print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (data['channel'], data['notification_pid'])) print(sys.exc_info()) #------------------------------- # the actual thread code #------------------------------- def _process_notifications(self): # loop until quitting _have_quit_lock = None while not _have_quit_lock: # quitting ? if self._quit_lock.acquire(0): break # wait at most self._poll_interval for new data self._conn_lock.acquire(1) try: ready_input_sockets = select.select([self._conn_fd], [], [], self._poll_interval)[0] finally: self._conn_lock.release() # any input available ? if len(ready_input_sockets) == 0: # no, select.select() timed out # give others a chance to grab the conn lock (eg listen/unlisten) time.sleep(0.3) continue # data available, wait for it to fully arrive self._conn_lock.acquire(1) try: self._conn.poll() finally: self._conn_lock.release() # any notifications ? while len(self._conn.notifies) > 0: # if self._quit_lock can be acquired we may be in # __del__ in which case gmDispatcher is not # guaranteed to exist anymore if self._quit_lock.acquire(0): _have_quit_lock = 1 break self._conn_lock.acquire(1) try: notification = self._conn.notifies.pop() finally: self._conn_lock.release() self.__notifications_received += 1 data = self.__parse_notification(notification) # try sending intra-client signals: self.__send_generic_signal(data) self.__send_old_style_table_signal(data) if self._quit_lock.acquire(0): # there may be more notifications pendings # but we don't care when quitting _have_quit_lock = 1 break # exit thread activity return
The backend listener singleton class.
Ancestors
Methods
def shutdown(self)
-
Expand source code
def shutdown(self): """Cleanly shut down listening. Unregister notifications. Rejoin listener thread. """ _log.debug('received %s notifications', self.__notifications_received) _log.debug('sent %s messages', self.__messages_sent) if self._listener_thread is None: self.__shutdown_connection() return _log.info('stopping backend notifications listener thread') self._quit_lock.release() try: # give the worker thread time to terminate self._listener_thread.join(self._poll_interval+2.0) try: if self._listener_thread.is_alive(): _log.error('listener thread still alive after join()') _log.debug('active threads: %s' % threading.enumerate()) except Exception: pass except Exception: print(sys.exc_info()) self._listener_thread = None try: self.__unregister_unspecific_notifications() except Exception: _log.exception('unable to unregister unspecific notifications') self.__shutdown_connection() return
Cleanly shut down listening.
Unregister notifications. Rejoin listener thread.