Adjust the whole framework to event handler dependency inversion

This commit is contained in:
Kristóf Tóth 2019-07-12 23:25:16 +02:00
parent 42beafcf04
commit de78f48930
26 changed files with 169 additions and 254 deletions

View File

@ -1,7 +1,5 @@
from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler
from .event_handler import EventHandler
from .frontend_event_handler import FrontendEventHandler from .frontend_event_handler import FrontendEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler
from .fsm_managing_event_handler import FSMManagingEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler
from .ide_event_handler import IdeEventHandler from .ide_event_handler import IdeEventHandler
from .log_monitoring_event_handler import LogMonitoringEventHandler from .log_monitoring_event_handler import LogMonitoringEventHandler
@ -10,4 +8,3 @@ from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHa
from .process_managing_event_handler import ProcessManagingEventHandler from .process_managing_event_handler import ProcessManagingEventHandler
from .terminal_commands_event_handler import TerminalCommandsEventHandler from .terminal_commands_event_handler import TerminalCommandsEventHandler
from .terminal_event_handler import TerminalEventHandler from .terminal_event_handler import TerminalEventHandler
from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector

View File

@ -10,14 +10,14 @@ from tfw.components.snapshot_provider import SnapshotProvider
from tfw.config import TFWENV from tfw.config import TFWENV
from tfw.networking import Scope from tfw.networking import Scope
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DirectorySnapshottingEventHandler(EventHandler): class DirectorySnapshottingEventHandler:
def __init__(self, key, directories, exclude_unix_patterns=None): keys = ['snapshot']
super().__init__(key, scope=Scope.WEBSOCKET)
def __init__(self, directories, exclude_unix_patterns=None):
self.snapshot_providers = {} self.snapshot_providers = {}
self._exclude_unix_patterns = exclude_unix_patterns self._exclude_unix_patterns = exclude_unix_patterns
self.init_snapshot_providers(directories) self.init_snapshot_providers(directories)
@ -46,11 +46,11 @@ class DirectorySnapshottingEventHandler(EventHandler):
makedirs(git_dir, exist_ok=True) makedirs(git_dir, exist_ok=True)
return git_dir return git_dir
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
data = message['data'] data = message['data']
message['data'] = self.command_handlers[data['command']](data) message['data'] = self.command_handlers[data['command']](data)
self.send_message(message) server_connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,9 +0,0 @@
from tfw.event_handlers import EventHandlerBase
from .tfw_server_connector import TFWServerConnector
class EventHandler(EventHandlerBase):
# pylint: disable=abstract-method
def _build_server_connector(self):
return TFWServerConnector()

View File

@ -1,16 +1,19 @@
from tfw.networking import Scope from tfw.networking import Scope
from tfw.components import FrontendMessageStorage from tfw.components import FrontendMessageStorage
from .event_handler import EventHandler
class FrontendEventHandler:
keys = ['message', 'queueMessages', 'dashboard', 'console']
class FrontendEventHandler(EventHandler):
def __init__(self): def __init__(self):
frontend_keys = ('message', 'queueMessages', 'dashboard', 'console') self.server_connector = None
self._frontend_message_storage = FrontendMessageStorage(frontend_keys) self.keys = [*type(self).keys, 'recover']
super().__init__((*frontend_keys, 'recover'), scope=Scope.WEBSOCKET) self._frontend_message_storage = FrontendMessageStorage(type(self).keys)
def handle_event(self, message): def send_message(self, message):
self.server_connector.send_message(message, scope=Scope.WEBSOCKET)
def handle_event(self, message, _):
self._frontend_message_storage.save_message(message) self._frontend_message_storage.save_message(message)
if message['key'] == 'recover': if message['key'] == 'recover':
self.recover_frontend() self.recover_frontend()

View File

