from os.path import join
from secrets import token_urlsafe
from collections import namedtuple
from abc import ABC, abstractmethod

from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe


class PipeIOServer(ABC):
    """Abstract base for a server that exchanges messages over two pipes.

    One pipe is handed to a ``PipeReaderThread`` together with
    ``handle_message``; the other is handed to a ``PipeWriterThread`` that
    ``send`` writes to. Subclasses must implement ``handle_message``.
    """

    def __init__(self, in_pipe=None, out_pipe=None):
        """Set up the pipes and build (but do not start) the I/O threads.

        Args:
            in_pipe: Path of the pipe given to the reader thread.
            out_pipe: Path of the pipe given to the writer thread.

        If either path is missing (falsy), BOTH paths are replaced with
        freshly generated ones under ``/tmp`` (see ``_create_pipes``).
        """
        self.in_pipe, self.out_pipe = in_pipe, out_pipe
        self._io_threads = None
        self._create_pipes()
        self._init_io_threads()

    def _create_pipes(self):
        """Pick pipe paths if needed, then (re)create both pipes."""
        if not self.in_pipe or not self.out_pipe:
            # A single random id is shared by both paths so the pair is
            # recognisable as belonging together. Note that a lone
            # caller-supplied path is overwritten here as well.
            pipe_id = token_urlsafe(6)
            self.in_pipe = join('/tmp', f'in_pipe_{pipe_id}')
            self.out_pipe = join('/tmp', f'out_pipe_{pipe_id}')
        Pipe(self.in_pipe).recreate()
        Pipe(self.out_pipe).recreate()

    def _init_io_threads(self):
        """Create the reader and writer threads and store them as a tuple.

        The threads are kept in a namedtuple with fields ``reader`` and
        ``writer`` (alphabetical order), so they can be addressed by name
        and also iterated over in ``run`` and ``stop``.
        """
        io_threads_dict = {
            'reader': PipeReaderThread(self.in_pipe, self.handle_message),
            'writer': PipeWriterThread(self.out_pipe)
        }
        IOThreadsTuple = namedtuple('IOThreadsTuple', sorted(io_threads_dict))
        self._io_threads = IOThreadsTuple(**io_threads_dict)

    @abstractmethod
    def handle_message(self, message):
        """Handle one incoming message; must be overridden by subclasses.

        This bound method is passed to ``PipeReaderThread`` as its second
        argument (presumably invoked once per message read from
        ``in_pipe`` -- confirm against ``PipeReaderThread``).
        """
        raise NotImplementedError()

    def send(self, message):
        """Pass ``message`` to the writer thread's ``write`` method."""
        self._io_threads.writer.write(message)

    def run(self):
        """Start the I/O threads (reader first, then writer)."""
        for thread in self._io_threads:
            thread.start()

    def stop(self):
        """Stop every I/O thread that is still alive, then remove both pipes."""
        for thread in self._io_threads:
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling on every Python 3 version.
            if thread.is_alive():
                thread.stop()
        Pipe(self.in_pipe).remove()
        Pipe(self.out_pipe).remove()