baseimage-tutorial-framework/tfw/components/pipe_io/pipe_connector/test_proxy_pipe_connector.py

158 lines
4.2 KiB
Python

# pylint: disable=redefined-outer-name
from enum import Enum
from dataclasses import dataclass
from json import dumps
from secrets import token_urlsafe
from os import urandom, mkfifo, mkdir, remove
from os.path import join
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent
from .proxy_pipe_connector_handler import ProxyPipeConnector
class Action(Enum):
    """Pipe direction; the value is used as the suffix of test pipe names."""

    # Paths ending in this value are looked up in `send_pipes` by the tests.
    SEND = 'send'
    # Paths ending in this value are looked up in `recv_pipes` by the tests.
    RECV = 'recv'
@dataclass
class PipeContext:
    """Bundle of a working directory and the pipe connector under test."""

    workdir: str  # directory in which the test inodes are created
    pipes: ProxyPipeConnector  # connector instance the tests interact with

    def emit_pipe_creation_event(self, action, inode_creator):
        """Create a randomly named inode and report its creation.

        The inode is named ``<random hex>_<action.value>`` inside ``workdir``.
        ``inode_creator`` is called with the full path (the tests pass
        ``mkfifo``, ``mkdir`` or a file-touching callable), after which an
        ``InotifyFileCreatedEvent`` for that path is handed to
        ``self.pipes.observer.on_any_event``.

        Returns the full path of the created inode.
        """
        basename = f'{self.generate_name()}_{action.value}'
        created_path = self.join(basename)
        inode_creator(created_path)
        creation_event = InotifyFileCreatedEvent(created_path)
        self.pipes.observer.on_any_event(creation_event)
        return created_path

    def join(self, path):
        """Return ``path`` joined onto the working directory."""
        return join(self.workdir, path)

    @staticmethod
    def generate_name():
        """Return the hex form of 4 random bytes (8 characters)."""
        random_bytes = urandom(4)
        return random_bytes.hex()
class MockPipeConnector(ProxyPipeConnector):
    """ProxyPipeConnector variant that logs build requests and uses mock pipes."""

    def __init__(self, path, connector):
        """Create the event logs, then run the parent initializer.

        The logs are created first so they already exist if the parent
        initializer calls ``build_reader``/``build_writer`` (presumably for
        pipes already present in ``path`` -- verify against
        ``ProxyPipeConnector``).
        """
        self.reader_events = []
        self.writer_events = []
        super().__init__(path, connector)

    def _build_observer(self, path):
        """Return a ``MockObserver``; ``path`` is not used."""
        return MockObserver()

    def build_reader(self, path):
        """Log ``path`` and return a mock server wired to ``_handle_message``."""
        self.reader_events.append(path)
        mock_reader = MockPipeServer()
        # Messages handed to the mock go to the connector's own handler.
        mock_reader.handle_message = self._handle_message
        return mock_reader

    def build_writer(self, path):
        """Log ``path`` and return a fresh mock server."""
        self.writer_events.append(path)
        mock_writer = MockPipeServer()
        return mock_writer
class MockObserver:
    """Observer stand-in whose methods do nothing."""

    def start(self):
        """Do nothing."""

    def on_any_event(self, event):
        """Do nothing with ``event``."""
class MockPipeServer:
    """Pipe server stand-in that stores every sent message in ``messages``."""

    def __init__(self):
        """Start with an empty, per-instance message log."""
        self.messages = []

    def handle_message(self, message):
        """Do nothing with ``message``."""

    def send_message(self, message):
        """Append ``message`` to ``self.messages``."""
        self.messages.append(message)

    def start(self):
        """Do nothing."""

    def stop(self):
        """Do nothing."""
class MockConnector:  # pylint: disable=too-few-public-methods
    """Connector stand-in that stores every sent message in ``messages``."""

    def __init__(self):
        """Start with an empty, per-instance message log."""
        self.messages = []

    def send_message(self, message):
        """Append ``message`` to ``self.messages``."""
        self.messages.append(message)
