"""Event handlers that exchange messages with a pair of named pipes."""

from abc import abstractmethod
from json import loads, dumps, JSONDecodeError

from tfw import EventHandlerBase
from tfw.config.logs import logging

from .pipe_io_server import PipeIOServer

LOG = logging.getLogger(__name__)


class PipeIOEventHandlerBase(EventHandlerBase):
    """Event handler that owns a running ``CallbackPipeIOServer``.

    Messages received by the pipe server are passed to
    ``handle_pipe_event``, which subclasses must implement.
    """

    def __init__(self, key, in_pipe_path, out_pipe_path, permissions=0o600):
        """Create the pipe server and start it immediately.

        Args:
            key: Forwarded unchanged to ``EventHandlerBase.__init__``.
            in_pipe_path: Forwarded to the pipe server as its input pipe path.
            out_pipe_path: Forwarded to the pipe server as its output pipe path.
            permissions: Forwarded to the pipe server; defaults to ``0o600``.
        """
        super().__init__(key)
        self.pipe_io = CallbackPipeIOServer(
            in_pipe_path,
            out_pipe_path,
            self.handle_pipe_event,
            permissions
        )
        self.pipe_io.start()

    @abstractmethod
    def handle_pipe_event(self, message_bytes):
        """Handle one message delivered by the pipe server.

        Args:
            message_bytes: The message exactly as the pipe server passed it to
                its callback (presumably raw bytes -- confirm against
                ``PipeIOServer``).
        """
        raise NotImplementedError()

    def cleanup(self):
        """Stop the pipe server started in ``__init__``."""
        self.pipe_io.stop()


class CallbackPipeIOServer(PipeIOServer):
    """``PipeIOServer`` that hands every incoming message to a callback."""

    def __init__(self, in_pipe_path, out_pipe_path, callback, permissions):
        """Store ``callback`` and forward the remaining arguments to the base.

        Args:
            in_pipe_path: Forwarded to ``PipeIOServer.__init__``.
            out_pipe_path: Forwarded to ``PipeIOServer.__init__``.
            callback: Callable invoked with each message given to
                ``handle_message``.
            permissions: Forwarded to ``PipeIOServer.__init__``.
        """
        super().__init__(in_pipe_path, out_pipe_path, permissions)
        self.callback = callback

    def handle_message(self, message):
        """Pass ``message`` to the stored callback."""
        self.callback(message)


class PipeIOEventHandler(PipeIOEventHandlerBase):
    """Relays JSON messages between the event handler and the pipe server."""

    def handle_event(self, message):
        """Serialize ``message`` to JSON bytes and send it through the pipe server."""
        json_bytes = dumps(message).encode()
        self.pipe_io.send_message(json_bytes)

    def handle_pipe_event(self, message_bytes):
        """Decode a JSON message from the pipe and send it via ``server_connector``.

        Payloads that cannot be decoded as JSON are logged and dropped, so a
        single malformed message does not raise inside the pipe server's
        callback.

        Args:
            message_bytes: JSON document as received from the pipe server.
        """
        try:
            json_message = loads(message_bytes)
        except (JSONDecodeError, UnicodeDecodeError):
            # loads() raises UnicodeDecodeError (not JSONDecodeError) when a
            # bytes payload is not valid UTF-8/16/32, so both are handled.
            LOG.error(
                'Dropping pipe message that is not valid JSON: %r',
                message_bytes
            )
            return
        # NOTE(review): server_connector is not defined in this module;
        # presumably it is provided by EventHandlerBase -- confirm.
        self.server_connector.send(json_message)