diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 5251a4e..2c36b84 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -10,4 +10,4 @@ from .terminal_commands import TerminalCommands from .log_monitoring_event_handler import LogMonitoringEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler from .snapshot_provider import SnapshotProvider -from .pipe_io_event_handler import PipeIOEventHandler +from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler diff --git a/lib/tfw/components/pipe_io_event_handler.py b/lib/tfw/components/pipe_io_event_handler.py index 7604787..c681567 100644 --- a/lib/tfw/components/pipe_io_event_handler.py +++ b/lib/tfw/components/pipe_io_event_handler.py @@ -1,3 +1,4 @@ +from abc import abstractmethod from json import loads, dumps, JSONDecodeError from tfw import EventHandlerBase @@ -8,37 +9,45 @@ from .pipe_io_server import PipeIOServer LOG = logging.getLogger(__name__) -class PipeIOEventHandler(EventHandlerBase): +class PipeIOEventHandlerBase(EventHandlerBase): def __init__(self, key, in_pipe_path, out_pipe_path, permissions=0o600): super().__init__(key) - self._pipe_io_server = JSONProxyPipeIOServer( + self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, - self.server_connector.send, + self.handle_pipe_event, permissions ) - self._pipe_io_server.start() + self.pipe_io.start() + + @abstractmethod + def handle_pipe_event(self, message): + raise NotImplementedError() def cleanup(self): - self._pipe_io_server.stop() + self.pipe_io.stop() + +class CallbackPipeIOServer(PipeIOServer): + def __init__(self, in_pipe_path, out_pipe_path, callback, permissions): + super().__init__(in_pipe_path, out_pipe_path, permissions) + self.callback = callback + + def handle_message(self, message): + self.callback(message) + + +class PipeIOEventHandler(PipeIOEventHandlerBase): def handle_event(self, message): try: json_bytes = dumps(message).encode() - self._pipe_io_server.send_message(json_bytes) + self.pipe_io.send_message(json_bytes) except TypeError: LOG.error("Message %s not JSON serializable! Ignoring...", message) - - -class JSONProxyPipeIOServer(PipeIOServer): - def __init__(self, in_pipe_path, out_pipe_path, proxy_method, permissions): - super().__init__(in_pipe_path, out_pipe_path, permissions) - self.proxy = proxy_method - - def handle_message(self, message): + def handle_pipe_event(self, message): try: json = loads(message) - self.proxy(json) + self.server_connector.send(json) except JSONDecodeError: - LOG.error("Invalid JSON received on %s! Ignoring...", self._in_pipe) + LOG.error("Invalid JSON received on %s! Ignoring...", self.pipe_io.in_pipe)