@pytest.fixture
def workdir():
    """Yield the path of a temporary directory for the duration of a test."""
    with TemporaryDirectory() as temp_dir:
        yield temp_dir
@pytest.fixture
def context(workdir):
    """Yield a ``PipeContext`` whose directory already holds two FIFOs.

    A FIFO named 'send' and one named 'recv' are created in ``workdir``
    before the ``MockPipeConnector`` is constructed on that directory.
    """
    for action in (Action.SEND, Action.RECV):
        mkfifo(join(workdir, action.value))
    connector = MockPipeConnector(workdir, MockConnector())
    yield PipeContext(workdir, connector)
def test_existing_pipe_connection(context):
    """FIFOs that exist before the connector is built must be registered.

    The ``context`` fixture creates 'send' and 'recv' FIFOs in the working
    directory before constructing the connector; both paths are expected
    among the connector's ``send_pipes`` / ``recv_pipes`` respectively.
    """
    send_path = join(context.workdir, Action.SEND.value)
    recv_path = join(context.workdir, Action.RECV.value)
    assert send_path in context.pipes.send_pipes
    assert recv_path in context.pipes.recv_pipes
def test_pipe_creation_deletion(context):
    """A creation event must register a FIFO and a deletion event must drop it.

    Each case pairs an action with the mapping that should track the pipe and
    the ``MockPipeConnector`` log that should record its path: RECV pipes are
    expected in ``writer_events``, SEND pipes in ``reader_events``.
    """
    connector = context.pipes
    cases = [
        (Action.RECV, connector.recv_pipes, connector.writer_events),
        (Action.SEND, connector.send_pipes, connector.reader_events),
    ]
    for action, pipes, events in cases:
        path = context.emit_pipe_creation_event(action, mkfifo)
        # The most recent build request must be for the FIFO just created.
        assert events[-1] == path
        assert path in pipes
        # Delete the FIFO on disk, then report the deletion to the observer.
        remove(path)
        connector.observer.on_any_event(InotifyFileDeletedEvent(path))
        assert path not in pipes
def test_handle_message(context):
    """A JSON message from a send pipe reaches the connector; junk does not.

    After a JSON-encoded payload is handled, the connector's last message
    must equal the decoded payload. Handling a random token afterwards must
    leave the connector with exactly one message.
    """
    pipe_path = context.emit_pipe_creation_event(Action.SEND, mkfifo)
    payload = {'key': token_urlsafe(16)}
    encoded_payload = dumps(payload)
    context.pipes.send_pipes[pipe_path].handle_message(encoded_payload)
    assert context.pipes.connector.messages[-1] == payload
    # A random URL-safe token is, for all practical purposes, not valid JSON.
    junk_message = token_urlsafe(32)
    context.pipes.send_pipes[pipe_path].handle_message(junk_message)
    assert len(context.pipes.connector.messages) == 1
def test_broadcast(context):
    """A broadcast payload must be the last message of each new recv pipe."""
    pipe_count = 32
    recv_paths = [
        context.emit_pipe_creation_event(Action.RECV, mkfifo)
        for _ in range(pipe_count)
    ]
    payload = {'key': token_urlsafe(16)}
    context.pipes.broadcast(payload)
    for recv_path in recv_paths:
        writer = context.pipes.recv_pipes[recv_path]
        assert writer.messages[-1] == payload
def test_inode_types(context):
    """Directories and regular files must not be registered as pipes.

    For both actions, an inode with a pipe-like name is created as a
    directory and as a regular file; after the creation event is emitted,
    its path must be absent from the corresponding pipe mapping.
    """
    def touch(path):
        """Create an empty regular file at ``path``."""
        Path(path).touch()

    cases = [
        (Action.RECV, context.pipes.recv_pipes, mkdir),
        (Action.SEND, context.pipes.send_pipes, mkdir),
        (Action.RECV, context.pipes.recv_pipes, touch),
        (Action.SEND, context.pipes.send_pipes, touch),
    ]
    for action, pipes, creator in cases:
        path = context.emit_pipe_creation_event(action, creator)
        assert path not in pipes