from os.path import join
from secrets import token_urlsafe
from abc import ABC, abstractmethod
from threading import Event

from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe


class PipeIOServer(ABC):
    """Abstract base for a server that exchanges messages over two pipes.

    One pipe is handed to a ``PipeReaderThread`` together with
    ``handle_message``; the other is handed to a ``PipeWriterThread`` that
    ``send()`` writes to. Subclasses must implement ``handle_message``.
    """

    def __init__(self, in_pipe=None, out_pipe=None):
        """Prepare the pipes and the I/O threads (threads are not started).

        Args:
            in_pipe: Path of the pipe to read from, or ``None``.
            out_pipe: Path of the pipe to write to, or ``None``.

        If either path is missing (falsy), BOTH paths are replaced by freshly
        generated ones under ``/tmp`` -- see ``_create_pipes``.
        """
        self._in_pipe, self._out_pipe = in_pipe, out_pipe
        self._create_pipes()
        # Shared with both I/O threads; set by stop() to end run().
        self._stop_event = Event()
        self._reader_thread, self._writer_thread = self._create_io_threads()
        self._io_threads = (self._reader_thread, self._writer_thread)

    def _create_pipes(self):
        """Pick pipe paths if needed and (re)create both pipes.

        When at least one of the two paths is unset, a single random id is
        generated and used for both paths, so a lone caller-supplied path is
        discarded in favour of a matching in/out pair.
        """
        if not self.in_pipe or not self.out_pipe:
            pipe_id = token_urlsafe(6)
            self._in_pipe = join('/tmp', f'in_pipe_{pipe_id}')
            self._out_pipe = join('/tmp', f'out_pipe_{pipe_id}')
        # NOTE(review): Pipe.recreate() is defined elsewhere; presumably it
        # removes any existing pipe at the path and creates a new one.
        Pipe(self.in_pipe).recreate()
        Pipe(self.out_pipe).recreate()

    @property
    def in_pipe(self):
        """Path of the pipe passed to the reader thread."""
        return self._in_pipe

    @property
    def out_pipe(self):
        """Path of the pipe passed to the writer thread."""
        return self._out_pipe

    def _create_io_threads(self):
        """Build the reader and writer threads without starting them.

        Returns:
            A ``(reader_thread, writer_thread)`` tuple.
        """
        reader_thread = PipeReaderThread(
            self.in_pipe,
            self._stop_event,
            self.handle_message,
        )
        writer_thread = PipeWriterThread(self.out_pipe, self._stop_event)
        return reader_thread, writer_thread

    @abstractmethod
    def handle_message(self, message):
        """Handle one incoming message; must be overridden by subclasses.

        This bound method is given to the reader thread as its callback
        (presumably invoked once per message read from ``in_pipe``).
        """
        raise NotImplementedError()

    def send(self, message):
        """Forward ``message`` to the writer thread's ``write`` method."""
        self._writer_thread.write(message)

    def run(self):
        """Start the I/O threads and block until ``stop()`` is called.

        Cleanup (stopping threads, removing pipes) runs even if starting a
        thread fails or the wait is interrupted, e.g. by KeyboardInterrupt,
        so no threads or pipe files are left behind.
        """
        try:
            for thread in self._io_threads:
                thread.start()
            self._stop_event.wait()
        finally:
            # On the normal path the event is already set; on the error path
            # this signals the threads that share the event before stopping.
            self._stop_event.set()
            self._stop()

    def stop(self):
        """Signal ``run()`` to finish by setting the stop event."""
        self._stop_event.set()

    def _stop(self):
        """Stop every I/O thread that is still alive, then remove both pipes."""
        for thread in self._io_threads:
            if thread.is_alive():
                thread.stop()
        Pipe(self.in_pipe).remove()
        Pipe(self.out_pipe).remove()