from contextlib import suppress
from os import open as osopen
from os import write, close, O_WRONLY, O_NONBLOCK
from threading import Thread

from .terminate_process_on_failure import terminate_process_on_failure


class PipeReaderThread(Thread):
    """Daemon thread that reads newline-delimited messages from a pipe.

    Every line read from ``pipe_path`` is handed to ``message_handler`` as
    bytes with its trailing newline removed. Reading ends when a line equal
    to ``stop_sequence`` arrives; ``stop_event`` is set at that point.
    """

    # Value returned by ``readline()`` at end of file.
    eof = b''
    # Line that tells the reader loop to terminate.
    stop_sequence = b'stop_reading\n'

    def __init__(self, pipe_path, stop_event, message_handler):
        """Store the collaborators; the thread is created as a daemon.

        Args:
            pipe_path: Path opened for binary reading by the thread.
            stop_event: Object whose ``set()`` is called once the stop
                sequence has been read.
            message_handler: Callable invoked with each message (bytes).
        """
        super().__init__(daemon=True)
        self._message_handler = message_handler
        self._pipe_path = pipe_path
        self._stop_event = stop_event

    # NOTE(review): the decorator is defined elsewhere; judging by its name
    # it terminates the process if run() raises - confirm.
    @terminate_process_on_failure
    def run(self):
        """Read lines until the stop sequence is received."""
        with self._open() as pipe:
            while True:
                message = pipe.readline()
                if message == self.stop_sequence:
                    self._stop_event.set()
                    break
                if message == self.eof:
                    # Open and immediately close the path again before
                    # retrying. Presumably the path is a FIFO, where this
                    # open blocks until a writer connects, so the loop does
                    # not spin on EOF - confirm against the caller.
                    self._open().close()
                    continue
                # Strip only a real trailing newline: a final line that hit
                # EOF without one must not lose its last data byte.
                if message.endswith(b'\n'):
                    message = message[:-1]
                self._message_handler(message)

    def _open(self):
        """Return the pipe path opened for binary reading."""
        return open(self._pipe_path, 'rb')

    def stop(self):
        """Keep sending the stop sequence until the thread exits, then join."""
        while self.is_alive():
            self._unblock()
        self.join()

    def _unblock(self):
        """Best-effort, non-blocking write of the stop sequence to the pipe.

        Any ``OSError`` from opening, writing or closing is suppressed;
        ``stop()`` simply retries while the thread is still alive.
        """
        with suppress(OSError):
            fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK)
            try:
                write(fd, self.stop_sequence)
            finally:
                # Always release the descriptor: a failed write used to skip
                # close() and leak one descriptor per retry.
                close(fd)