77 lines
2.5 KiB
Python
77 lines
2.5 KiB
Python
import logging
|
|
from stat import S_ISFIFO
|
|
from os import access, mkdir, stat, R_OK
|
|
from os.path import exists
|
|
|
|
from pipe_io_server import PipeWriterServer
|
|
|
|
from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, InotifyFileDeletedEvent
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class PipeConnector:
    """Watch a directory for named pipes and attach a server to each one.

    An ``InotifyObserver`` is started on the directory. When a FIFO whose
    path contains ``writer_pattern`` is created, a writer server is built,
    started and stored in ``recv_pipes``; a FIFO whose path contains
    ``reader_pattern`` gets a reader server stored in ``send_pipes``. When a
    tracked path is deleted its server is stopped and forgotten.

    ``build_reader`` raises ``NotImplementedError`` here, so subclasses that
    want to handle reader pipes must override it.
    """

    # Substrings that decide which kind of server a pipe gets. They are
    # matched against the whole path, not only the file name.
    reader_pattern, writer_pattern = 'send', 'recv'

    def __init__(self, path):
        """Start observing *path* for pipe creation and deletion.

        Args:
            path: Directory to observe; it is created if it does not exist.

        Raises:
            ValueError: If *path* is not readable.
        """
        # Both dicts map a pipe path to the server object handling it.
        self.recv_pipes, self.send_pipes = {}, {}
        self.observer = self.build_observer(path)
        self.observer.on_any_event = self._on_any_event
        self.observer.start()

    def build_observer(self, path):
        """Create the observer that reports pipe events under *path*.

        Args:
            path: Directory to observe; it is created if it does not exist.

        Returns:
            An ``InotifyObserver`` restricted to paths matching the reader
            and writer patterns.

        Raises:
            ValueError: If *path* is not readable.
        """
        if not exists(path):
            try:
                mkdir(path)
            except FileExistsError:
                # Something else created the path between the check and the
                # mkdir call; that is the state we wanted anyway.
                pass
        if not access(path, R_OK):
            raise ValueError('Path does not exist or is not accessible.')
        return InotifyObserver(
            path,
            patterns=[f'*{self.reader_pattern}*', f'*{self.writer_pattern}*'],
        )

    def _on_any_event(self, event):
        """Dispatch an observer event to the create/delete handlers.

        Only creation events for paths that are FIFOs and deletion events
        are acted upon; every other event is ignored.
        """
        path = event.src_path
        # Check the event type first so the filesystem is only queried for
        # creation events.
        if isinstance(event, InotifyFileCreatedEvent) and self._is_pipe(path):
            self._on_create_pipe(path)
            LOG.debug('Connected to new pipe "%s"', path)
        elif isinstance(event, InotifyFileDeletedEvent):
            self._on_delete_pipe(path)
            LOG.debug('Disconnected from deleted pipe "%s"', path)

    @staticmethod
    def _is_pipe(path):
        """Return True if *path* currently exists and is a FIFO."""
        try:
            # A single stat call: a separate existence check would leave a
            # window in which the file can vanish and make stat raise.
            return S_ISFIFO(stat(path).st_mode)
        except (OSError, ValueError):
            # Missing/inaccessible path, or a path stat cannot represent.
            return False

    def _on_create_pipe(self, path):
        """Build, start and register a server for the new pipe at *path*.

        Does nothing if *path* is already tracked or matches neither
        pattern. The writer pattern takes precedence when both match.
        """
        if self._find_pipe(path) is not None:
            return
        if self.writer_pattern in path:
            pipes, server = self.recv_pipes, self.build_writer(path)
        elif self.reader_pattern in path:
            pipes, server = self.send_pipes, self.build_reader(path)
        else:
            return
        if server is None:
            return
        server.start()
        pipes[path] = server

    def _find_pipe(self, path):
        """Return the dict tracking *path*, or None if it is not tracked."""
        # send_pipes is consulted first so that it wins should a path ever
        # be present in both dicts.
        for pipes in (self.send_pipes, self.recv_pipes):
            if path in pipes:
                return pipes
        return None

    def build_reader(self, path):  # pylint: disable=no-self-use
        """Build the server for a reader pipe; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def build_writer(self, path):  # pylint: disable=no-self-use
        """Build the server used to write messages into the pipe at *path*."""
        return PipeWriterServer(path, manage_pipes=False)

    def _on_delete_pipe(self, path):
        """Stop and forget the server attached to *path*, if there is one."""
        pipes = self._find_pipe(path)
        if pipes is None:
            return
        # Unregister before stopping: if stop() raises, the dead server must
        # not stay registered and keep receiving broadcasts.
        server = pipes.pop(path, None)
        if server is not None:
            server.stop()

    def broadcast(self, message):
        """Send *message* to every server in ``recv_pipes``."""
        # Iterate over a snapshot: registering or removing a pipe while the
        # loop runs would otherwise raise "dictionary changed size during
        # iteration".
        for server in list(self.recv_pipes.values()):
            server.send_message(message)
|