diff --git a/tfw/components/pipe_io/pipe_connector/pipe_connector.py b/tfw/components/pipe_io/pipe_connector/pipe_connector.py index 0c33a59..fa423da 100644 --- a/tfw/components/pipe_io/pipe_connector/pipe_connector.py +++ b/tfw/components/pipe_io/pipe_connector/pipe_connector.py @@ -1,7 +1,7 @@ import logging from stat import S_ISFIFO -from os import access, mkdir, stat, R_OK -from os.path import exists +from os import access, listdir, mkdir, stat, R_OK +from os.path import exists, join from pipe_io_server import PipeWriterServer @@ -18,6 +18,11 @@ class PipeConnector: self.observer = self.build_observer(path) self.observer.on_any_event = self._on_any_event self.observer.start() + for node in listdir(path): + node_path = join(path, node) + if self._is_pipe(node_path): + self._create_pipe(node_path) + LOG.debug('Connected to existing pipe "%s"', node_path) def build_observer(self, path): # pylint: disable=no-self-use if not exists(path): @@ -29,17 +34,17 @@ class PipeConnector: def _on_any_event(self, event): path = event.src_path if self._is_pipe(path) and isinstance(event, InotifyFileCreatedEvent): - self._on_create_pipe(path) + self._create_pipe(path) LOG.debug('Connected to new pipe "%s"', path) elif isinstance(event, InotifyFileDeletedEvent): - self._on_delete_pipe(path) + self._delete_pipe(path) LOG.debug('Disconnected from deleted pipe "%s"', path) @staticmethod def _is_pipe(path): return exists(path) and S_ISFIFO(stat(path).st_mode) - def _on_create_pipe(self, path): + def _create_pipe(self, path): if self._find_pipe(path): return server = None @@ -65,7 +70,7 @@ class PipeConnector: def build_writer(self, path): # pylint: disable=no-self-use return PipeWriterServer(path, manage_pipes=False) - def _on_delete_pipe(self, path): + def _delete_pipe(self, path): pipes = self._find_pipe(path) if pipes: pipes[path].stop() @@ -74,3 +79,9 @@ class PipeConnector: def broadcast(self, message): for server in self.recv_pipes.values(): server.send_message(message) + + def stop(self): + for pipe in self.recv_pipes.values(): + pipe.stop() + for pipe in self.send_pipes.values(): + pipe.stop() diff --git a/tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py b/tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py index b3b20f7..c82ebca 100644 --- a/tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py +++ b/tfw/components/pipe_io/pipe_connector/proxy_pipe_connector_handler.py @@ -22,6 +22,9 @@ class ProxyPipeConnectorHandler: def handle_event(self, message, _): self.pipes.broadcast(dumps(message).encode()) + def cleanup(self): + self.pipes.stop() + class ProxyPipeConnector(PipeConnector): def __init__(self, path, connector): diff --git a/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py b/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py index 65175f7..e49f7cc 100644 --- a/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py +++ b/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py @@ -11,7 +11,6 @@ from tempfile import TemporaryDirectory import pytest from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent -from .pipe_connector import PipeConnector from .proxy_pipe_connector_handler import ProxyPipeConnector @@ -99,63 +98,60 @@ def workdir(): @pytest.fixture def context(workdir): - yield PipeContext(workdir, PipeConnector(workdir)) - - -@pytest.fixture -def mock_context(workdir): + mkfifo(join(workdir, Action.SEND.value)) + mkfifo(join(workdir, Action.RECV.value)) yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector())) -def test_pipe_creation_deletion(mock_context): +def test_existing_pipe_connection(context): + assert join(context.workdir, Action.SEND.value) in context.pipes.send_pipes.keys() + assert join(context.workdir, Action.RECV.value) in context.pipes.recv_pipes.keys() + + +def test_pipe_creation_deletion(context): cases = [ - (Action.RECV, mock_context.pipes.recv_pipes, mock_context.pipes.writer_events), - (Action.SEND, mock_context.pipes.send_pipes, mock_context.pipes.reader_events) + (Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events), + (Action.SEND, context.pipes.send_pipes, context.pipes.reader_events) ] for action, pipes, events in cases: - path = mock_context.emit_pipe_creation_event(action, mkfifo) + path = context.emit_pipe_creation_event(action, mkfifo) assert events[-1] == path assert path in pipes.keys() remove(path) - mock_context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path)) + context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path)) assert path not in pipes.keys() -def test_handle_message(mock_context): - path = mock_context.emit_pipe_creation_event(Action.SEND, mkfifo) +def test_handle_message(context): + path = context.emit_pipe_creation_event(Action.SEND, mkfifo) payload = {'key': token_urlsafe(16)} - mock_context.pipes.send_pipes[path].handle_message(dumps(payload)) - assert mock_context.pipes.connector.messages[-1] == payload - mock_context.pipes.send_pipes[path].handle_message(token_urlsafe(32)) - assert len(mock_context.pipes.connector.messages) == 1 + context.pipes.send_pipes[path].handle_message(dumps(payload)) + assert context.pipes.connector.messages[-1] == payload + context.pipes.send_pipes[path].handle_message(token_urlsafe(32)) + assert len(context.pipes.connector.messages) == 1 -def test_broadcast(mock_context): +def test_broadcast(context): paths = [ - mock_context.emit_pipe_creation_event(Action.RECV, mkfifo) + context.emit_pipe_creation_event(Action.RECV, mkfifo) for _ in range(32) ] payload = {'key': token_urlsafe(16)} - mock_context.pipes.broadcast(payload) + context.pipes.broadcast(payload) for path in paths: - assert mock_context.pipes.recv_pipes[path].messages[-1] == payload + assert context.pipes.recv_pipes[path].messages[-1] == payload -def test_inode_types(mock_context): +def test_inode_types(context): touch = lambda path: Path(path).touch() cases = [ - (Action.RECV, mock_context.pipes.recv_pipes, mkdir), - (Action.SEND, mock_context.pipes.send_pipes, mkdir), - (Action.RECV, mock_context.pipes.recv_pipes, touch), - (Action.SEND, mock_context.pipes.send_pipes, touch) + (Action.RECV, context.pipes.recv_pipes, mkdir), + (Action.SEND, context.pipes.send_pipes, mkdir), + (Action.RECV, context.pipes.recv_pipes, touch), + (Action.SEND, context.pipes.send_pipes, touch) ] for action, pipes, creator in cases: - path = mock_context.emit_pipe_creation_event(action, creator) + path = context.emit_pipe_creation_event(action, creator) assert path not in pipes.keys() - - -def test_build_reader_implemented(context): - with pytest.raises(NotImplementedError): - context.emit_pipe_creation_event(Action.SEND, mkfifo)