diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index fd93d94..7aabc38 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -11,3 +11,4 @@ from .log_monitoring_event_handler import LogMonitoringEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler from .snapshot_provider import SnapshotProvider from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler, PipeIOServer +from .pipe_io_event_handler import TransformerPipeIOEventHandler diff --git a/lib/tfw/components/pipe_io_event_handler.py b/lib/tfw/components/pipe_io_event_handler.py index ef162c9..3bdb38a 100644 --- a/lib/tfw/components/pipe_io_event_handler.py +++ b/lib/tfw/components/pipe_io_event_handler.py @@ -1,5 +1,6 @@ from abc import abstractmethod -from json import loads, dumps, JSONDecodeError +from json import loads, dumps +from subprocess import run, PIPE from tfw import EventHandlerBase from tfw.config.logs import logging @@ -7,10 +8,11 @@ from tfw.config.logs import logging from .pipe_io_server import PipeIOServer LOG = logging.getLogger(__name__) +DEFAULT_PERMISSIONS = 0o600 class PipeIOEventHandlerBase(EventHandlerBase): - def __init__(self, key, in_pipe_path, out_pipe_path, permissions=0o600): + def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): super().__init__(key) self.pipe_io = CallbackPipeIOServer( in_pipe_path, @@ -45,3 +47,45 @@ class PipeIOEventHandler(PipeIOEventHandlerBase): def handle_pipe_event(self, message_bytes): json = loads(message_bytes) self.server_connector.send(json) + + +class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): + # pylint: disable=too-many-arguments + def __init__( + self, key, in_pipe_path, out_pipe_path, + serialize_cmd, deserialize_cmd, + permissions=DEFAULT_PERMISSIONS + ): + self._serialize_cmd, self._deserialize_cmd = serialize_cmd, deserialize_cmd + super().__init__(key, in_pipe_path, out_pipe_path, permissions) + + def _serialize_message(self, message): + return self._transform_message(self._serialize_cmd, message) + + def _deserialize_message(self, message): + return self._transform_message(self._deserialize_cmd, message) + + @staticmethod + def _transform_message(transform_cmd, message): + proc = run( + transform_cmd, + input=message, + stdout=PIPE, + stderr=PIPE, + shell=True + ) + if proc.returncode == 0: + return proc.stdout + raise ValueError(f'Transforming message {message} failed!') + + def handle_event(self, message): + json_bytes = dumps(message).encode() + transformed_bytes = self._serialize_message(json_bytes) + if transformed_bytes: + self.pipe_io.send_message(transformed_bytes) + + def handle_pipe_event(self, message_bytes): + transformed_bytes = self._deserialize_message(message_bytes) + if transformed_bytes: + json_message = loads(transformed_bytes) + self.server_connector.send(json_message)