mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-22 13:51:33 +00:00
Make PipeConnector stop gracefully and connect to every pipe on init
This commit is contained in:
parent
5b0f79dbae
commit
d6f2eb987f
@ -1,7 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
from stat import S_ISFIFO
|
from stat import S_ISFIFO
|
||||||
from os import access, mkdir, stat, R_OK
|
from os import access, listdir, mkdir, stat, R_OK
|
||||||
from os.path import exists
|
from os.path import exists, join
|
||||||
|
|
||||||
from pipe_io_server import PipeWriterServer
|
from pipe_io_server import PipeWriterServer
|
||||||
|
|
||||||
@ -18,6 +18,11 @@ class PipeConnector:
|
|||||||
self.observer = self.build_observer(path)
|
self.observer = self.build_observer(path)
|
||||||
self.observer.on_any_event = self._on_any_event
|
self.observer.on_any_event = self._on_any_event
|
||||||
self.observer.start()
|
self.observer.start()
|
||||||
|
for node in listdir(path):
|
||||||
|
node_path = join(path, node)
|
||||||
|
if self._is_pipe(node_path):
|
||||||
|
self._create_pipe(node_path)
|
||||||
|
LOG.debug('Connected to existing pipe "%s"', node_path)
|
||||||
|
|
||||||
def build_observer(self, path): # pylint: disable=no-self-use
|
def build_observer(self, path): # pylint: disable=no-self-use
|
||||||
if not exists(path):
|
if not exists(path):
|
||||||
@ -29,17 +34,17 @@ class PipeConnector:
|
|||||||
def _on_any_event(self, event):
|
def _on_any_event(self, event):
|
||||||
path = event.src_path
|
path = event.src_path
|
||||||
if self._is_pipe(path) and isinstance(event, InotifyFileCreatedEvent):
|
if self._is_pipe(path) and isinstance(event, InotifyFileCreatedEvent):
|
||||||
self._on_create_pipe(path)
|
self._create_pipe(path)
|
||||||
LOG.debug('Connected to new pipe "%s"', path)
|
LOG.debug('Connected to new pipe "%s"', path)
|
||||||
elif isinstance(event, InotifyFileDeletedEvent):
|
elif isinstance(event, InotifyFileDeletedEvent):
|
||||||
self._on_delete_pipe(path)
|
self._delete_pipe(path)
|
||||||
LOG.debug('Disconnected from deleted pipe "%s"', path)
|
LOG.debug('Disconnected from deleted pipe "%s"', path)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _is_pipe(path):
|
def _is_pipe(path):
|
||||||
return exists(path) and S_ISFIFO(stat(path).st_mode)
|
return exists(path) and S_ISFIFO(stat(path).st_mode)
|
||||||
|
|
||||||
def _on_create_pipe(self, path):
|
def _create_pipe(self, path):
|
||||||
if self._find_pipe(path):
|
if self._find_pipe(path):
|
||||||
return
|
return
|
||||||
server = None
|
server = None
|
||||||
@ -65,7 +70,7 @@ class PipeConnector:
|
|||||||
def build_writer(self, path): # pylint: disable=no-self-use
|
def build_writer(self, path): # pylint: disable=no-self-use
|
||||||
return PipeWriterServer(path, manage_pipes=False)
|
return PipeWriterServer(path, manage_pipes=False)
|
||||||
|
|
||||||
def _on_delete_pipe(self, path):
|
def _delete_pipe(self, path):
|
||||||
pipes = self._find_pipe(path)
|
pipes = self._find_pipe(path)
|
||||||
if pipes:
|
if pipes:
|
||||||
pipes[path].stop()
|
pipes[path].stop()
|
||||||
@ -74,3 +79,9 @@ class PipeConnector:
|
|||||||
def broadcast(self, message):
|
def broadcast(self, message):
|
||||||
for server in self.recv_pipes.values():
|
for server in self.recv_pipes.values():
|
||||||
server.send_message(message)
|
server.send_message(message)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
for pipe in self.recv_pipes.values():
|
||||||
|
pipe.stop()
|
||||||
|
for pipe in self.send_pipes.values():
|
||||||
|
pipe.stop()
|
||||||
|
@ -22,6 +22,9 @@ class ProxyPipeConnectorHandler:
|
|||||||
def handle_event(self, message, _):
|
def handle_event(self, message, _):
|
||||||
self.pipes.broadcast(dumps(message).encode())
|
self.pipes.broadcast(dumps(message).encode())
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.pipes.stop()
|
||||||
|
|
||||||
|
|
||||||
class ProxyPipeConnector(PipeConnector):
|
class ProxyPipeConnector(PipeConnector):
|
||||||
def __init__(self, path, connector):
|
def __init__(self, path, connector):
|
||||||
|
@ -11,7 +11,6 @@ from tempfile import TemporaryDirectory
|
|||||||
import pytest
|
import pytest
|
||||||
from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent
|
from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent
|
||||||
|
|
||||||
from .pipe_connector import PipeConnector
|
|
||||||
from .proxy_pipe_connector_handler import ProxyPipeConnector
|
from .proxy_pipe_connector_handler import ProxyPipeConnector
|
||||||
|
|
||||||
|
|
||||||
@ -99,63 +98,60 @@ def workdir():
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def context(workdir):
|
def context(workdir):
|
||||||
yield PipeContext(workdir, PipeConnector(workdir))
|
mkfifo(join(workdir, Action.SEND.value))
|
||||||
|
mkfifo(join(workdir, Action.RECV.value))
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_context(workdir):
|
|
||||||
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
|
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
|
||||||
|
|
||||||
|
|
||||||
def test_pipe_creation_deletion(mock_context):
|
def test_existing_pipe_connection(context):
|
||||||
|
assert join(context.workdir, Action.SEND.value) in context.pipes.send_pipes.keys()
|
||||||
|
assert join(context.workdir, Action.RECV.value) in context.pipes.recv_pipes.keys()
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipe_creation_deletion(context):
|
||||||
cases = [
|
cases = [
|
||||||
(Action.RECV, mock_context.pipes.recv_pipes, mock_context.pipes.writer_events),
|
(Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events),
|
||||||
(Action.SEND, mock_context.pipes.send_pipes, mock_context.pipes.reader_events)
|
(Action.SEND, context.pipes.send_pipes, context.pipes.reader_events)
|
||||||
]
|
]
|
||||||
|
|
||||||
for action, pipes, events in cases:
|
for action, pipes, events in cases:
|
||||||
path = mock_context.emit_pipe_creation_event(action, mkfifo)
|
path = context.emit_pipe_creation_event(action, mkfifo)
|
||||||
assert events[-1] == path
|
assert events[-1] == path
|
||||||
assert path in pipes.keys()
|
assert path in pipes.keys()
|
||||||
remove(path)
|
remove(path)
|
||||||
mock_context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
|
context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
|
||||||
assert path not in pipes.keys()
|
assert path not in pipes.keys()
|
||||||
|
|
||||||
|
|
||||||
def test_handle_message(mock_context):
|
def test_handle_message(context):
|
||||||
path = mock_context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
path = context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
||||||
payload = {'key': token_urlsafe(16)}
|
payload = {'key': token_urlsafe(16)}
|
||||||
mock_context.pipes.send_pipes[path].handle_message(dumps(payload))
|
context.pipes.send_pipes[path].handle_message(dumps(payload))
|
||||||
assert mock_context.pipes.connector.messages[-1] == payload
|
assert context.pipes.connector.messages[-1] == payload
|
||||||
mock_context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
|
context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
|
||||||
assert len(mock_context.pipes.connector.messages) == 1
|
assert len(context.pipes.connector.messages) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_broadcast(mock_context):
|
def test_broadcast(context):
|
||||||
paths = [
|
paths = [
|
||||||
mock_context.emit_pipe_creation_event(Action.RECV, mkfifo)
|
context.emit_pipe_creation_event(Action.RECV, mkfifo)
|
||||||
for _ in range(32)
|
for _ in range(32)
|
||||||
]
|
]
|
||||||
payload = {'key': token_urlsafe(16)}
|
payload = {'key': token_urlsafe(16)}
|
||||||
|
|
||||||
mock_context.pipes.broadcast(payload)
|
context.pipes.broadcast(payload)
|
||||||
for path in paths:
|
for path in paths:
|
||||||
assert mock_context.pipes.recv_pipes[path].messages[-1] == payload
|
assert context.pipes.recv_pipes[path].messages[-1] == payload
|
||||||
|
|
||||||
def test_inode_types(mock_context):
|
def test_inode_types(context):
|
||||||
touch = lambda path: Path(path).touch()
|
touch = lambda path: Path(path).touch()
|
||||||
cases = [
|
cases = [
|
||||||
(Action.RECV, mock_context.pipes.recv_pipes, mkdir),
|
(Action.RECV, context.pipes.recv_pipes, mkdir),
|
||||||
(Action.SEND, mock_context.pipes.send_pipes, mkdir),
|
(Action.SEND, context.pipes.send_pipes, mkdir),
|
||||||
(Action.RECV, mock_context.pipes.recv_pipes, touch),
|
(Action.RECV, context.pipes.recv_pipes, touch),
|
||||||
(Action.SEND, mock_context.pipes.send_pipes, touch)
|
(Action.SEND, context.pipes.send_pipes, touch)
|
||||||
]
|
]
|
||||||
|
|
||||||
for action, pipes, creator in cases:
|
for action, pipes, creator in cases:
|
||||||
path = mock_context.emit_pipe_creation_event(action, creator)
|
path = context.emit_pipe_creation_event(action, creator)
|
||||||
assert path not in pipes.keys()
|
assert path not in pipes.keys()
|
||||||
|
|
||||||
|
|
||||||
def test_build_reader_implemented(context):
|
|
||||||
with pytest.raises(NotImplementedError):
|
|
||||||
context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user