from abc import abstractmethod from json import loads, dumps from subprocess import run, PIPE from functools import partial from tfw import EventHandlerBase from tfw.config.logs import logging from .pipe_io_server import PipeIOServer LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 class PipeIOEventHandlerBase(EventHandlerBase): def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): super().__init__(key) self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, self.handle_pipe_event, permissions ) self.pipe_io.start() @abstractmethod def handle_pipe_event(self, message_bytes): raise NotImplementedError() def cleanup(self): self.pipe_io.stop() class CallbackPipeIOServer(PipeIOServer): def __init__(self, in_pipe_path, out_pipe_path, callback, permissions): super().__init__(in_pipe_path, out_pipe_path, permissions) self.callback = callback def handle_message(self, message): self.callback(message) class PipeIOEventHandler(PipeIOEventHandlerBase): def handle_event(self, message): json_bytes = dumps(message).encode() self.pipe_io.send_message(json_bytes) def handle_pipe_event(self, message_bytes): json = loads(message_bytes) self.server_connector.send(json) class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): # pylint: disable=too-many-arguments def __init__( self, key, in_pipe_path, out_pipe_path, transform_in_cmd, transform_out_cmd, permissions=DEFAULT_PERMISSIONS ): self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd) super().__init__(key, in_pipe_path, out_pipe_path, permissions) @staticmethod def _transform_message(transform_cmd, message): proc = run( transform_cmd, input=message, stdout=PIPE, stderr=PIPE, shell=True ) if proc.returncode == 0: return proc.stdout raise ValueError(f'Transforming message {message} failed!') def handle_event(self, message): json_bytes = dumps(message).encode() transformed_bytes = self._transform_out(json_bytes) if transformed_bytes: self.pipe_io.send_message(transformed_bytes) def handle_pipe_event(self, message_bytes): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) self.server_connector.send(json_message)