Merge pull request #63 from avatao-content/pipe-connector

Implement inotify based pipe connector
This commit is contained in:
therealkrispet 2019-08-14 15:48:46 +02:00 committed by GitHub
commit a97a4f4b12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 281 additions and 7 deletions

View File

@ -1 +1,2 @@
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler
from .pipe_connector import ProxyPipeConnectorHandler

View File

@ -0,0 +1 @@
from .proxy_pipe_connector_handler import ProxyPipeConnectorHandler

View File

@ -0,0 +1,72 @@
from stat import S_ISFIFO
from os import access, mkdir, stat, R_OK
from os.path import exists
from pipe_io_server import PipeReaderServer, PipeWriterServer
from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, InotifyFileDeletedEvent
class PipeConnector:
reader_pattern, writer_pattern = 'send', 'recv'
def __init__(self, path):
self.recv_pipes, self.send_pipes = {}, {}
self.observer = self.build_observer(path)
self.observer.on_any_event = self._on_any_event
self.observer.start()
def build_observer(self, path): # pylint: disable=no-self-use
if not exists(path):
mkdir(path)
if not access(path, R_OK):
raise ValueError('Path does not exist or is not accessible.')
return InotifyObserver(path, patterns=[f'*{self.reader_pattern}*', f'*{self.writer_pattern}*'])
def _on_any_event(self, event):
path = event.src_path
if self._is_pipe(path):
if isinstance(event, InotifyFileCreatedEvent):
self._create_pipe(path)
elif isinstance(event, InotifyFileDeletedEvent):
self._delete_pipe(path)
@staticmethod
def _is_pipe(path):
return exists(path) and S_ISFIFO(stat(path).st_mode)
def _create_pipe(self, path):
if self._find_pipe(path):
return
server = None
if self.writer_pattern in path:
pipes, server = self.recv_pipes, self.build_writer(path)
elif self.reader_pattern in path:
pipes, server = self.send_pipes, self.build_reader(path)
if server:
server.start()
pipes[path] = server
def _find_pipe(self, path):
pipes = None
if path in self.recv_pipes.keys():
pipes = self.recv_pipes
if path in self.send_pipes.keys():
pipes = self.send_pipes
return pipes
def build_reader(self, path): # pylint: disable=no-self-use
raise NotImplementedError()
def build_writer(self, path): # pylint: disable=no-self-use
return PipeWriterServer(path, manage_pipes=False)
def _delete_pipe(self, path):
pipes = self._find_pipe(path)
if pipes:
pipes[path].stop()
del pipes[path]
def broadcast(self, message):
for server in self.recv_pipes.values():
server.send_message(message)

View File

@ -0,0 +1,43 @@
import logging
from threading import Lock
from json import dumps, loads, JSONDecodeError
from pipe_io_server import PipeReaderServer
from .pipe_connector import PipeConnector
LOG = logging.getLogger(__name__)
class ProxyPipeConnectorHandler:
keys = ['']
def __init__(self, path):
self.connector, self.pipes = None, None
self.path = path
def start(self):
self.pipes = ProxyPipeConnector(self.path, self.connector)
def handle_event(self, message, _):
self.pipes.broadcast(dumps(message).encode())
class ProxyPipeConnector(PipeConnector):
def __init__(self, path, connector):
self.connector = connector
self.mutex = Lock()
super().__init__(path)
def build_reader(self, path):
reader = PipeReaderServer(path, manage_pipes=False)
reader.handle_message = self._handle_message
return reader
def _handle_message(self, message):
try:
json_object = loads(message)
with self.mutex:
self.connector.send_message(json_object)
except JSONDecodeError:
LOG.error('Received invalid JSON message: %s', message)

View File

