diff --git a/lib/tfw/components/pipe_io_event_handler.py b/lib/tfw/components/pipe_io_event_handler.py new file mode 100644 index 0000000..df3d012 --- /dev/null +++ b/lib/tfw/components/pipe_io_event_handler.py @@ -0,0 +1,35 @@ +from json import loads, dumps, JSONDecodeError + +from tfw import EventHandlerBase +from tfw.config.logs import logging + +from .pipe_io_server import PipeIOServer + +LOG = logging.getLogger(__name__) + + +class PipeIOEventHandler(EventHandlerBase): + def __init__(self, key, in_pipe_path, out_pipe_path): + super().__init__(key) + self._pipe_io_server = JSONProxyPipeIOServer(in_pipe_path, out_pipe_path, self.server_connector.send) + self._pipe_io_server.start() + + def cleanup(self): + self._pipe_io_server.stop() + + def handle_event(self, message): + json_bytes = dumps(message).encode() + self._pipe_io_server.send_message(json_bytes) + + +class JSONProxyPipeIOServer(PipeIOServer): + def __init__(self, in_pipe_path, out_pipe_path, proxy_method): + super().__init__(in_pipe_path, out_pipe_path) + self.proxy = proxy_method + + def handle_message(self, message): + try: + json = loads(message) + self.proxy(json) + except JSONDecodeError: + LOG.debug("Invalid JSON received on %s! Ignoring...", self._in_pipe)