diff --git a/tfw/components/pipe_io/__init__.py b/tfw/components/pipe_io/__init__.py index 43a5b98..54bc368 100644 --- a/tfw/components/pipe_io/__init__.py +++ b/tfw/components/pipe_io/__init__.py @@ -1 +1,2 @@ from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler +from .pipe_connector import ProxyPipeConnectorHandler diff --git a/tfw/components/pipe_connector/__init__.py b/tfw/components/pipe_io/pipe_connector/__init__.py similarity index 100% rename from tfw/components/pipe_connector/__init__.py rename to tfw/components/pipe_io/pipe_connector/__init__.py diff --git a/tfw/components/pipe_connector/pipe_connector.py b/tfw/components/pipe_io/pipe_connector/pipe_connector.py similarity index 72% rename from tfw/components/pipe_connector/pipe_connector.py rename to tfw/components/pipe_io/pipe_connector/pipe_connector.py index d1500c2..1f47ace 100644 --- a/tfw/components/pipe_connector/pipe_connector.py +++ b/tfw/components/pipe_io/pipe_connector/pipe_connector.py @@ -8,6 +8,8 @@ from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, Inot class PipeConnector: + reader_pattern, writer_pattern = 'send', 'recv' + def __init__(self, path): self.recv_pipes, self.send_pipes = {}, {} self.observer = self.build_observer(path) @@ -19,15 +21,15 @@ class PipeConnector: mkdir(path) if not access(path, R_OK): raise ValueError('Path does not exist or is not accessible.') - observer = InotifyObserver(path, patterns=['*send*', '*recv*']) - return observer + return InotifyObserver(path, patterns=[f'*{self.reader_pattern}*', f'*{self.writer_pattern}*']) def _on_any_event(self, event): path = event.src_path - if isinstance(event, InotifyFileCreatedEvent) and self._is_pipe(path): - self._create_pipe(path) - elif isinstance(event, InotifyFileDeletedEvent): - self._delete_pipe(path) + if self._is_pipe(path): + if isinstance(event, InotifyFileCreatedEvent): + self._create_pipe(path) + elif isinstance(event, InotifyFileDeletedEvent): + self._delete_pipe(path) @staticmethod def _is_pipe(path): @@ -37,23 +39,24 @@ class PipeConnector: if self._find_pipe(path): return server = None - if 'recv' in path: + if self.writer_pattern in path: pipes, server = self.recv_pipes, self.build_writer(path) - elif 'send' in path: + elif self.reader_pattern in path: pipes, server = self.send_pipes, self.build_reader(path) if server: server.start() pipes[path] = server def _find_pipe(self, path): + pipes = None if path in self.recv_pipes.keys(): - return self.recv_pipes + pipes = self.recv_pipes if path in self.send_pipes.keys(): - return self.send_pipes - return None + pipes = self.send_pipes + return pipes def build_reader(self, path): # pylint: disable=no-self-use - return PipeReaderServer(path, manager_pipes=False) + raise NotImplementedError() def build_writer(self, path): # pylint: disable=no-self-use return PipeWriterServer(path, manage_pipes=False) @@ -65,5 +68,5 @@ class PipeConnector: del pipes[path] def broadcast(self, message): - for _, server in self.recv_pipes.items(): + for server in self.recv_pipes.values(): server.send_message(message) diff --git a/tfw/components/pipe_connector/proxy_pipe_connector_handler.py b/tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py similarity index 100% rename from tfw/components/pipe_connector/proxy_pipe_connector_handler.py rename to tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py diff --git a/tfw/components/pipe_connector/test_pipe_connector.py b/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py similarity index 59% rename from tfw/components/pipe_connector/test_pipe_connector.py rename to tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py index bdf8a04..12a541a 100644 --- a/tfw/components/pipe_connector/test_pipe_connector.py +++ b/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py @@ -11,6 +11,7 @@ from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEve import pytest +from .pipe_connector import PipeConnector from .proxy_pipe_connector_handler import ProxyPipeConnector @@ -91,54 +92,69 @@ class MockConnector: # pylint: disable=too-few-public-methods @pytest.fixture -def context(): +def workdir(): with TemporaryDirectory() as workdir: - yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector())) + yield workdir -def test_pipe_creation_deletion(context): +@pytest.fixture +def context(workdir): + yield PipeContext(workdir, PipeConnector(workdir)) + + +@pytest.fixture +def mock_context(workdir): + yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector())) + + +def test_pipe_creation_deletion(mock_context): cases = [ - (Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events), - (Action.SEND, context.pipes.send_pipes, context.pipes.reader_events) + (Action.RECV, mock_context.pipes.recv_pipes, mock_context.pipes.writer_events), + (Action.SEND, mock_context.pipes.send_pipes, mock_context.pipes.reader_events) ] for action, pipes, events in cases: - path = context.emit_pipe_creation_event(action, mkfifo) + path = mock_context.emit_pipe_creation_event(action, mkfifo) assert events[-1] == path assert path in pipes.keys() - context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path)) + mock_context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path)) assert path not in pipes.keys() -def test_handle_message(context): - path = context.emit_pipe_creation_event(Action.SEND, mkfifo) +def test_handle_message(mock_context): + path = mock_context.emit_pipe_creation_event(Action.SEND, mkfifo) payload = {'key': token_urlsafe(16)} - context.pipes.send_pipes[path].handle_message(dumps(payload)) - assert context.pipes.connector.messages[-1] == payload - context.pipes.send_pipes[path].handle_message(token_urlsafe(32)) - assert len(context.pipes.connector.messages) == 1 + mock_context.pipes.send_pipes[path].handle_message(dumps(payload)) + assert mock_context.pipes.connector.messages[-1] == payload + mock_context.pipes.send_pipes[path].handle_message(token_urlsafe(32)) + assert len(mock_context.pipes.connector.messages) == 1 -def test_broadcast(context): +def test_broadcast(mock_context): paths = [ - context.emit_pipe_creation_event(Action.RECV, mkfifo) + mock_context.emit_pipe_creation_event(Action.RECV, mkfifo) for _ in range(32) ] payload = {'key': token_urlsafe(16)} - context.pipes.broadcast(payload) + mock_context.pipes.broadcast(payload) for path in paths: - assert context.pipes.recv_pipes[path].messages[-1] == payload + assert mock_context.pipes.recv_pipes[path].messages[-1] == payload -def test_inode_types(context): +def test_inode_types(mock_context): cases = [ - (Action.RECV, context.pipes.recv_pipes, mkdir), - (Action.SEND, context.pipes.send_pipes, mkdir), - (Action.RECV, context.pipes.recv_pipes, mknod), - (Action.SEND, context.pipes.send_pipes, mknod) + (Action.RECV, mock_context.pipes.recv_pipes, mkdir), + (Action.SEND, mock_context.pipes.send_pipes, mkdir), + (Action.RECV, mock_context.pipes.recv_pipes, mknod), + (Action.SEND, mock_context.pipes.send_pipes, mknod) ] for action, pipes, creator in cases: - path = context.emit_pipe_creation_event(action, creator) + path = mock_context.emit_pipe_creation_event(action, creator) assert path not in pipes.keys() + + +def test_build_reader_implemented(context): + with pytest.raises(NotImplementedError): + context.emit_pipe_creation_event(Action.SEND, mkfifo) diff --git a/tfw/internals/inotify/inotify.py b/tfw/internals/inotify/inotify.py index 2749ca6..867a704 100644 --- a/tfw/internals/inotify/inotify.py +++ b/tfw/internals/inotify/inotify.py @@ -11,6 +11,7 @@ from watchdog.events import ( DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent ) +# This is done to prevent inotify event logs triggering themselves (infinite log recursion) logging.getLogger('watchdog.observers.inotify_buffer').propagate = False