@ -0,0 +1,160 @@
# pylint: disable=redefined-outer-name
from enum import Enum
from dataclasses import dataclass
from json import dumps
from secrets import token_urlsafe
from os import urandom, mkfifo, mkdir, mknod
from os.path import join
from tempfile import TemporaryDirectory
from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent
import pytest
from .pipe_connector import PipeConnector
from .proxy_pipe_connector_handler import ProxyPipeConnector
class Action(Enum):
SEND = 'send'
RECV = 'recv'
@dataclass
class PipeContext:
workdir: str
pipes: ProxyPipeConnector
def emit_pipe_creation_event(self, action, inode_creator):
filename = self.join(f'{self.generate_name()}_{action.value}')
inode_creator(filename)
self.pipes.observer.on_any_event(InotifyFileCreatedEvent(filename))
return filename
def join(self, path):
return join(self.workdir, path)
@staticmethod
def generate_name():
return urandom(4).hex()
class MockPipeConnector(ProxyPipeConnector):
def __init__(self, path, connector):
self.reader_events, self.writer_events = [], []
super().__init__(path, connector)
def build_observer(self, path):
return MockObserver()
def build_reader(self, path):
self.reader_events.append(path)
reader = MockPipeServer()
reader.handle_message = self._handle_message
return reader
def build_writer(self, path):
self.writer_events.append(path)
return MockPipeServer()
class MockObserver:
def start(self):
pass
def on_any_event(self, event):
pass
class MockPipeServer:
def __init__(self):
self.messages = []
def handle_message(self, message):
pass
def send_message(self, message):
self.messages.append(message)
def start(self):
pass
def stop(self):
pass
class MockConnector: # pylint: disable=too-few-public-methods
def __init__(self):
self.messages = []
def send_message(self, message):
self.messages.append(message)
@pytest.fixture
def workdir():
with TemporaryDirectory() as workdir:
yield workdir
@pytest.fixture
def context(workdir):
yield PipeContext(workdir, PipeConnector(workdir))
@pytest.fixture
def mock_context(workdir):
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
def test_pipe_creation_deletion(mock_context):
cases = [
(Action.RECV, mock_context.pipes.recv_pipes, mock_context.pipes.writer_events),
(Action.SEND, mock_context.pipes.send_pipes, mock_context.pipes.reader_events)
]
for action, pipes, events in cases:
path = mock_context.emit_pipe_creation_event(action, mkfifo)
assert events[-1] == path
assert path in pipes.keys()
mock_context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
assert path not in pipes.keys()
def test_handle_message(mock_context):
path = mock_context.emit_pipe_creation_event(Action.SEND, mkfifo)
payload = {'key': token_urlsafe(16)}
mock_context.pipes.send_pipes[path].handle_message(dumps(payload))
assert mock_context.pipes.connector.messages[-1] == payload
mock_context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
assert len(mock_context.pipes.connector.messages) == 1
def test_broadcast(mock_context):
paths = [
mock_context.emit_pipe_creation_event(Action.RECV, mkfifo)
for _ in range(32)
]
payload = {'key': token_urlsafe(16)}
mock_context.pipes.broadcast(payload)
for path in paths:
assert mock_context.pipes.recv_pipes[path].messages[-1] == payload
def test_inode_types(mock_context):
cases = [
(Action.RECV, mock_context.pipes.recv_pipes, mkdir),
(Action.SEND, mock_context.pipes.send_pipes, mkdir),
(Action.RECV, mock_context.pipes.recv_pipes, mknod),
(Action.SEND, mock_context.pipes.send_pipes, mknod)
]
for action, pipes, creator in cases:
path = mock_context.emit_pipe_creation_event(action, creator)
assert path not in pipes.keys()
def test_build_reader_implemented(context):
with pytest.raises(NotImplementedError):
context.emit_pipe_creation_event(Action.SEND, mkfifo)

View File

@ -8,7 +8,6 @@ from .supervisor import ProcessLogManager
class LogInotifyObserver(InotifyObserver, ProcessLogManager): class LogInotifyObserver(InotifyObserver, ProcessLogManager):
def __init__(self, connector, process_name, supervisor_uri, log_tail=0): def __init__(self, connector, process_name, supervisor_uri, log_tail=0):
self._prevent_log_recursion()
self._connector = connector self._connector = connector
self._process_name = process_name self._process_name = process_name
self.log_tail = log_tail self.log_tail = log_tail
@ -16,11 +15,6 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager):
ProcessLogManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri)
InotifyObserver.__init__(self, self._get_logfiles()) InotifyObserver.__init__(self, self._get_logfiles())
@staticmethod
def _prevent_log_recursion():
# This is done to prevent inotify event logs triggering themselves (infinite log recursion)
logging.getLogger('watchdog.observers.inotify_buffer').propagate = False
def _get_logfiles(self): def _get_logfiles(self):
self._procinfo = self.supervisor.getProcessInfo(self._process_name) self._procinfo = self.supervisor.getProcessInfo(self._process_name)
return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile'] return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile']

View File

@ -1,5 +1,5 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import logging
from typing import Iterable from typing import Iterable
from time import time from time import time
from os.path import abspath, dirname, isdir from os.path import abspath, dirname, isdir
@ -11,6 +11,9 @@ from watchdog.events import (
DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
) )
# This is done to prevent inotify event logs triggering themselves (infinite log recursion)
logging.getLogger('watchdog.observers.inotify_buffer').propagate = False
class InotifyEvent: class InotifyEvent:
def __init__(self, src_path): def __init__(self, src_path):