from json import loads, dumps, JSONDecodeError

from tfw import EventHandlerBase
from tfw.config.logs import logging

from .pipe_io_server import PipeIOServer

LOG = logging.getLogger(__name__)


class PipeIOEventHandler(EventHandlerBase):
    """Event handler that relays messages through a ``JSONProxyPipeIOServer``.

    Events passed to ``handle_event`` are serialized to JSON bytes and handed
    to the pipe server's ``send_message``. The pipe server is given
    ``self.server_connector.send`` as its proxy method, so every message it
    successfully parses is forwarded there.
    """

    def __init__(self, key, in_pipe_path, out_pipe_path, permissions=0o600):
        """Create the pipe server and start it immediately.

        Args:
            key: Passed unchanged to ``EventHandlerBase.__init__``.
            in_pipe_path: Forwarded to ``PipeIOServer`` as its input pipe path.
            out_pipe_path: Forwarded to ``PipeIOServer`` as its output pipe
                path.
            permissions: Forwarded to ``PipeIOServer``; presumably the file
                mode used for the pipes (verify against ``PipeIOServer``).
        """
        super().__init__(key)
        self._pipe_io_server = JSONProxyPipeIOServer(
            in_pipe_path,
            out_pipe_path,
            self.server_connector.send,
            permissions
        )
        self._pipe_io_server.start()

    def cleanup(self):
        """Stop the underlying pipe server."""
        self._pipe_io_server.stop()

    def handle_event(self, message):
        """Serialize ``message`` to JSON and send it via the pipe server.

        Messages that cannot be serialized are logged and dropped.

        Args:
            message: The event payload to serialize with ``json.dumps``.
        """
        # Only serialization is guarded: dumps() raises TypeError for
        # unsupported types and ValueError for circular references. Errors
        # from the send path are not serialization problems and must not be
        # reported as such.
        try:
            json_bytes = dumps(message).encode()
        except (TypeError, ValueError):
            LOG.error("Message %s not JSON serializable! Ignoring...", message)
            return
        self._pipe_io_server.send_message(json_bytes)


class JSONProxyPipeIOServer(PipeIOServer):
    """``PipeIOServer`` that parses incoming messages as JSON and proxies them.

    Each message given to ``handle_message`` is decoded with ``json.loads``
    and the resulting object is passed to the configured proxy callable.
    """

    def __init__(self, in_pipe_path, out_pipe_path, proxy_method, permissions):
        """Initialise the base server and remember the proxy callable.

        Args:
            in_pipe_path: Forwarded to ``PipeIOServer.__init__``.
            out_pipe_path: Forwarded to ``PipeIOServer.__init__``.
            proxy_method: Callable invoked with every successfully parsed
                message.
            permissions: Forwarded to ``PipeIOServer.__init__``.
        """
        super().__init__(in_pipe_path, out_pipe_path, permissions)
        self.proxy = proxy_method

    def handle_message(self, message):
        """Parse ``message`` as JSON and forward the result to the proxy.

        Input that is not valid JSON is logged and dropped.

        Args:
            message: Raw message to parse; presumably the bytes read from the
                input pipe (verify against ``PipeIOServer``).
        """
        # json.loads() decodes bytes input before parsing, so malformed
        # (e.g. non-UTF-8) bytes raise UnicodeDecodeError rather than
        # JSONDecodeError. Both mean "invalid JSON" here.
        try:
            payload = loads(message)
        except (JSONDecodeError, UnicodeDecodeError):
            LOG.error("Invalid JSON received on %s! Ignoring...", self._in_pipe)
            return
        # Called outside the try block so errors raised by the proxy itself
        # are never misreported as invalid input.
        self.proxy(payload)