mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-22 19:01:33 +00:00
Fix issues in PR #63
This commit is contained in:
parent
f5582f0207
commit
dc0615c11e
@ -1 +1,2 @@
|
|||||||
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler
|
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler
|
||||||
|
from .pipe_connector import ProxyPipeConnectorHandler
|
||||||
|
@ -8,6 +8,8 @@ from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, Inot
|
|||||||
|
|
||||||
|
|
||||||
class PipeConnector:
|
class PipeConnector:
|
||||||
|
reader_pattern, writer_pattern = 'send', 'recv'
|
||||||
|
|
||||||
def __init__(self, path):
|
def __init__(self, path):
|
||||||
self.recv_pipes, self.send_pipes = {}, {}
|
self.recv_pipes, self.send_pipes = {}, {}
|
||||||
self.observer = self.build_observer(path)
|
self.observer = self.build_observer(path)
|
||||||
@ -19,15 +21,15 @@ class PipeConnector:
|
|||||||
mkdir(path)
|
mkdir(path)
|
||||||
if not access(path, R_OK):
|
if not access(path, R_OK):
|
||||||
raise ValueError('Path does not exist or is not accessible.')
|
raise ValueError('Path does not exist or is not accessible.')
|
||||||
observer = InotifyObserver(path, patterns=['*send*', '*recv*'])
|
return InotifyObserver(path, patterns=[f'*{self.reader_pattern}*', f'*{self.writer_pattern}*'])
|
||||||
return observer
|
|
||||||
|
|
||||||
def _on_any_event(self, event):
|
def _on_any_event(self, event):
|
||||||
path = event.src_path
|
path = event.src_path
|
||||||
if isinstance(event, InotifyFileCreatedEvent) and self._is_pipe(path):
|
if self._is_pipe(path):
|
||||||
self._create_pipe(path)
|
if isinstance(event, InotifyFileCreatedEvent):
|
||||||
elif isinstance(event, InotifyFileDeletedEvent):
|
self._create_pipe(path)
|
||||||
self._delete_pipe(path)
|
elif isinstance(event, InotifyFileDeletedEvent):
|
||||||
|
self._delete_pipe(path)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _is_pipe(path):
|
def _is_pipe(path):
|
||||||
@ -37,23 +39,24 @@ class PipeConnector:
|
|||||||
if self._find_pipe(path):
|
if self._find_pipe(path):
|
||||||
return
|
return
|
||||||
server = None
|
server = None
|
||||||
if 'recv' in path:
|
if self.writer_pattern in path:
|
||||||
pipes, server = self.recv_pipes, self.build_writer(path)
|
pipes, server = self.recv_pipes, self.build_writer(path)
|
||||||
elif 'send' in path:
|
elif self.reader_pattern in path:
|
||||||
pipes, server = self.send_pipes, self.build_reader(path)
|
pipes, server = self.send_pipes, self.build_reader(path)
|
||||||
if server:
|
if server:
|
||||||
server.start()
|
server.start()
|
||||||
pipes[path] = server
|
pipes[path] = server
|
||||||
|
|
||||||
def _find_pipe(self, path):
|
def _find_pipe(self, path):
|
||||||
|
pipes = None
|
||||||
if path in self.recv_pipes.keys():
|
if path in self.recv_pipes.keys():
|
||||||
return self.recv_pipes
|
pipes = self.recv_pipes
|
||||||
if path in self.send_pipes.keys():
|
if path in self.send_pipes.keys():
|
||||||
return self.send_pipes
|
pipes = self.send_pipes
|
||||||
return None
|
return pipes
|
||||||
|
|
||||||
def build_reader(self, path): # pylint: disable=no-self-use
|
def build_reader(self, path): # pylint: disable=no-self-use
|
||||||
return PipeReaderServer(path, manager_pipes=False)
|
raise NotImplementedError()
|
||||||
|
|
||||||
def build_writer(self, path): # pylint: disable=no-self-use
|
def build_writer(self, path): # pylint: disable=no-self-use
|
||||||
return PipeWriterServer(path, manage_pipes=False)
|
return PipeWriterServer(path, manage_pipes=False)
|
||||||
@ -65,5 +68,5 @@ class PipeConnector:
|
|||||||
del pipes[path]
|
del pipes[path]
|
||||||
|
|
||||||
def broadcast(self, message):
|
def broadcast(self, message):
|
||||||
for _, server in self.recv_pipes.items():
|
for server in self.recv_pipes.values():
|
||||||
server.send_message(message)
|
server.send_message(message)
|
@ -11,6 +11,7 @@ from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEve
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from .pipe_connector import PipeConnector
|
||||||
from .proxy_pipe_connector_handler import ProxyPipeConnector
|
from .proxy_pipe_connector_handler import ProxyPipeConnector
|
||||||
|
|
||||||
|
|
||||||
@ -91,54 +92,69 @@ class MockConnector: # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def context():
|
def workdir():
|
||||||
with TemporaryDirectory() as workdir:
|
with TemporaryDirectory() as workdir:
|
||||||
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
|
yield workdir
|
||||||
|
|
||||||
|
|
||||||
def test_pipe_creation_deletion(context):
|
@pytest.fixture
|
||||||
|
def context(workdir):
|
||||||
|
yield PipeContext(workdir, PipeConnector(workdir))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_context(workdir):
|
||||||
|
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipe_creation_deletion(mock_context):
|
||||||
cases = [
|
cases = [
|
||||||
(Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events),
|
(Action.RECV, mock_context.pipes.recv_pipes, mock_context.pipes.writer_events),
|
||||||
(Action.SEND, context.pipes.send_pipes, context.pipes.reader_events)
|
(Action.SEND, mock_context.pipes.send_pipes, mock_context.pipes.reader_events)
|
||||||
]
|
]
|
||||||
|
|
||||||
for action, pipes, events in cases:
|
for action, pipes, events in cases:
|
||||||
path = context.emit_pipe_creation_event(action, mkfifo)
|
path = mock_context.emit_pipe_creation_event(action, mkfifo)
|
||||||
assert events[-1] == path
|
assert events[-1] == path
|
||||||
assert path in pipes.keys()
|
assert path in pipes.keys()
|
||||||
context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
|
mock_context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
|
||||||
assert path not in pipes.keys()
|
assert path not in pipes.keys()
|
||||||
|
|
||||||
|
|
||||||
def test_handle_message(context):
|
def test_handle_message(mock_context):
|
||||||
path = context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
path = mock_context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
||||||
payload = {'key': token_urlsafe(16)}
|
payload = {'key': token_urlsafe(16)}
|
||||||
context.pipes.send_pipes[path].handle_message(dumps(payload))
|
mock_context.pipes.send_pipes[path].handle_message(dumps(payload))
|
||||||
assert context.pipes.connector.messages[-1] == payload
|
assert mock_context.pipes.connector.messages[-1] == payload
|
||||||
context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
|
mock_context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
|
||||||
assert len(context.pipes.connector.messages) == 1
|
assert len(mock_context.pipes.connector.messages) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_broadcast(context):
|
def test_broadcast(mock_context):
|
||||||
paths = [
|
paths = [
|
||||||
context.emit_pipe_creation_event(Action.RECV, mkfifo)
|
mock_context.emit_pipe_creation_event(Action.RECV, mkfifo)
|
||||||
for _ in range(32)
|
for _ in range(32)
|
||||||
]
|
]
|
||||||
payload = {'key': token_urlsafe(16)}
|
payload = {'key': token_urlsafe(16)}
|
||||||
|
|
||||||
context.pipes.broadcast(payload)
|
mock_context.pipes.broadcast(payload)
|
||||||
for path in paths:
|
for path in paths:
|
||||||
assert context.pipes.recv_pipes[path].messages[-1] == payload
|
assert mock_context.pipes.recv_pipes[path].messages[-1] == payload
|
||||||
|
|
||||||
|
|
||||||
def test_inode_types(context):
|
def test_inode_types(mock_context):
|
||||||
cases = [
|
cases = [
|
||||||
(Action.RECV, context.pipes.recv_pipes, mkdir),
|
(Action.RECV, mock_context.pipes.recv_pipes, mkdir),
|
||||||
(Action.SEND, context.pipes.send_pipes, mkdir),
|
(Action.SEND, mock_context.pipes.send_pipes, mkdir),
|
||||||
(Action.RECV, context.pipes.recv_pipes, mknod),
|
(Action.RECV, mock_context.pipes.recv_pipes, mknod),
|
||||||
(Action.SEND, context.pipes.send_pipes, mknod)
|
(Action.SEND, mock_context.pipes.send_pipes, mknod)
|
||||||
]
|
]
|
||||||
|
|
||||||
for action, pipes, creator in cases:
|
for action, pipes, creator in cases:
|
||||||
path = context.emit_pipe_creation_event(action, creator)
|
path = mock_context.emit_pipe_creation_event(action, creator)
|
||||||
assert path not in pipes.keys()
|
assert path not in pipes.keys()
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_reader_implemented(context):
|
||||||
|
with pytest.raises(NotImplementedError):
|
||||||
|
context.emit_pipe_creation_event(Action.SEND, mkfifo)
|
@ -11,6 +11,7 @@ from watchdog.events import (
|
|||||||
DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
|
DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# This is done to prevent inotify event logs triggering themselves (infinite log recursion)
|
||||||
logging.getLogger('watchdog.observers.inotify_buffer').propagate = False
|
logging.getLogger('watchdog.observers.inotify_buffer').propagate = False
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user