diff --git a/lib/tfw/builtins/__init__.py b/lib/tfw/builtins/__init__.py index d276619..0343fe7 100644 --- a/lib/tfw/builtins/__init__.py +++ b/lib/tfw/builtins/__init__.py @@ -1,7 +1,5 @@ from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler -from .event_handler import EventHandler from .frontend_event_handler import FrontendEventHandler -from .fsm_aware_event_handler import FSMAwareEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler from .ide_event_handler import IdeEventHandler from .log_monitoring_event_handler import LogMonitoringEventHandler @@ -10,4 +8,3 @@ from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHa from .process_managing_event_handler import ProcessManagingEventHandler from .terminal_commands_event_handler import TerminalCommandsEventHandler from .terminal_event_handler import TerminalEventHandler -from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector diff --git a/lib/tfw/builtins/directory_snapshotting_event_handler.py b/lib/tfw/builtins/directory_snapshotting_event_handler.py index 25986eb..0029f9b 100644 --- a/lib/tfw/builtins/directory_snapshotting_event_handler.py +++ b/lib/tfw/builtins/directory_snapshotting_event_handler.py @@ -10,14 +10,14 @@ from tfw.components.snapshot_provider import SnapshotProvider from tfw.config import TFWENV from tfw.networking import Scope -from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class DirectorySnapshottingEventHandler(EventHandler): - def __init__(self, key, directories, exclude_unix_patterns=None): - super().__init__(key, scope=Scope.WEBSOCKET) +class DirectorySnapshottingEventHandler: + keys = ['snapshot'] + + def __init__(self, directories, exclude_unix_patterns=None): self.snapshot_providers = {} self._exclude_unix_patterns = exclude_unix_patterns self.init_snapshot_providers(directories) @@ -46,11 +46,11 @@ class DirectorySnapshottingEventHandler(EventHandler): makedirs(git_dir, exist_ok=True) return git_dir - def handle_event(self, message): + def handle_event(self, message, server_connector): try: data = message['data'] message['data'] = self.command_handlers[data['command']](data) - self.send_message(message) + server_connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/lib/tfw/builtins/event_handler.py b/lib/tfw/builtins/event_handler.py deleted file mode 100644 index 9696fe4..0000000 --- a/lib/tfw/builtins/event_handler.py +++ /dev/null @@ -1,9 +0,0 @@ -from tfw.event_handlers import EventHandlerBase - -from .tfw_server_connector import TFWServerConnector - - -class EventHandler(EventHandlerBase): - # pylint: disable=abstract-method - def _build_server_connector(self): - return TFWServerConnector() diff --git a/lib/tfw/builtins/frontend_event_handler.py b/lib/tfw/builtins/frontend_event_handler.py index ba39c27..1fadc5b 100644 --- a/lib/tfw/builtins/frontend_event_handler.py +++ b/lib/tfw/builtins/frontend_event_handler.py @@ -1,16 +1,19 @@ from tfw.networking import Scope from tfw.components import FrontendMessageStorage -from .event_handler import EventHandler +class FrontendEventHandler: + keys = ['message', 'queueMessages', 'dashboard', 'console'] -class FrontendEventHandler(EventHandler): def __init__(self): - frontend_keys = ('message', 'queueMessages', 'dashboard', 'console') - self._frontend_message_storage = FrontendMessageStorage(frontend_keys) - super().__init__((*frontend_keys, 'recover'), scope=Scope.WEBSOCKET) + self.server_connector = None + self.keys = [*type(self).keys, 'recover'] + self._frontend_message_storage = FrontendMessageStorage(type(self).keys) - def handle_event(self, message): + def send_message(self, message): + self.server_connector.send_message(message, scope=Scope.WEBSOCKET) + + def handle_event(self, message, _): self._frontend_message_storage.save_message(message) if message['key'] == 'recover': self.recover_frontend() diff --git a/lib/tfw/builtins/fsm_aware_event_handler.py b/lib/tfw/builtins/fsm_aware_event_handler.py deleted file mode 100644 index d585651..0000000 --- a/lib/tfw/builtins/fsm_aware_event_handler.py +++ /dev/null @@ -1,20 +0,0 @@ -from tfw.components import FSMAware -from tfw.networking import Scope - -from .event_handler import EventHandler - - -class FSMAwareEventHandler(EventHandler, FSMAware): - # pylint: disable=abstract-method - """ - Abstract base class for EventHandlers which automatically - keep track of the state of the TFW FSM. - """ - def __init__(self, key, scope=Scope.ZMQ): - EventHandler.__init__(self, key, scope=scope) - FSMAware.__init__(self) - self.subscribe('fsm_update') - - def dispatch_handling(self, message): - if not self.refresh_on_fsm_update(message): - super().dispatch_handling(message) diff --git a/lib/tfw/builtins/fsm_managing_event_handler.py b/lib/tfw/builtins/fsm_managing_event_handler.py index ccbb7de..dbc568b 100644 --- a/lib/tfw/builtins/fsm_managing_event_handler.py +++ b/lib/tfw/builtins/fsm_managing_event_handler.py @@ -4,12 +4,12 @@ from tfw.crypto import KeyManager, sign_message, verify_message from tfw.networking import Scope from tfw.components import FSMUpdater -from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class FSMManagingEventHandler(EventHandler): +class FSMManagingEventHandler: + keys = ['fsm'] """ EventHandler responsible for managing the state machine of the framework (TFW FSM). @@ -24,8 +24,7 @@ class FSMManagingEventHandler(EventHandler): An 'fsm_update' message is broadcasted after every successful command. """ - def __init__(self, key, fsm_type, require_signature=False): - super().__init__(key, scope=Scope.WEBSOCKET) + def __init__(self, fsm_type, require_signature=False): self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) self.auth_key = KeyManager().auth_key @@ -36,15 +35,14 @@ class FSMManagingEventHandler(EventHandler): 'update': self.handle_update } - def handle_event(self, message): + def handle_event(self, message, server_connector): try: message = self.command_handlers[message['data']['command']](message) if message: fsm_update_message = self._fsm_updater.fsm_update sign_message(self.auth_key, message) sign_message(self.auth_key, fsm_update_message) - self.server_connector.send_message(fsm_update_message, Scope.BROADCAST) - self.send_message(message) + server_connector.send_message(fsm_update_message, Scope.BROADCAST) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/lib/tfw/builtins/ide_event_handler.py b/lib/tfw/builtins/ide_event_handler.py index ab5a7eb..1ec4ebd 100644 --- a/lib/tfw/builtins/ide_event_handler.py +++ b/lib/tfw/builtins/ide_event_handler.py @@ -4,7 +4,6 @@ from tfw.networking import Scope from tfw.components import FileManager from tfw.components.inotify import InotifyObserver -from .event_handler import EventHandler LOG = logging.getLogger(__name__) @@ -32,7 +31,8 @@ BUILD_ARTIFACTS = ( ) -class IdeEventHandler(EventHandler): +class IdeEventHandler: + keys = ['ide'] # pylint: disable=too-many-arguments,anomalous-backslash-in-string """ Event handler implementing the backend of our browser based IDE. @@ -47,7 +47,7 @@ class IdeEventHandler(EventHandler): The API of each command is documented in their respective handler. """ - def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None): + def __init__(self, directory, allowed_directories, selected_file=None, exclude=None): """ :param key: the key this instance should listen to :param directory: working directory which the EventHandler should serve files from @@ -55,7 +55,7 @@ class IdeEventHandler(EventHandler): :param selected_file: file that is selected by default :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) """ - super().__init__(key, scope=Scope.WEBSOCKET) + self.server_connector = None try: self.filemanager = FileManager( allowed_directories=allowed_directories, @@ -84,10 +84,13 @@ class IdeEventHandler(EventHandler): } def _reload_frontend(self, event): # pylint: disable=unused-argument - self.server_connector.send_message({ + self.send_message({ 'key': 'ide', 'data': {'command': 'reload'} - }, Scope.WEBSOCKET) + }) + + def send_message(self, message): + self.server_connector.send_message(message, scope=Scope.WEBSOCKET) def read(self, data): """ @@ -179,7 +182,7 @@ class IdeEventHandler(EventHandler): data['files'] = self.filemanager.files data['directory'] = self.filemanager.workdir - def handle_event(self, message): + def handle_event(self, message, _): try: data = message['data'] message['data'] = self.commands[data['command']](data) diff --git a/lib/tfw/builtins/log_monitoring_event_handler.py b/lib/tfw/builtins/log_monitoring_event_handler.py index eebade5..7e376fc 100644 --- a/lib/tfw/builtins/log_monitoring_event_handler.py +++ b/lib/tfw/builtins/log_monitoring_event_handler.py @@ -1,15 +1,14 @@ import logging from tfw.config import TFWENV -from tfw.networking import Scope from tfw.components import LogInotifyObserver -from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class LogMonitoringEventHandler(EventHandler): +class LogMonitoringEventHandler: + keys = ['logmonitor'] """ Monitors the output of a supervisor process (stdout, stderr) and sends the results to the frontend. @@ -19,23 +18,27 @@ class LogMonitoringEventHandler(EventHandler): The API of each command is documented in their respective handler. """ - def __init__(self, key, process_name, log_tail=0): - super().__init__(key, scope=Scope.WEBSOCKET) + def __init__(self, process_name, log_tail=0): + self.server_connector = None self.process_name = process_name - self._monitor = LogInotifyObserver( - server_connector=self.server_connector, - supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI, - process_name=process_name, - log_tail=log_tail - ) - self._monitor.start() + self._initial_log_tail = log_tail + self._monitor = None self.command_handlers = { 'process_name': self.handle_process_name, 'log_tail': self.handle_log_tail } - def handle_event(self, message): + def start(self): + self._monitor = LogInotifyObserver( + server_connector=self.server_connector, + supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI, + process_name=self.process_name, + log_tail=self._initial_log_tail + ) + self._monitor.start() + + def handle_event(self, message, _): try: data = message['data'] self.command_handlers[data['command']](data) diff --git a/lib/tfw/builtins/pipe_io_event_handler.py b/lib/tfw/builtins/pipe_io_event_handler.py index 96bff90..0a8e02f 100644 --- a/lib/tfw/builtins/pipe_io_event_handler.py +++ b/lib/tfw/builtins/pipe_io_event_handler.py @@ -12,15 +12,16 @@ from contextlib import suppress from tfw.components.pipe_io_server import PipeIOServer, terminate_process_on_failure -from .event_handler import EventHandler LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 -class PipeIOEventHandlerBase(EventHandler): - def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): - super().__init__(key) +class PipeIOEventHandlerBase: + keys = [''] + + def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): + self.server_connector = None self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, @@ -50,25 +51,25 @@ class CallbackPipeIOServer(PipeIOServer): class PipeIOEventHandler(PipeIOEventHandlerBase): - def handle_event(self, message): + def handle_event(self, message, _): json_bytes = dumps(message).encode() self.pipe_io.send_message(json_bytes) def handle_pipe_event(self, message_bytes): json = loads(message_bytes) - self.send_message(json) + self.server_connector.send_message(json) class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): # pylint: disable=too-many-arguments def __init__( - self, key, in_pipe_path, out_pipe_path, + self, in_pipe_path, out_pipe_path, transform_in_cmd, transform_out_cmd, permissions=DEFAULT_PERMISSIONS ): self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd) - super().__init__(key, in_pipe_path, out_pipe_path, permissions) + super().__init__(in_pipe_path, out_pipe_path, permissions) @staticmethod def _transform_message(transform_cmd, message): @@ -83,7 +84,7 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): return proc.stdout raise ValueError(f'Transforming message {message} failed!') - def handle_event(self, message): + def handle_event(self, message, _): json_bytes = dumps(message).encode() transformed_bytes = self._transform_out(json_bytes) if transformed_bytes: @@ -93,13 +94,12 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) - self.send_message(json_message) + self.server_connector.send_message(json_message) class CommandEventHandler(PipeIOEventHandler): - def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS): + def __init__(self, command, permissions=DEFAULT_PERMISSIONS): super().__init__( - key, self._generate_tempfilename(), self._generate_tempfilename(), permissions diff --git a/lib/tfw/builtins/process_managing_event_handler.py b/lib/tfw/builtins/process_managing_event_handler.py index 2a1f968..fe25ab6 100644 --- a/lib/tfw/builtins/process_managing_event_handler.py +++ b/lib/tfw/builtins/process_managing_event_handler.py @@ -5,12 +5,12 @@ from tfw.config import TFWENV from tfw.networking import Scope from tfw.components import ProcessManager, LogManager -from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): +class ProcessManagingEventHandler(ProcessManager, LogManager): + keys = ['processmanager'] """ Event handler that can manage processes managed by supervisor. @@ -23,8 +23,7 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): Commands available: start, stop, restart, readlog (the names are as self-documenting as it gets) """ - def __init__(self, key, log_tail=0): - EventHandler.__init__(self, key, scope=Scope.WEBSOCKET) + def __init__(self, log_tail=0): ProcessManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) LogManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) self.log_tail = log_tail @@ -34,7 +33,7 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): 'restart': self.restart_process } - def handle_event(self, message): + def handle_event(self, message, server_connector): try: data = message['data'] try: @@ -50,6 +49,6 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): data['process_name'], self.log_tail ) - self.send_message(message) + server_connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/lib/tfw/builtins/terminal_commands_event_handler.py b/lib/tfw/builtins/terminal_commands_event_handler.py index 0385f0b..2bb7399 100644 --- a/lib/tfw/builtins/terminal_commands_event_handler.py +++ b/lib/tfw/builtins/terminal_commands_event_handler.py @@ -1,14 +1,9 @@ from tfw.components import TerminalCommands -from tfw.networking import Scope - -from .event_handler import EventHandler -class TerminalCommandsEventHandler(EventHandler, TerminalCommands): - def __init__(self, key, scope=Scope.ZMQ, bashrc=None): - EventHandler.__init__(self, key, scope) - TerminalCommands.__init__(self, bashrc) +class TerminalCommandsEventHandler(TerminalCommands): + keys = ['history.bash'] - def handle_event(self, message): + def handle_event(self, message, _): command = message['value'] self.callback(command) diff --git a/lib/tfw/builtins/terminal_event_handler.py b/lib/tfw/builtins/terminal_event_handler.py index 1f7bf5f..d02a058 100644 --- a/lib/tfw/builtins/terminal_event_handler.py +++ b/lib/tfw/builtins/terminal_event_handler.py @@ -1,16 +1,15 @@ import logging -from tfw.networking import Scope from tfw.components import BashMonitor, TerminadoMiniServer from tfw.config import TFWENV from tao.config import TAOENV -from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class TerminalEventHandler(EventHandler): +class TerminalEventHandler: + keys = ['shell'] """ Event handler responsible for managing terminal sessions for frontend xterm sessions to connect to. You need to instanciate this in order for frontend @@ -20,13 +19,13 @@ class TerminalEventHandler(EventHandler): a command to be executed. The API of each command is documented in their respective handler. """ - def __init__(self, key): + def __init__(self): """ :param key: key this EventHandler listens to :param monitor: tfw.components.HistoryMonitor instance to read command history from """ - super().__init__(key, scope=Scope.WEBSOCKET) - self._historymonitor = BashMonitor(self.server_connector, TFWENV.HISTFILE) + self.server_connector = None + self._historymonitor = None bash_as_user_cmd = ['sudo', '-u', TAOENV.USER, 'bash'] self.terminado_server = TerminadoMiniServer( @@ -41,18 +40,20 @@ class TerminalEventHandler(EventHandler): 'read': self.read } - self._historymonitor.start() self.terminado_server.listen() + def start(self): + self._historymonitor = BashMonitor(self.server_connector, TFWENV.HISTFILE) + self._historymonitor.start() + @property def historymonitor(self): return self._historymonitor - def handle_event(self, message): + def handle_event(self, message, _): try: data = message['data'] message['data'] = self.commands[data['command']](data) - self.send_message(message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index a1355d7..b44466f 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -1,6 +1,5 @@ from .commands_equal import CommandsEqual from .file_manager import FileManager -from .fsm_aware import FSMAware from .fsm_updater import FSMUpdater from .history_monitor import BashMonitor, GDBMonitor from .log_inotify_observer import LogInotifyObserver diff --git a/lib/tfw/components/fsm_updater.py b/lib/tfw/components/fsm_updater.py index 6bff16a..93f9e7a 100644 --- a/lib/tfw/components/fsm_updater.py +++ b/lib/tfw/components/fsm_updater.py @@ -6,7 +6,7 @@ class FSMUpdater: def fsm_update(self): return { 'key': 'fsm_update', - 'data': self.fsm_update_data + **self.fsm_update_data } @property diff --git a/lib/tfw/components/terminal_commands.py b/lib/tfw/components/terminal_commands.py index adba1b4..d705eb2 100644 --- a/lib/tfw/components/terminal_commands.py +++ b/lib/tfw/components/terminal_commands.py @@ -26,7 +26,7 @@ class TerminalCommands(ABC): You can also use this class to create new commands similarly. """ - def __init__(self, bashrc=None): + def __init__(self, bashrc): self._command_method_regex = r'^command_(.+)$' self.command_implemetations = self._build_command_to_implementation_dict() if bashrc is not None: diff --git a/lib/tfw/event_handlers/__init__.py b/lib/tfw/event_handlers/__init__.py index 7284ec8..8fc4177 100644 --- a/lib/tfw/event_handlers/__init__.py +++ b/lib/tfw/event_handlers/__init__.py @@ -1 +1,3 @@ -from .event_handler_base import EventHandlerBase +from .event_handler_factory import EventHandlerFactoryBase +from .event_handler import EventHandler +from .fsm_aware_event_handler import FSMAwareEventHandler diff --git a/lib/tfw/event_handlers/event_handler_base.py b/lib/tfw/event_handlers/event_handler_base.py deleted file mode 100644 index e56eef5..0000000 --- a/lib/tfw/event_handlers/event_handler_base.py +++ /dev/null @@ -1,117 +0,0 @@ -import logging -from abc import ABC, abstractmethod -from typing import Iterable - -from tfw.networking import Scope - -LOG = logging.getLogger(__name__) - - -class EventHandlerBase(ABC): - """ - Abstract base class for all Python based EventHandlers. Useful implementation template - for other languages. - - Derived classes must implement the handle_event() method - """ - _instances = set() - - def __init__(self, key, scope=Scope.ZMQ): - type(self)._instances.add(self) - self.server_connector = self._build_server_connector() - self.scope = scope - self.keys = [] - if isinstance(key, str): - self.keys.append(key) - elif isinstance(key, Iterable): - self.keys = list(key) - - self.subscribe(*self.keys) - self.server_connector.register_callback(self.event_handler_callback) - - @abstractmethod - def _build_server_connector(self): - raise NotImplementedError() - - def subscribe(self, *keys): - """ - Subscribe this EventHandler to receive events for given keys. - Note that you can subscribe to the same key several times in which - case you will need to unsubscribe multiple times in order to stop - receiving events. - - :param keys: list of keys to subscribe to - """ - for key in keys: - self.server_connector.subscribe(key) - self.keys.append(key) - - def event_handler_callback(self, message): - """ - Callback that is invoked when receiving a message. - Dispatches messages to handler methods and sends - a response back in case the handler returned something. - This is subscribed in __init__(). - """ - if self.check_key(message): - self.dispatch_handling(message) - - def check_key(self, message): - """ - Checks whether the message is intended for this - EventHandler. - - This is necessary because ZMQ handles PUB - SUB - connetions with pattern matching (e.g. someone - subscribed to 'fsm' will receive 'fsm_update' - messages as well. - """ - if '' in self.keys: - return True - return message['key'] in self.keys - - def dispatch_handling(self, message): - """ - Used to dispatch messages to their specific handlers. - - :param message: the message received - :returns: the message to send back - """ - self.handle_event(message) - - def handle_event(self, message): - """ - Abstract method that implements the handling of messages. - - :param message: the message received - :returns: the message to send back - """ - raise NotImplementedError() - - def send_message(self, message): - self.server_connector.send_message(message, self.scope) - - def unsubscribe(self, *keys): - """ - Unsubscribe this eventhandler from the given keys. - - :param keys: list of keys to unsubscribe from - """ - for key in keys: - self.server_connector.unsubscribe(key) - self.keys.remove(key) - - @classmethod - def stop_all_instances(cls): - for instance in cls._instances: - instance.stop() - - def stop(self): - self.server_connector.close() - self.cleanup() - - def cleanup(self): - """ - Perform cleanup actions such as releasing database - connections and stuff like that. - """ diff --git a/lib/tfw/event_handlers/event_handler_factory.py b/lib/tfw/event_handlers/event_handler_factory.py index dc03c46..52f6962 100644 --- a/lib/tfw/event_handlers/event_handler_factory.py +++ b/lib/tfw/event_handlers/event_handler_factory.py @@ -24,13 +24,21 @@ class EventHandlerBuilder: self._event_handler_type = event_handler_type def build(self, server_connector): - server_connector.subscribe(*self._analyzer.keys) event_handler = self._event_handler_type(server_connector) + server_connector.subscribe(*self._try_get_keys(event_handler)) event_handler.handle_event = self._analyzer.handle_event with suppress(AttributeError): event_handler.cleanup = self._analyzer.cleanup return event_handler + def _try_get_keys(self, event_handler): + try: + return self._analyzer.keys + except ValueError: + with suppress(AttributeError): + return event_handler.keys + raise + class EventHandlerAnalyzer: def __init__(self, event_handler, supplied_keys): diff --git a/lib/tfw/components/fsm_aware.py b/lib/tfw/event_handlers/fsm_aware.py similarity index 50% rename from lib/tfw/components/fsm_aware.py rename to lib/tfw/event_handlers/fsm_aware.py index 395a162..76d8d3f 100644 --- a/lib/tfw/components/fsm_aware.py +++ b/lib/tfw/event_handlers/fsm_aware.py @@ -6,6 +6,7 @@ LOG = logging.getLogger(__name__) class FSMAware: + keys = ['fsm_update'] """ Base class for stuff that has to be aware of the framework FSM. This is done by processing 'fsm_update' messages. @@ -16,27 +17,21 @@ class FSMAware: self.fsm_event_log = [] self._auth_key = KeyManager().auth_key - def refresh_on_fsm_update(self, message): - if message['key'] == 'fsm_update' and verify_message(self._auth_key, message): - self._handle_fsm_update(message) - return True - return False + def process_message(self, message): + if message['key'] == 'fsm_update': + if verify_message(self._auth_key, message): + self._handle_fsm_update(message) def _handle_fsm_update(self, message): try: - update_data = message['data'] - new_state = update_data['current_state'] + new_state = message['current_state'] if self.fsm_state != new_state: - self.handle_fsm_step(**update_data) + self.handle_fsm_step(message) self.fsm_state = new_state - self.fsm_in_accepted_state = update_data['in_accepted_state'] - self.fsm_event_log.append(update_data) + self.fsm_in_accepted_state = message['in_accepted_state'] + self.fsm_event_log.append(message) except KeyError: LOG.error('Invalid fsm_update message received!') - def handle_fsm_step(self, **kwargs): - """ - Called in case the TFW FSM has stepped. - - :param kwargs: fsm_update 'data' field - """ + def handle_fsm_step(self, message): + pass diff --git a/lib/tfw/event_handlers/fsm_aware_event_handler.py b/lib/tfw/event_handlers/fsm_aware_event_handler.py new file mode 100644 index 0000000..966d4d4 --- /dev/null +++ b/lib/tfw/event_handlers/fsm_aware_event_handler.py @@ -0,0 +1,19 @@ +from .event_handler import EventHandler +from .fsm_aware import FSMAware + + +class FSMAwareEventHandler(EventHandler, FSMAware): + # pylint: disable=abstract-method + """ + Abstract base class for EventHandlers which automatically + keep track of the state of the TFW FSM. + """ + def __init__(self, server_connector): + EventHandler.__init__(self, server_connector) + FSMAware.__init__(self) + + def _event_callback(self, message): + self.process_message(message) + + def handle_fsm_step(self, message): + self.handle_event(message, self.server_connector) diff --git a/lib/tfw/event_handlers/test_event_handler.py b/lib/tfw/event_handlers/test_event_handler.py index edde537..ae165f3 100644 --- a/lib/tfw/event_handlers/test_event_handler.py +++ b/lib/tfw/event_handlers/test_event_handler.py @@ -174,13 +174,17 @@ def test_build_raises_if_no_key(test_keys): with pytest.raises(ValueError): MockEventHandlerFactory().build(eh) - def test_handle_event(*_): + def handle_event(*_): pass with pytest.raises(ValueError): - MockEventHandlerFactory().build(test_handle_event) + MockEventHandlerFactory().build(handle_event) with pytest.raises(ValueError): MockEventHandlerFactory().build(lambda msg, sc: None) + WithKeysEventHandler = EventHandler + WithKeysEventHandler.keys = test_keys + MockEventHandlerFactory().build(eh, event_handler_type=WithKeysEventHandler) + eh.keys = test_keys MockEventHandlerFactory().build(eh) diff --git a/lib/tfw/main/__init__.py b/lib/tfw/main/__init__.py new file mode 100644 index 0000000..9e1c892 --- /dev/null +++ b/lib/tfw/main/__init__.py @@ -0,0 +1,3 @@ +from .tfw_connector import TFWUplinkConnector, TFWConnector +from .event_handler_factory import EventHandlerFactory +from .signal_handling import setup_signal_handlers diff --git a/lib/tfw/main/event_handler_factory.py b/lib/tfw/main/event_handler_factory.py new file mode 100644 index 0000000..271be8c --- /dev/null +++ b/lib/tfw/main/event_handler_factory.py @@ -0,0 +1,8 @@ +from tfw.event_handlers import EventHandlerFactoryBase + +from .tfw_connector import TFWConnector + + +class EventHandlerFactory(EventHandlerFactoryBase): + def _build_server_connector(self): + return TFWConnector() diff --git a/lib/tfw/main/signal_handling.py b/lib/tfw/main/signal_handling.py new file mode 100644 index 0000000..a8750ef --- /dev/null +++ b/lib/tfw/main/signal_handling.py @@ -0,0 +1,11 @@ +from signal import signal, SIGTERM, SIGINT + +from tfw.event_handlers import EventHandler + + +def setup_signal_handlers(): + def stop(*_): + EventHandler.stop_all_instances() + exit(0) + signal(SIGTERM, stop) + signal(SIGINT, stop) diff --git a/lib/tfw/builtins/tfw_server_connector.py b/lib/tfw/main/tfw_connector.py similarity index 70% rename from lib/tfw/builtins/tfw_server_connector.py rename to lib/tfw/main/tfw_connector.py index 8b8b7c3..2e56996 100644 --- a/lib/tfw/builtins/tfw_server_connector.py +++ b/lib/tfw/main/tfw_connector.py @@ -1,4 +1,4 @@ -from tfw.networking import ServerUplinkConnector, ServerConnector +from tfw.networking import ServerConnector, ServerUplinkConnector from tfw.config import TFWENV @@ -12,12 +12,12 @@ class ConnAddrMixin: return f'tcp://localhost:{TFWENV.PUB_PORT}' -class TFWServerUplinkConnector(ServerUplinkConnector, ConnAddrMixin): +class TFWUplinkConnector(ServerUplinkConnector, ConnAddrMixin): def __init__(self): super().__init__(self.uplink_conn_addr) -class TFWServerConnector(ServerConnector, ConnAddrMixin): +class TFWConnector(ServerConnector, ConnAddrMixin): def __init__(self): super().__init__( self.downlink_conn_addr, diff --git a/lib/tfw/networking/server_connector.py b/lib/tfw/networking/server_connector.py index da407ac..0ab1644 100644 --- a/lib/tfw/networking/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -1,5 +1,4 @@ import logging -from functools import partial import zmq from zmq.eventloop.zmqstream import ZMQStream @@ -12,17 +11,31 @@ LOG = logging.getLogger(__name__) class ServerDownlinkConnector: def __init__(self, connect_addr): + self.keys = [] + self._on_recv_callback = None self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.connect(connect_addr) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) - self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) - self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) + def subscribe(self, *keys): + for key in keys: + self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key) + self.keys.append(key) + + def unsubscribe(self, *keys): + for key in keys: + self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key) + self.keys.remove(key) def register_callback(self, callback): - callback = with_deserialize_tfw_msg(callback) - self._zmq_sub_stream.on_recv(callback) + self._on_recv_callback = callback + self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv)) + + def _on_recv(self, message): + key = message['key'] + if key in self.keys or '' in self.keys: + self._on_recv_callback(message) def close(self): self._zmq_sub_stream.close()