from threading import Thread, Event

from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe
from .terminate_process_on_failure import terminate_process_on_failure


class PipeIOServer(Thread):
    """Daemon thread that manages an input/output pipe pair.

    On construction both pipes are recreated with the given permissions.
    When the thread runs, it starts one reader thread on ``in_pipe`` and one
    writer thread on ``out_pipe``, then blocks until ``stop()`` is called,
    after which it stops the I/O threads and removes both pipes.

    Subclasses must override ``handle_message`` and may override ``on_stop``.
    Instances can be used as context managers: entering starts the server,
    exiting stops it.
    """

    def __init__(self, in_pipe, out_pipe, permissions=0o600):
        """Store the pipe paths and recreate both pipes.

        Args:
            in_pipe: Identifier of the pipe handed to the reader thread
                (presumably a filesystem path -- confirm against ``Pipe``).
            out_pipe: Identifier of the pipe handed to the writer thread.
            permissions: Value forwarded to ``Pipe.recreate`` for both pipes.
        """
        super().__init__(daemon=True)
        self._in_pipe, self._out_pipe = in_pipe, out_pipe
        self._create_pipes(permissions)
        self._stop_event = Event()
        # The I/O threads are only built once run() executes.
        self._reader_thread, self._writer_thread = None, None
        self._io_threads = None

    def _create_pipes(self, permissions):
        """Recreate the input and the output pipe with ``permissions``."""
        Pipe(self.in_pipe).recreate(permissions)
        Pipe(self.out_pipe).recreate(permissions)

    @property
    def in_pipe(self):
        """The input pipe identifier passed to the constructor."""
        return self._in_pipe

    @property
    def out_pipe(self):
        """The output pipe identifier passed to the constructor."""
        return self._out_pipe

    def handle_message(self, message):
        """Handle one incoming message; must be overridden by subclasses.

        This method is passed to the reader thread as its callback
        (presumably invoked once per message read -- confirm against
        ``PipeReaderThread``).

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    def send_message(self, message):
        """Hand ``message`` to the writer thread.

        NOTE: the writer thread does not exist until run() has initialised
        it, so calling this before the server has started fails with an
        AttributeError on ``None``.
        """
        self._writer_thread.write(message)

    @terminate_process_on_failure
    def run(self):
        """Start the I/O threads, wait for the stop event, then clean up."""
        self._init_io_threads()
        for thread in self._io_threads:
            thread.start()
        self._stop_event.wait()
        self._stop_threads()

    def _init_io_threads(self):
        """Build the reader and writer threads sharing this stop event."""
        self._reader_thread = PipeReaderThread(
            self.in_pipe,
            self._stop_event,
            self.handle_message
        )
        self._writer_thread = PipeWriterThread(
            self.out_pipe,
            self._stop_event
        )
        self._io_threads = (self._reader_thread, self._writer_thread)

    def stop(self):
        """Signal the server to stop and wait for its thread to finish.

        Safe to call before the server was started (the join is skipped
        when the thread is not alive). Because this joins the server
        thread, it must not be called from the server thread itself.
        """
        self._stop_event.set()
        if self.is_alive():
            self.join()

    def _stop_threads(self):
        """Stop live I/O threads, remove both pipes, then call on_stop()."""
        try:
            for thread in self._io_threads:
                if thread.is_alive():
                    thread.stop()
        finally:
            # Remove the pipes even when stopping a thread raised, so a
            # failed shutdown does not leave them behind; the original
            # exception still propagates afterwards.
            Pipe(self.in_pipe).remove()
            Pipe(self.out_pipe).remove()
        self.on_stop()

    def on_stop(self):
        """Hook called after the pipes were removed; no-op by default."""
        pass

    def wait(self):
        """Block until the stop event is set."""
        self._stop_event.wait()

    def __enter__(self):
        """Start the server thread and return the server."""
        self.start()
        return self

    def __exit__(self, type_, value, tb):
        """Stop the server; exceptions from the with-body are not suppressed."""
        self.stop()