Implement inotify-based pipe connector

R. Richard 2019-08-14 14:12:31 +02:00
parent 9e49a93df3
commit f5582f0207
6 changed files with 260 additions and 7 deletions

View File

@@ -0,0 +1 @@
from .proxy_pipe_connector_handler import ProxyPipeConnectorHandler

View File

@@ -0,0 +1,69 @@
from stat import S_ISFIFO
from os import access, mkdir, stat, R_OK
from os.path import exists

from pipe_io_server import PipeReaderServer, PipeWriterServer

from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, InotifyFileDeletedEvent


class PipeConnector:
    def __init__(self, path):
        self.recv_pipes, self.send_pipes = {}, {}
        self.observer = self.build_observer(path)
        self.observer.on_any_event = self._on_any_event
        self.observer.start()

    def build_observer(self, path):  # pylint: disable=no-self-use
        if not exists(path):
            mkdir(path)
        if not access(path, R_OK):
            raise ValueError('Path does not exist or is not accessible.')
        observer = InotifyObserver(path, patterns=['*send*', '*recv*'])
        return observer

    def _on_any_event(self, event):
        path = event.src_path
        if isinstance(event, InotifyFileCreatedEvent) and self._is_pipe(path):
            self._create_pipe(path)
        elif isinstance(event, InotifyFileDeletedEvent):
            self._delete_pipe(path)

    @staticmethod
    def _is_pipe(path):
        return exists(path) and S_ISFIFO(stat(path).st_mode)

    def _create_pipe(self, path):
        if self._find_pipe(path):
            return
        server = None
        if 'recv' in path:
            pipes, server = self.recv_pipes, self.build_writer(path)
        elif 'send' in path:
            pipes, server = self.send_pipes, self.build_reader(path)
        if server:
            server.start()
            pipes[path] = server

    def _find_pipe(self, path):
        if path in self.recv_pipes.keys():
            return self.recv_pipes
        if path in self.send_pipes.keys():
            return self.send_pipes
        return None

    def build_reader(self, path):  # pylint: disable=no-self-use
        return PipeReaderServer(path, manage_pipes=False)

    def build_writer(self, path):  # pylint: disable=no-self-use
        return PipeWriterServer(path, manage_pipes=False)

    def _delete_pipe(self, path):
        pipes = self._find_pipe(path)
        if pipes:
            pipes[path].stop()
            del pipes[path]

    def broadcast(self, message):
        for server in self.recv_pipes.values():
            server.send_message(message)
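
For orientation, a minimal sketch (not part of this commit) of how a process on the far side of these FIFOs could talk to the connector. The watched directory and pipe names are hypothetical; only the 'recv'/'send' substrings matter. Pipes with 'recv' in their name are written by the connector (broadcast() fans messages out to them), pipes with 'send' in their name are read by it, and the blocking FIFO open() calls naturally wait until the connector has attached the other end.

# Illustrative sketch only -- external process side, names and paths assumed.
from os import mkfifo
from os.path import join

WORKDIR = '/tmp/tfw_pipes'  # assumption: the directory PipeConnector watches

# A '*send*' FIFO gets a reader server: whatever we write here flows towards TFW.
to_tfw = join(WORKDIR, 'solvable_send')
mkfifo(to_tfw)
with open(to_tfw, 'w') as pipe:
    pipe.write('{"key": "ping"}\n')

# A '*recv*' FIFO gets a writer server: broadcast() delivers messages here, one per line.
from_tfw = join(WORKDIR, 'solvable_recv')
mkfifo(from_tfw)
with open(from_tfw) as pipe:
    print(pipe.readline())

Deleting either FIFO is also picked up by the observer, which stops and drops the corresponding server.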

View File

@@ -0,0 +1,43 @@
import logging
from threading import Lock
from json import dumps, loads, JSONDecodeError

from pipe_io_server import PipeReaderServer

from .pipe_connector import PipeConnector

LOG = logging.getLogger(__name__)


class ProxyPipeConnectorHandler:
    keys = ['']

    def __init__(self, path):
        self.connector, self.pipes = None, None
        self.path = path

    def start(self):
        self.pipes = ProxyPipeConnector(self.path, self.connector)

    def handle_event(self, message, _):
        self.pipes.broadcast(dumps(message).encode())


class ProxyPipeConnector(PipeConnector):
    def __init__(self, path, connector):
        self.connector = connector
        self.mutex = Lock()
        super().__init__(path)

    def build_reader(self, path):
        reader = PipeReaderServer(path, manage_pipes=False)
        reader.handle_message = self._handle_message
        return reader

    def _handle_message(self, message):
        try:
            json_object = loads(message)
            with self.mutex:
                self.connector.send_message(json_object)
        except JSONDecodeError:
            LOG.error('Received invalid JSON message: %s', message)
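
To make the direction of the data flow concrete, here is a sketch (not part of this commit) of the kind of check that could sit next to this module, using the same relative import style as the test file below. StubConnector is a made-up stand-in that only records what send_message receives; constructing ProxyPipeConnector here spins up a real inotify observer on a temporary directory.

# Illustrative sketch only -- StubConnector and demo() are hypothetical.
from json import dumps
from tempfile import TemporaryDirectory

from .proxy_pipe_connector_handler import ProxyPipeConnector


class StubConnector:  # stand-in for the TFW event connector
    def __init__(self):
        self.messages = []

    def send_message(self, message):
        self.messages.append(message)


def demo():
    with TemporaryDirectory() as workdir:
        connector = StubConnector()
        pipes = ProxyPipeConnector(workdir, connector)
        # Lines read from '*send*' pipes land in _handle_message:
        pipes._handle_message(dumps({'key': 'ping'}))  # valid JSON is forwarded as a dict
        pipes._handle_message('not json')              # invalid JSON is logged and dropped
        assert connector.messages == [{'key': 'ping'}]

In the opposite direction, ProxyPipeConnectorHandler.handle_event serialises each event it receives and broadcast()s the encoded JSON to every '*recv*' pipe.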

View File

@@ -0,0 +1,144 @@
# pylint: disable=redefined-outer-name
from enum import Enum
from dataclasses import dataclass
from json import dumps
from secrets import token_urlsafe
from os import urandom, mkfifo, mkdir, mknod
from os.path import join
from tempfile import TemporaryDirectory

from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent

import pytest

from .proxy_pipe_connector_handler import ProxyPipeConnector


class Action(Enum):
    SEND = 'send'
    RECV = 'recv'


@dataclass
class PipeContext:
    workdir: str
    pipes: ProxyPipeConnector

    def emit_pipe_creation_event(self, action, inode_creator):
        filename = self.join(f'{self.generate_name()}_{action.value}')
        inode_creator(filename)
        self.pipes.observer.on_any_event(InotifyFileCreatedEvent(filename))
        return filename

    def join(self, path):
        return join(self.workdir, path)

    @staticmethod
    def generate_name():
        return urandom(4).hex()


class MockPipeConnector(ProxyPipeConnector):
    def __init__(self, path, connector):
        self.reader_events, self.writer_events = [], []
        super().__init__(path, connector)

    def build_observer(self, path):
        return MockObserver()

    def build_reader(self, path):
        self.reader_events.append(path)
        reader = MockPipeServer()
        reader.handle_message = self._handle_message
        return reader

    def build_writer(self, path):
        self.writer_events.append(path)
        return MockPipeServer()


class MockObserver:
    def start(self):
        pass

    def on_any_event(self, event):
        pass


class MockPipeServer:
    def __init__(self):
        self.messages = []

    def handle_message(self, message):
        pass

    def send_message(self, message):
        self.messages.append(message)

    def start(self):
        pass

    def stop(self):
        pass


class MockConnector:  # pylint: disable=too-few-public-methods
    def __init__(self):
        self.messages = []

    def send_message(self, message):
        self.messages.append(message)


@pytest.fixture
def context():
    with TemporaryDirectory() as workdir:
        yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))


def test_pipe_creation_deletion(context):
    cases = [
        (Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events),
        (Action.SEND, context.pipes.send_pipes, context.pipes.reader_events)
    ]
    for action, pipes, events in cases:
        path = context.emit_pipe_creation_event(action, mkfifo)
        assert events[-1] == path
        assert path in pipes.keys()
        context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
        assert path not in pipes.keys()


def test_handle_message(context):
    path = context.emit_pipe_creation_event(Action.SEND, mkfifo)
    payload = {'key': token_urlsafe(16)}
    context.pipes.send_pipes[path].handle_message(dumps(payload))
    assert context.pipes.connector.messages[-1] == payload
    context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
    assert len(context.pipes.connector.messages) == 1


def test_broadcast(context):
    paths = [
        context.emit_pipe_creation_event(Action.RECV, mkfifo)
        for _ in range(32)
    ]
    payload = {'key': token_urlsafe(16)}
    context.pipes.broadcast(payload)
    for path in paths:
        assert context.pipes.recv_pipes[path].messages[-1] == payload


def test_inode_types(context):
    cases = [
        (Action.RECV, context.pipes.recv_pipes, mkdir),
        (Action.SEND, context.pipes.send_pipes, mkdir),
        (Action.RECV, context.pipes.recv_pipes, mknod),
        (Action.SEND, context.pipes.send_pipes, mknod)
    ]
    for action, pipes, creator in cases:
        path = context.emit_pipe_creation_event(action, creator)
        assert path not in pipes.keys()

View File

@@ -8,7 +8,6 @@ from .supervisor import ProcessLogManager
class LogInotifyObserver(InotifyObserver, ProcessLogManager):
    def __init__(self, connector, process_name, supervisor_uri, log_tail=0):
        self._prevent_log_recursion()
        self._connector = connector
        self._process_name = process_name
        self.log_tail = log_tail
@@ -16,11 +15,6 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager):
        ProcessLogManager.__init__(self, supervisor_uri)
        InotifyObserver.__init__(self, self._get_logfiles())

    @staticmethod
    def _prevent_log_recursion():
        # This is done to prevent inotify event logs triggering themselves (infinite log recursion)
        logging.getLogger('watchdog.observers.inotify_buffer').propagate = False

    def _get_logfiles(self):
        self._procinfo = self.supervisor.getProcessInfo(self._process_name)
        return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile']

View File

@@ -1,5 +1,5 @@
# pylint: disable=too-few-public-methods
import logging
from typing import Iterable
from time import time
from os.path import abspath, dirname, isdir
@@ -11,6 +11,8 @@ from watchdog.events import (
    DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
)

logging.getLogger('watchdog.observers.inotify_buffer').propagate = False


class InotifyEvent:
    def __init__(self, src_path):