from json import loads, dumps, JSONDecodeError from tfw import EventHandlerBase from tfw.config.logs import logging from .pipe_io_server import PipeIOServer LOG = logging.getLogger(__name__) class PipeIOEventHandler(EventHandlerBase): def __init__(self, key, in_pipe_path, out_pipe_path): super().__init__(key) self._pipe_io_server = JSONProxyPipeIOServer(in_pipe_path, out_pipe_path, self.server_connector.send) self._pipe_io_server.start() def cleanup(self): self._pipe_io_server.stop() def handle_event(self, message): json_bytes = dumps(message).encode() self._pipe_io_server.send_message(json_bytes) class JSONProxyPipeIOServer(PipeIOServer): def __init__(self, in_pipe_path, out_pipe_path, proxy_method): super().__init__(in_pipe_path, out_pipe_path) self.proxy = proxy_method def handle_message(self, message): try: json = loads(message) self.proxy(json) except JSONDecodeError: LOG.debug("Invalid JSON received on %s! Ignoring...", self._in_pipe)