diff --git a/tfw/components/pipe_connector/__init__.py b/tfw/components/pipe_connector/__init__.py new file mode 100644 index 0000000..07f335a --- /dev/null +++ b/tfw/components/pipe_connector/__init__.py @@ -0,0 +1 @@ +from .proxy_pipe_connector_handler import ProxyPipeConnectorHandler diff --git a/tfw/components/pipe_connector/pipe_connector.py b/tfw/components/pipe_connector/pipe_connector.py new file mode 100644 index 0000000..d1500c2 --- /dev/null +++ b/tfw/components/pipe_connector/pipe_connector.py @@ -0,0 +1,69 @@ +from stat import S_ISFIFO +from os import access, mkdir, stat, R_OK +from os.path import exists + +from pipe_io_server import PipeReaderServer, PipeWriterServer + +from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, InotifyFileDeletedEvent + + +class PipeConnector: + def __init__(self, path): + self.recv_pipes, self.send_pipes = {}, {} + self.observer = self.build_observer(path) + self.observer.on_any_event = self._on_any_event + self.observer.start() + + def build_observer(self, path): # pylint: disable=no-self-use + if not exists(path): + mkdir(path) + if not access(path, R_OK): + raise ValueError('Path does not exist or is not accessible.') + observer = InotifyObserver(path, patterns=['*send*', '*recv*']) + return observer + + def _on_any_event(self, event): + path = event.src_path + if isinstance(event, InotifyFileCreatedEvent) and self._is_pipe(path): + self._create_pipe(path) + elif isinstance(event, InotifyFileDeletedEvent): + self._delete_pipe(path) + + @staticmethod + def _is_pipe(path): + return exists(path) and S_ISFIFO(stat(path).st_mode) + + def _create_pipe(self, path): + if self._find_pipe(path): + return + server = None + if 'recv' in path: + pipes, server = self.recv_pipes, self.build_writer(path) + elif 'send' in path: + pipes, server = self.send_pipes, self.build_reader(path) + if server: + server.start() + pipes[path] = server + + def _find_pipe(self, path): + if path in self.recv_pipes.keys(): + return self.recv_pipes + if path in self.send_pipes.keys(): + return self.send_pipes + return None + + def build_reader(self, path): # pylint: disable=no-self-use + return PipeReaderServer(path, manager_pipes=False) + + def build_writer(self, path): # pylint: disable=no-self-use + return PipeWriterServer(path, manage_pipes=False) + + def _delete_pipe(self, path): + pipes = self._find_pipe(path) + if pipes: + pipes[path].stop() + del pipes[path] + + def broadcast(self, message): + for _, server in self.recv_pipes.items(): + server.send_message(message) diff --git a/tfw/components/pipe_connector/proxy_pipe_connector_handler.py b/tfw/components/pipe_connector/proxy_pipe_connector_handler.py new file mode 100644 index 0000000..b3b20f7 --- /dev/null +++ b/tfw/components/pipe_connector/proxy_pipe_connector_handler.py @@ -0,0 +1,43 @@ +import logging +from threading import Lock +from json import dumps, loads, JSONDecodeError + +from pipe_io_server import PipeReaderServer + +from .pipe_connector import PipeConnector + +LOG = logging.getLogger(__name__) + + +class ProxyPipeConnectorHandler: + keys = [''] + + def __init__(self, path): + self.connector, self.pipes = None, None + self.path = path + + def start(self): + self.pipes = ProxyPipeConnector(self.path, self.connector) + + def handle_event(self, message, _): + self.pipes.broadcast(dumps(message).encode()) + + +class ProxyPipeConnector(PipeConnector): + def __init__(self, path, connector): + self.connector = connector + self.mutex = Lock() + super().__init__(path) + + def build_reader(self, path): + reader = PipeReaderServer(path, manage_pipes=False) + reader.handle_message = self._handle_message + return reader + + def _handle_message(self, message): + try: + json_object = loads(message) + with self.mutex: + self.connector.send_message(json_object) + except JSONDecodeError: + LOG.error('Received invalid JSON message: %s', message) diff --git a/tfw/components/pipe_connector/test_pipe_connector.py b/tfw/components/pipe_connector/test_pipe_connector.py new file mode 100644 index 0000000..bdf8a04 --- /dev/null +++ b/tfw/components/pipe_connector/test_pipe_connector.py @@ -0,0 +1,144 @@ +# pylint: disable=redefined-outer-name +from enum import Enum +from dataclasses import dataclass +from json import dumps +from secrets import token_urlsafe +from os import urandom, mkfifo, mkdir, mknod +from os.path import join +from tempfile import TemporaryDirectory + +from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent + +import pytest + +from .proxy_pipe_connector_handler import ProxyPipeConnector + + +class Action(Enum): + SEND = 'send' + RECV = 'recv' + + +@dataclass +class PipeContext: + workdir: str + pipes: ProxyPipeConnector + + def emit_pipe_creation_event(self, action, inode_creator): + filename = self.join(f'{self.generate_name()}_{action.value}') + inode_creator(filename) + self.pipes.observer.on_any_event(InotifyFileCreatedEvent(filename)) + return filename + + def join(self, path): + return join(self.workdir, path) + + @staticmethod + def generate_name(): + return urandom(4).hex() + + +class MockPipeConnector(ProxyPipeConnector): + def __init__(self, path, connector): + self.reader_events, self.writer_events = [], [] + super().__init__(path, connector) + + def build_observer(self, path): + return MockObserver() + + def build_reader(self, path): + self.reader_events.append(path) + reader = MockPipeServer() + reader.handle_message = self._handle_message + return reader + + def build_writer(self, path): + self.writer_events.append(path) + return MockPipeServer() + + +class MockObserver: + def start(self): + pass + + def on_any_event(self, event): + pass + + +class MockPipeServer: + def __init__(self): + self.messages = [] + + def handle_message(self, message): + pass + + def send_message(self, message): + self.messages.append(message) + + def start(self): + pass + + def stop(self): + pass + + +class MockConnector: # pylint: disable=too-few-public-methods + def __init__(self): + self.messages = [] + + def send_message(self, message): + self.messages.append(message) + + +@pytest.fixture +def context(): + with TemporaryDirectory() as workdir: + yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector())) + + +def test_pipe_creation_deletion(context): + cases = [ + (Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events), + (Action.SEND, context.pipes.send_pipes, context.pipes.reader_events) + ] + + for action, pipes, events in cases: + path = context.emit_pipe_creation_event(action, mkfifo) + assert events[-1] == path + assert path in pipes.keys() + context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path)) + assert path not in pipes.keys() + + +def test_handle_message(context): + path = context.emit_pipe_creation_event(Action.SEND, mkfifo) + payload = {'key': token_urlsafe(16)} + context.pipes.send_pipes[path].handle_message(dumps(payload)) + assert context.pipes.connector.messages[-1] == payload + context.pipes.send_pipes[path].handle_message(token_urlsafe(32)) + assert len(context.pipes.connector.messages) == 1 + + +def test_broadcast(context): + paths = [ + context.emit_pipe_creation_event(Action.RECV, mkfifo) + for _ in range(32) + ] + payload = {'key': token_urlsafe(16)} + + context.pipes.broadcast(payload) + for path in paths: + assert context.pipes.recv_pipes[path].messages[-1] == payload + + +def test_inode_types(context): + cases = [ + (Action.RECV, context.pipes.recv_pipes, mkdir), + (Action.SEND, context.pipes.send_pipes, mkdir), + (Action.RECV, context.pipes.recv_pipes, mknod), + (Action.SEND, context.pipes.send_pipes, mknod) + ] + + for action, pipes, creator in cases: + path = context.emit_pipe_creation_event(action, creator) + assert path not in pipes.keys() diff --git a/tfw/components/process_management/log_inotify_observer.py b/tfw/components/process_management/log_inotify_observer.py index 97b6767..1c80dd1 100644 --- a/tfw/components/process_management/log_inotify_observer.py +++ b/tfw/components/process_management/log_inotify_observer.py @@ -8,7 +8,6 @@ from .supervisor import ProcessLogManager class LogInotifyObserver(InotifyObserver, ProcessLogManager): def __init__(self, connector, process_name, supervisor_uri, log_tail=0): - self._prevent_log_recursion() self._connector = connector self._process_name = process_name self.log_tail = log_tail @@ -16,11 +15,6 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager): ProcessLogManager.__init__(self, supervisor_uri) InotifyObserver.__init__(self, self._get_logfiles()) - @staticmethod - def _prevent_log_recursion(): - # This is done to prevent inotify event logs triggering themselves (infinite log recursion) - logging.getLogger('watchdog.observers.inotify_buffer').propagate = False - def _get_logfiles(self): self._procinfo = self.supervisor.getProcessInfo(self._process_name) return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile'] diff --git a/tfw/internals/inotify/inotify.py b/tfw/internals/inotify/inotify.py index 4f69a02..2749ca6 100644 --- a/tfw/internals/inotify/inotify.py +++ b/tfw/internals/inotify/inotify.py @@ -1,5 +1,5 @@ # pylint: disable=too-few-public-methods - +import logging from typing import Iterable from time import time from os.path import abspath, dirname, isdir @@ -11,6 +11,8 @@ from watchdog.events import ( DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent ) +logging.getLogger('watchdog.observers.inotify_buffer').propagate = False + class InotifyEvent: def __init__(self, src_path):