# Source: baseimage-tutorial-framework/tfw/components/pipe_io/pipe_connector/pipe_connector.py
# (Python; 91 lines, 3.0 KiB in the original listing)

import logging
from os import R_OK, access, listdir, mkdir, stat
from os.path import basename, exists, join
from stat import S_ISFIFO

from pipe_io_server import PipeWriterServer

from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent, InotifyObserver

LOG = logging.getLogger(__name__)
class PipeConnector:
    """Attach a server to every matching named pipe in a watched directory.

    Pipes whose file name contains ``writer_pattern`` get the server returned
    by ``build_writer`` and are tracked in ``recv_pipes``; pipes whose file
    name contains ``reader_pattern`` get the server returned by
    ``build_reader`` and are tracked in ``send_pipes``. Both dictionaries map
    the pipe's path to its running server.

    ``build_reader`` raises ``NotImplementedError`` here, so subclasses must
    override it before any reader pipe can be connected.
    """

    reader_pattern, writer_pattern = 'send', 'recv'

    def __init__(self, path):
        """Start watching *path* and connect to the pipes already in it.

        Args:
            path: Directory holding the named pipes (created if missing).

        Raises:
            ValueError: If *path* is not readable.
        """
        self.recv_pipes, self.send_pipes = {}, {}
        self.observer = self.build_observer(path)
        self.observer.on_any_event = self._on_any_event
        # The observer is started before the directory scan so a pipe created
        # while scanning is not missed; _create_pipe ignores paths that are
        # already tracked, so a pipe seen by both is connected only once.
        self.observer.start()
        self._connect_existing_pipes(path)

    def build_observer(self, path):
        """Return a (not yet started) observer for pipe files under *path*.

        The directory is created when it does not exist.

        Raises:
            ValueError: If *path* is not readable.
        """
        if not exists(path):
            mkdir(path)
        if not access(path, R_OK):
            raise ValueError('Path does not exist or is not accessible.')
        patterns = [f'*{self.reader_pattern}*', f'*{self.writer_pattern}*']
        return InotifyObserver(path, patterns=patterns)

    def _connect_existing_pipes(self, path):
        """Connect to every FIFO that is already present in *path*."""
        for node in listdir(path):
            node_path = join(path, node)
            if self._is_pipe(node_path):
                self._create_pipe(node_path)
                LOG.debug('Connected to existing pipe "%s"', node_path)

    def _on_any_event(self, event):
        """Observer callback: track created FIFOs, drop deleted paths."""
        path = event.src_path
        # Check the event type first so only creation events cost a stat().
        if isinstance(event, InotifyFileCreatedEvent):
            if self._is_pipe(path):
                self._create_pipe(path)
                LOG.debug('Connected to new pipe "%s"', path)
        elif isinstance(event, InotifyFileDeletedEvent):
            self._delete_pipe(path)
            LOG.debug('Disconnected from deleted pipe "%s"', path)

    @staticmethod
    def _is_pipe(path):
        """Return True if *path* currently exists and is a FIFO."""
        # A single stat() instead of exists()+stat(): the file may vanish
        # between the two calls, which used to raise inside the callback.
        try:
            return S_ISFIFO(stat(path).st_mode)
        except (OSError, ValueError):
            return False

    def _create_pipe(self, path):
        """Build, start and register the server for the pipe at *path*.

        Does nothing when *path* is already tracked or when its file name
        matches neither pattern. ``writer_pattern`` takes precedence when the
        name contains both patterns.
        """
        if self._find_pipe(path) is not None:
            return
        # Match on the file name only: a pattern occurring in a parent
        # directory name must not decide the direction of the pipe.
        name = basename(path)
        if self.writer_pattern in name:
            pipes, server = self.recv_pipes, self.build_writer(path)
        elif self.reader_pattern in name:
            pipes, server = self.send_pipes, self.build_reader(path)
        else:
            return
        if server is None:
            return
        server.start()
        pipes[path] = server

    def _find_pipe(self, path):
        """Return the dictionary that tracks *path*, or None if untracked."""
        # send_pipes is consulted first to keep the original precedence in
        # the (unexpected) case of a path registered in both dictionaries.
        for pipes in (self.send_pipes, self.recv_pipes):
            if path in pipes:
                return pipes
        return None

    def build_reader(self, path):  # pylint: disable=no-self-use
        """Return the server for a reader pipe; subclasses must override."""
        raise NotImplementedError()

    def build_writer(self, path):  # pylint: disable=no-self-use
        """Return the server used to write messages into the pipe at *path*."""
        return PipeWriterServer(path, manage_pipes=False)

    def _delete_pipe(self, path):
        """Stop and forget the server attached to *path*, if there is one."""
        pipes = self._find_pipe(path)
        if pipes is not None:
            # Untrack first: the pipe is gone even if stop() raises.
            pipes.pop(path).stop()

    def broadcast(self, message):
        """Send *message* through every server in ``recv_pipes``."""
        # Iterate over a snapshot: _on_any_event may add or remove entries
        # while we are sending.
        # NOTE(review): assumes observer callbacks run on another thread.
        for server in tuple(self.recv_pipes.values()):
            server.send_message(message)

    def stop(self):
        """Stop watching the directory and stop every connected server."""
        # Stop the observer first so no new server is started during
        # shutdown.
        # NOTE(review): relies on InotifyObserver.stop() as the counterpart
        # of the start() call in __init__ -- confirm.
        try:
            self.observer.stop()
        finally:
            for pipes in (self.recv_pipes, self.send_pipes):
                for server in tuple(pipes.values()):
                    server.stop()