@ -1,20 +0,0 @@
from tfw.components import FSMAware
from tfw.networking import Scope
from .event_handler import EventHandler
class FSMAwareEventHandler(EventHandler, FSMAware):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which automatically
keep track of the state of the TFW FSM.
"""
def __init__(self, key, scope=Scope.ZMQ):
EventHandler.__init__(self, key, scope=scope)
FSMAware.__init__(self)
self.subscribe('fsm_update')
def dispatch_handling(self, message):
if not self.refresh_on_fsm_update(message):
super().dispatch_handling(message)

View File

@ -4,12 +4,12 @@ from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.networking import Scope from tfw.networking import Scope
from tfw.components import FSMUpdater from tfw.components import FSMUpdater
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class FSMManagingEventHandler(EventHandler): class FSMManagingEventHandler:
keys = ['fsm']
""" """
EventHandler responsible for managing the state machine of EventHandler responsible for managing the state machine of
the framework (TFW FSM). the framework (TFW FSM).
@ -24,8 +24,7 @@ class FSMManagingEventHandler(EventHandler):
An 'fsm_update' message is broadcasted after every successful An 'fsm_update' message is broadcasted after every successful
command. command.
""" """
def __init__(self, key, fsm_type, require_signature=False): def __init__(self, fsm_type, require_signature=False):
super().__init__(key, scope=Scope.WEBSOCKET)
self.fsm = fsm_type() self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm) self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key self.auth_key = KeyManager().auth_key
@ -36,15 +35,14 @@ class FSMManagingEventHandler(EventHandler):
'update': self.handle_update 'update': self.handle_update
} }
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
message = self.command_handlers[message['data']['command']](message) message = self.command_handlers[message['data']['command']](message)
if message: if message:
fsm_update_message = self._fsm_updater.fsm_update fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, message) sign_message(self.auth_key, message)
sign_message(self.auth_key, fsm_update_message) sign_message(self.auth_key, fsm_update_message)
self.server_connector.send_message(fsm_update_message, Scope.BROADCAST) server_connector.send_message(fsm_update_message, Scope.BROADCAST)
self.send_message(message)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -4,7 +4,6 @@ from tfw.networking import Scope
from tfw.components import FileManager from tfw.components import FileManager
from tfw.components.inotify import InotifyObserver from tfw.components.inotify import InotifyObserver
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -32,7 +31,8 @@ BUILD_ARTIFACTS = (
) )
class IdeEventHandler(EventHandler): class IdeEventHandler:
keys = ['ide']
# pylint: disable=too-many-arguments,anomalous-backslash-in-string # pylint: disable=too-many-arguments,anomalous-backslash-in-string
""" """
Event handler implementing the backend of our browser based IDE. Event handler implementing the backend of our browser based IDE.
@ -47,7 +47,7 @@ class IdeEventHandler(EventHandler):
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None): def __init__(self, directory, allowed_directories, selected_file=None, exclude=None):
""" """
:param key: the key this instance should listen to :param key: the key this instance should listen to
:param directory: working directory which the EventHandler should serve files from :param directory: working directory which the EventHandler should serve files from
@ -55,7 +55,7 @@ class IdeEventHandler(EventHandler):
:param selected_file: file that is selected by default :param selected_file: file that is selected by default
:param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
""" """
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
try: try:
self.filemanager = FileManager( self.filemanager = FileManager(
allowed_directories=allowed_directories, allowed_directories=allowed_directories,
@ -84,10 +84,13 @@ class IdeEventHandler(EventHandler):
} }
def _reload_frontend(self, event): # pylint: disable=unused-argument def _reload_frontend(self, event): # pylint: disable=unused-argument
self.server_connector.send_message({ self.send_message({
'key': 'ide', 'key': 'ide',
'data': {'command': 'reload'} 'data': {'command': 'reload'}
}, Scope.WEBSOCKET) })
def send_message(self, message):
self.server_connector.send_message(message, scope=Scope.WEBSOCKET)
def read(self, data): def read(self, data):
""" """
@ -179,7 +182,7 @@ class IdeEventHandler(EventHandler):
data['files'] = self.filemanager.files data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir data['directory'] = self.filemanager.workdir
def handle_event(self, message): def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
message['data'] = self.commands[data['command']](data) message['data'] = self.commands[data['command']](data)

View File

@ -1,15 +1,14 @@
import logging import logging
from tfw.config import TFWENV from tfw.config import TFWENV
from tfw.networking import Scope
from tfw.components import LogInotifyObserver from tfw.components import LogInotifyObserver
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class LogMonitoringEventHandler(EventHandler): class LogMonitoringEventHandler:
keys = ['logmonitor']
""" """
Monitors the output of a supervisor process (stdout, stderr) and Monitors the output of a supervisor process (stdout, stderr) and
sends the results to the frontend. sends the results to the frontend.
@ -19,23 +18,27 @@ class LogMonitoringEventHandler(EventHandler):
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key, process_name, log_tail=0): def __init__(self, process_name, log_tail=0):
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
self.process_name = process_name self.process_name = process_name
self._monitor = LogInotifyObserver( self._initial_log_tail = log_tail
server_connector=self.server_connector, self._monitor = None
supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
process_name=process_name,
log_tail=log_tail
)
self._monitor.start()
self.command_handlers = { self.command_handlers = {
'process_name': self.handle_process_name, 'process_name': self.handle_process_name,
'log_tail': self.handle_log_tail 'log_tail': self.handle_log_tail
} }
def handle_event(self, message): def start(self):
self._monitor = LogInotifyObserver(
server_connector=self.server_connector,
supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
process_name=self.process_name,
log_tail=self._initial_log_tail
)
self._monitor.start()
def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
self.command_handlers[data['command']](data) self.command_handlers[data['command']](data)

View File

@ -12,15 +12,16 @@ from contextlib import suppress
from tfw.components.pipe_io_server import PipeIOServer, terminate_process_on_failure from tfw.components.pipe_io_server import PipeIOServer, terminate_process_on_failure
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600 DEFAULT_PERMISSIONS = 0o600
class PipeIOEventHandlerBase(EventHandler): class PipeIOEventHandlerBase:
def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): keys = ['']
super().__init__(key)
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.server_connector = None
self.pipe_io = CallbackPipeIOServer( self.pipe_io = CallbackPipeIOServer(
in_pipe_path, in_pipe_path,
out_pipe_path, out_pipe_path,
@ -50,25 +51,25 @@ class CallbackPipeIOServer(PipeIOServer):
class PipeIOEventHandler(PipeIOEventHandlerBase): class PipeIOEventHandler(PipeIOEventHandlerBase):
def handle_event(self, message): def handle_event(self, message, _):
json_bytes = dumps(message).encode() json_bytes = dumps(message).encode()
self.pipe_io.send_message(json_bytes) self.pipe_io.send_message(json_bytes)
def handle_pipe_event(self, message_bytes): def handle_pipe_event(self, message_bytes):
json = loads(message_bytes) json = loads(message_bytes)
self.send_message(json) self.server_connector.send_message(json)
class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def __init__( def __init__(
self, key, in_pipe_path, out_pipe_path, self, in_pipe_path, out_pipe_path,
transform_in_cmd, transform_out_cmd, transform_in_cmd, transform_out_cmd,
permissions=DEFAULT_PERMISSIONS permissions=DEFAULT_PERMISSIONS
): ):
self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_in = partial(self._transform_message, transform_in_cmd)
self._transform_out = partial(self._transform_message, transform_out_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd)
super().__init__(key, in_pipe_path, out_pipe_path, permissions) super().__init__(in_pipe_path, out_pipe_path, permissions)
@staticmethod @staticmethod
def _transform_message(transform_cmd, message): def _transform_message(transform_cmd, message):
@ -83,7 +84,7 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
return proc.stdout return proc.stdout
raise ValueError(f'Transforming message {message} failed!') raise ValueError(f'Transforming message {message} failed!')
def handle_event(self, message): def handle_event(self, message, _):
json_bytes = dumps(message).encode() json_bytes = dumps(message).encode()
transformed_bytes = self._transform_out(json_bytes) transformed_bytes = self._transform_out(json_bytes)
if transformed_bytes: if transformed_bytes:
@ -93,13 +94,12 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
transformed_bytes = self._transform_in(message_bytes) transformed_bytes = self._transform_in(message_bytes)
if transformed_bytes: if transformed_bytes:
json_message = loads(transformed_bytes) json_message = loads(transformed_bytes)
self.send_message(json_message) self.server_connector.send_message(json_message)
class CommandEventHandler(PipeIOEventHandler): class CommandEventHandler(PipeIOEventHandler):
def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS): def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
super().__init__( super().__init__(
key,
self._generate_tempfilename(), self._generate_tempfilename(),
self._generate_tempfilename(), self._generate_tempfilename(),
permissions permissions

View File

@ -5,12 +5,12 @@ from tfw.config import TFWENV
from tfw.networking import Scope from tfw.networking import Scope
from tfw.components import ProcessManager, LogManager from tfw.components import ProcessManager, LogManager
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): class ProcessManagingEventHandler(ProcessManager, LogManager):
keys = ['processmanager']
""" """
Event handler that can manage processes managed by supervisor. Event handler that can manage processes managed by supervisor.
@ -23,8 +23,7 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
Commands available: start, stop, restart, readlog Commands available: start, stop, restart, readlog
(the names are as self-documenting as it gets) (the names are as self-documenting as it gets)
""" """
def __init__(self, key, log_tail=0): def __init__(self, log_tail=0):
EventHandler.__init__(self, key, scope=Scope.WEBSOCKET)
ProcessManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) ProcessManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI)
LogManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) LogManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI)
self.log_tail = log_tail self.log_tail = log_tail
@ -34,7 +33,7 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
'restart': self.restart_process 'restart': self.restart_process
} }
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
data = message['data'] data = message['data']
try: try:
@ -50,6 +49,6 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
data['process_name'], data['process_name'],
self.log_tail self.log_tail
) )
self.send_message(message) server_connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,14 +1,9 @@
from tfw.components import TerminalCommands from tfw.components import TerminalCommands
from tfw.networking import Scope
from .event_handler import EventHandler
class TerminalCommandsEventHandler(EventHandler, TerminalCommands): class TerminalCommandsEventHandler(TerminalCommands):
def __init__(self, key, scope=Scope.ZMQ, bashrc=None): keys = ['history.bash']
EventHandler.__init__(self, key, scope)
TerminalCommands.__init__(self, bashrc)
def handle_event(self, message): def handle_event(self, message, _):
command = message['value'] command = message['value']
self.callback(command) self.callback(command)

View File

@ -1,16 +1,15 @@
import logging import logging
from tfw.networking import Scope
from tfw.components import BashMonitor, TerminadoMiniServer from tfw.components import BashMonitor, TerminadoMiniServer
from tfw.config import TFWENV from tfw.config import TFWENV
from tao.config import TAOENV from tao.config import TAOENV
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TerminalEventHandler(EventHandler): class TerminalEventHandler:
keys = ['shell']
""" """
Event handler responsible for managing terminal sessions for frontend xterm Event handler responsible for managing terminal sessions for frontend xterm
sessions to connect to. You need to instanciate this in order for frontend sessions to connect to. You need to instanciate this in order for frontend
@ -20,13 +19,13 @@ class TerminalEventHandler(EventHandler):
a command to be executed. a command to be executed.
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key): def __init__(self):
""" """
:param key: key this EventHandler listens to :param key: key this EventHandler listens to
:param monitor: tfw.components.HistoryMonitor instance to read command history from :param monitor: tfw.components.HistoryMonitor instance to read command history from
""" """
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
self._historymonitor = BashMonitor(self.server_connector, TFWENV.HISTFILE) self._historymonitor = None
bash_as_user_cmd = ['sudo', '-u', TAOENV.USER, 'bash'] bash_as_user_cmd = ['sudo', '-u', TAOENV.USER, 'bash']
self.terminado_server = TerminadoMiniServer( self.terminado_server = TerminadoMiniServer(
@ -41,18 +40,20 @@ class TerminalEventHandler(EventHandler):
'read': self.read 'read': self.read
} }
self._historymonitor.start()
self.terminado_server.listen() self.terminado_server.listen()
def start(self):
self._historymonitor = BashMonitor(self.server_connector, TFWENV.HISTFILE)
self._historymonitor.start()
@property @property
def historymonitor(self): def historymonitor(self):
return self._historymonitor return self._historymonitor
def handle_event(self, message): def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
message['data'] = self.commands[data['command']](data) message['data'] = self.commands[data['command']](data)
self.send_message(message)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,6 +1,5 @@
from .commands_equal import CommandsEqual from .commands_equal import CommandsEqual
from .file_manager import FileManager from .file_manager import FileManager
from .fsm_aware import FSMAware
from .fsm_updater import FSMUpdater from .fsm_updater import FSMUpdater
from .history_monitor import BashMonitor, GDBMonitor from .history_monitor import BashMonitor, GDBMonitor
from .log_inotify_observer import LogInotifyObserver from .log_inotify_observer import LogInotifyObserver

View File

@ -6,7 +6,7 @@ class FSMUpdater:
def fsm_update(self): def fsm_update(self):
return { return {
'key': 'fsm_update', 'key': 'fsm_update',
'data': self.fsm_update_data **self.fsm_update_data
} }
@property @property

View File

@ -26,7 +26,7 @@ class TerminalCommands(ABC):
You can also use this class to create new commands similarly. You can also use this class to create new commands similarly.
""" """
def __init__(self, bashrc=None): def __init__(self, bashrc):
self._command_method_regex = r'^command_(.+)$' self._command_method_regex = r'^command_(.+)$'
self.command_implemetations = self._build_command_to_implementation_dict() self.command_implemetations = self._build_command_to_implementation_dict()
if bashrc is not None: if bashrc is not None:

View File

@ -1 +1,3 @@
from .event_handler_base import EventHandlerBase from .event_handler_factory import EventHandlerFactoryBase
from .event_handler import EventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler

View File

@ -1,117 +0,0 @@
import logging
from abc import ABC, abstractmethod
from typing import Iterable
from tfw.networking import Scope
LOG = logging.getLogger(__name__)
class EventHandlerBase(ABC):
"""
Abstract base class for all Python based EventHandlers. Useful implementation template
for other languages.
Derived classes must implement the handle_event() method
"""
_instances = set()
def __init__(self, key, scope=Scope.ZMQ):
type(self)._instances.add(self)
self.server_connector = self._build_server_connector()
self.scope = scope
self.keys = []
if isinstance(key, str):
self.keys.append(key)
elif isinstance(key, Iterable):
self.keys = list(key)
self.subscribe(*self.keys)
self.server_connector.register_callback(self.event_handler_callback)
@abstractmethod
def _build_server_connector(self):
raise NotImplementedError()
def subscribe(self, *keys):
"""
Subscribe this EventHandler to receive events for given keys.
Note that you can subscribe to the same key several times in which
case you will need to unsubscribe multiple times in order to stop
receiving events.
:param keys: list of keys to subscribe to
"""
for key in keys:
self.server_connector.subscribe(key)
self.keys.append(key)
def event_handler_callback(self, message):
"""
Callback that is invoked when receiving a message.
Dispatches messages to handler methods and sends
a response back in case the handler returned something.
This is subscribed in __init__().
"""
if self.check_key(message):
self.dispatch_handling(message)
def check_key(self, message):
"""
Checks whether the message is intended for this
EventHandler.
This is necessary because ZMQ handles PUB - SUB
connetions with pattern matching (e.g. someone
subscribed to 'fsm' will receive 'fsm_update'
messages as well.
"""
if '' in self.keys:
return True
return message['key'] in self.keys
def dispatch_handling(self, message):
"""
Used to dispatch messages to their specific handlers.
:param message: the message received
:returns: the message to send back
"""
self.handle_event(message)
def handle_event(self, message):
"""
Abstract method that implements the handling of messages.
:param message: the message received
:returns: the message to send back
"""
raise NotImplementedError()
def send_message(self, message):
self.server_connector.send_message(message, self.scope)
def unsubscribe(self, *keys):
"""
Unsubscribe this eventhandler from the given keys.
:param keys: list of keys to unsubscribe from
"""
for key in keys:
self.server_connector.unsubscribe(key)
self.keys.remove(key)
@classmethod
def stop_all_instances(cls):
for instance in cls._instances:
instance.stop()
def stop(self):
self.server_connector.close()
self.cleanup()
def cleanup(self):
"""
Perform cleanup actions such as releasing database
connections and stuff like that.
"""

View File

@ -24,13 +24,21 @@ class EventHandlerBuilder:
self._event_handler_type = event_handler_type self._event_handler_type = event_handler_type
def build(self, server_connector): def build(self, server_connector):
server_connector.subscribe(*self._analyzer.keys)
event_handler = self._event_handler_type(server_connector) event_handler = self._event_handler_type(server_connector)
server_connector.subscribe(*self._try_get_keys(event_handler))
event_handler.handle_event = self._analyzer.handle_event event_handler.handle_event = self._analyzer.handle_event
with suppress(AttributeError): with suppress(AttributeError):
event_handler.cleanup = self._analyzer.cleanup event_handler.cleanup = self._analyzer.cleanup
return event_handler return event_handler
def _try_get_keys(self, event_handler):
try:
return self._analyzer.keys
except ValueError:
with suppress(AttributeError):
return event_handler.keys
raise
class EventHandlerAnalyzer: class EventHandlerAnalyzer:
def __init__(self, event_handler, supplied_keys): def __init__(self, event_handler, supplied_keys):

View File

@ -6,6 +6,7 @@ LOG = logging.getLogger(__name__)
class FSMAware: class FSMAware:
keys = ['fsm_update']
""" """
Base class for stuff that has to be aware of the framework FSM. Base class for stuff that has to be aware of the framework FSM.
This is done by processing 'fsm_update' messages. This is done by processing 'fsm_update' messages.
@ -16,27 +17,21 @@ class FSMAware:
self.fsm_event_log = [] self.fsm_event_log = []
self._auth_key = KeyManager().auth_key self._auth_key = KeyManager().auth_key
def refresh_on_fsm_update(self, message): def process_message(self, message):
if message['key'] == 'fsm_update' and verify_message(self._auth_key, message): if message['key'] == 'fsm_update':
self._handle_fsm_update(message) if verify_message(self._auth_key, message):
return True self._handle_fsm_update(message)
return False
def _handle_fsm_update(self, message): def _handle_fsm_update(self, message):
try: try:
update_data = message['data'] new_state = message['current_state']
new_state = update_data['current_state']
if self.fsm_state != new_state: if self.fsm_state != new_state:
self.handle_fsm_step(**update_data) self.handle_fsm_step(message)
self.fsm_state = new_state self.fsm_state = new_state
self.fsm_in_accepted_state = update_data['in_accepted_state'] self.fsm_in_accepted_state = message['in_accepted_state']
self.fsm_event_log.append(update_data) self.fsm_event_log.append(message)
except KeyError: except KeyError:
LOG.error('Invalid fsm_update message received!') LOG.error('Invalid fsm_update message received!')
def handle_fsm_step(self, **kwargs): def handle_fsm_step(self, message):
""" pass
Called in case the TFW FSM has stepped.
:param kwargs: fsm_update 'data' field
"""

View File

@ -0,0 +1,19 @@
from .event_handler import EventHandler
from .fsm_aware import FSMAware
class FSMAwareEventHandler(EventHandler, FSMAware):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which automatically
keep track of the state of the TFW FSM.
"""
def __init__(self, server_connector):
EventHandler.__init__(self, server_connector)
FSMAware.__init__(self)
def _event_callback(self, message):
self.process_message(message)
def handle_fsm_step(self, message):
self.handle_event(message, self.server_connector)

View File

@ -174,13 +174,17 @@ def test_build_raises_if_no_key(test_keys):
with pytest.raises(ValueError): with pytest.raises(ValueError):
MockEventHandlerFactory().build(eh) MockEventHandlerFactory().build(eh)
def test_handle_event(*_): def handle_event(*_):
pass pass
with pytest.raises(ValueError): with pytest.raises(ValueError):
MockEventHandlerFactory().build(test_handle_event) MockEventHandlerFactory().build(handle_event)
with pytest.raises(ValueError): with pytest.raises(ValueError):
MockEventHandlerFactory().build(lambda msg, sc: None) MockEventHandlerFactory().build(lambda msg, sc: None)
WithKeysEventHandler = EventHandler
WithKeysEventHandler.keys = test_keys
MockEventHandlerFactory().build(eh, event_handler_type=WithKeysEventHandler)
eh.keys = test_keys eh.keys = test_keys
MockEventHandlerFactory().build(eh) MockEventHandlerFactory().build(eh)

3
lib/tfw/main/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .tfw_connector import TFWUplinkConnector, TFWConnector
from .event_handler_factory import EventHandlerFactory
from .signal_handling import setup_signal_handlers

View File

@ -0,0 +1,8 @@
from tfw.event_handlers import EventHandlerFactoryBase
from .tfw_connector import TFWConnector
class EventHandlerFactory(EventHandlerFactoryBase):
def _build_server_connector(self):
return TFWConnector()

View File

@ -0,0 +1,11 @@
from signal import signal, SIGTERM, SIGINT
from tfw.event_handlers import EventHandler
def setup_signal_handlers():
def stop(*_):
EventHandler.stop_all_instances()
exit(0)
signal(SIGTERM, stop)
signal(SIGINT, stop)

View File

@ -1,4 +1,4 @@
from tfw.networking import ServerUplinkConnector, ServerConnector from tfw.networking import ServerConnector, ServerUplinkConnector
from tfw.config import TFWENV from tfw.config import TFWENV
@ -12,12 +12,12 @@ class ConnAddrMixin:
return f'tcp://localhost:{TFWENV.PUB_PORT}' return f'tcp://localhost:{TFWENV.PUB_PORT}'
class TFWServerUplinkConnector(ServerUplinkConnector, ConnAddrMixin): class TFWUplinkConnector(ServerUplinkConnector, ConnAddrMixin):
def __init__(self): def __init__(self):
super().__init__(self.uplink_conn_addr) super().__init__(self.uplink_conn_addr)
class TFWServerConnector(ServerConnector, ConnAddrMixin): class TFWConnector(ServerConnector, ConnAddrMixin):
def __init__(self): def __init__(self):
super().__init__( super().__init__(
self.downlink_conn_addr, self.downlink_conn_addr,

View File

@ -1,5 +1,4 @@
import logging import logging
from functools import partial
import zmq import zmq
from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.zmqstream import ZMQStream
@ -12,17 +11,31 @@ LOG = logging.getLogger(__name__)
class ServerDownlinkConnector: class ServerDownlinkConnector:
def __init__(self, connect_addr): def __init__(self, connect_addr):
self.keys = []
self._on_recv_callback = None
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_socket.connect(connect_addr) self._zmq_sub_socket.connect(connect_addr)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) def subscribe(self, *keys):
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key)
self.keys.append(key)
def unsubscribe(self, *keys):
for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key)
self.keys.remove(key)
def register_callback(self, callback): def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback) self._on_recv_callback = callback
self._zmq_sub_stream.on_recv(callback) self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv))
def _on_recv(self, message):
key = message['key']
if key in self.keys or '' in self.keys:
self._on_recv_callback(message)
def close(self): def close(self):
self._zmq_sub_stream.close() self._zmq_sub_stream.close()