"""Event handlers built on tfw's EventHandlerBase that exchange messages through a PipeIOServer."""

from abc import abstractmethod
from json import loads, dumps
from subprocess import run, PIPE

from tfw import EventHandlerBase
from tfw.config.logs import logging

from .pipe_io_server import PipeIOServer

LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600


class PipeIOEventHandlerBase(EventHandlerBase):
    """Base event handler that owns a running CallbackPipeIOServer.

    Messages received by the pipe server are passed to
    ``handle_pipe_event``, which subclasses must implement.
    """

    def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
        """Create the pipe server and start it immediately.

        Args:
            key: Forwarded unchanged to ``EventHandlerBase.__init__``.
            in_pipe_path: Forwarded to the pipe server as its input pipe path.
            out_pipe_path: Forwarded to the pipe server as its output pipe path.
            permissions: Forwarded to the pipe server; defaults to ``0o600``
                (presumably the mode of the created pipes - confirm in
                PipeIOServer).
        """
        super().__init__(key)
        self.pipe_io = CallbackPipeIOServer(
            in_pipe_path,
            out_pipe_path,
            self.handle_pipe_event,
            permissions
        )
        # NOTE: the server is started from the constructor, so subclasses must
        # have every attribute used by handle_pipe_event set before calling
        # super().__init__().
        self.pipe_io.start()

    @abstractmethod
    def handle_pipe_event(self, message_bytes):
        """Handle one message received by the pipe server.

        Args:
            message_bytes: The message as handed over by the pipe server.
        """
        raise NotImplementedError()

    def cleanup(self):
        """Stop the pipe server."""
        self.pipe_io.stop()


class CallbackPipeIOServer(PipeIOServer):
    """PipeIOServer that delegates every handled message to a callback."""

    def __init__(self, in_pipe_path, out_pipe_path, callback, permissions):
        """Store ``callback`` and forward the remaining arguments to PipeIOServer."""
        super().__init__(in_pipe_path, out_pipe_path, permissions)
        self.callback = callback

    def handle_message(self, message):
        """Invoke the stored callback with ``message``."""
        self.callback(message)


class PipeIOEventHandler(PipeIOEventHandlerBase):
    """Relay messages as JSON between the event handler and the pipe server."""

    def handle_event(self, message):
        """Serialize ``message`` to JSON bytes and send it through the pipe server."""
        json_bytes = dumps(message).encode()
        self.pipe_io.send_message(json_bytes)

    def handle_pipe_event(self, message_bytes):
        """Parse ``message_bytes`` as JSON and send the result via ``server_connector``.

        ``server_connector`` is not defined in this module; presumably it is
        provided by EventHandlerBase.
        """
        json_message = loads(message_bytes)
        self.server_connector.send(json_message)


class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
    """JSON relay that pipes every message through an external shell command.

    Outgoing messages are filtered through ``serialize_cmd`` and incoming ones
    through ``deserialize_cmd``. A message whose transformed output is empty
    is dropped.
    """

    # pylint: disable=too-many-arguments
    def __init__(
            self, key, in_pipe_path, out_pipe_path,
            serialize_cmd, deserialize_cmd,
            permissions=DEFAULT_PERMISSIONS
    ):
        """Store the transform commands, then initialize the base handler.

        Args:
            key: Forwarded to the base class.
            in_pipe_path: Forwarded to the base class.
            out_pipe_path: Forwarded to the base class.
            serialize_cmd: Shell command applied to outgoing messages.
            deserialize_cmd: Shell command applied to incoming messages.
            permissions: Forwarded to the base class.
        """
        # The commands are assigned first because the base constructor starts
        # the pipe server, whose callback reads them.
        self._serialize_cmd, self._deserialize_cmd = serialize_cmd, deserialize_cmd
        super().__init__(key, in_pipe_path, out_pipe_path, permissions)

    def _serialize_message(self, message):
        """Return ``message`` transformed by the serialize command."""
        return self._transform_message(self._serialize_cmd, message)

    def _deserialize_message(self, message):
        """Return ``message`` transformed by the deserialize command."""
        return self._transform_message(self._deserialize_cmd, message)

    @staticmethod
    def _transform_message(transform_cmd, message):
        """Run ``transform_cmd`` with ``message`` on stdin and return its stdout.

        Args:
            transform_cmd: Command string executed through the shell.
            message: Data written to the command's standard input.

        Returns:
            The captured standard output of the command.

        Raises:
            ValueError: If the command exits with a non-zero status.
        """
        # shell=True is intentional: the command is a shell string supplied by
        # whoever constructs the handler. The message is only passed on stdin
        # and is never interpolated into the command line.
        proc = run(
            transform_cmd,
            input=message,
            stdout=PIPE,
            stderr=PIPE,
            shell=True
        )
        if proc.returncode != 0:
            # Surface the captured stderr; without it the failure is opaque.
            LOG.error(
                'Transform command %r exited with status %d, stderr: %r',
                transform_cmd, proc.returncode, proc.stderr
            )
            raise ValueError(f'Transforming message {message} failed!')
        return proc.stdout

    def handle_event(self, message):
        """Serialize ``message`` to JSON, transform it and send it through the pipe server.

        Nothing is sent when the serialize command produces no output.
        """
        json_bytes = dumps(message).encode()
        transformed_bytes = self._serialize_message(json_bytes)
        if transformed_bytes:
            self.pipe_io.send_message(transformed_bytes)

    def handle_pipe_event(self, message_bytes):
        """Transform ``message_bytes``, parse the result as JSON and send it on.

        Nothing is sent when the deserialize command produces no output.
        """
        transformed_bytes = self._deserialize_message(message_bytes)
        if transformed_bytes:
            json_message = loads(transformed_bytes)
            self.server_connector.send(json_message)