from contextlib import suppress
from os import open as osopen
from os import write, close, O_WRONLY, O_NONBLOCK
from threading import Thread

from .terminate_process_on_failure import terminate_process_on_failure


class PipeReaderThread(Thread):
    """Thread that reads line-delimited messages from a pipe and dispatches them.

    Each line read from ``pipe_path`` is stripped of trailing whitespace and
    passed to ``message_handler``.  Reading ends when the stream reaches EOF
    or when the ``stop_sequence`` line is received; at that point
    ``stop_event.set()`` is called and the thread's ``run`` returns.
    """

    # Value ``readline()`` returns at end of stream.
    eof = b''
    # Sentinel line that asks the reader loop to terminate.
    stop_sequence = b'stop_reading'

    def __init__(self, pipe_path, stop_event, message_handler):
        """Store the collaborators; nothing is opened until the thread runs.

        Args:
            pipe_path: Path opened for binary reading in ``run`` and for
                non-blocking writing in ``unblock`` (presumably a FIFO).
            stop_event: Object exposing ``set()`` (e.g. ``threading.Event``),
                signalled once the reader loop ends.
            message_handler: Callable invoked with every received message
                as ``bytes`` (trailing whitespace removed).
        """
        super().__init__()
        self._message_handler = message_handler
        self._pipe_path = pipe_path
        self._stop_event = stop_event

    @terminate_process_on_failure
    def run(self):
        """Read messages line by line until EOF or the stop sequence."""
        with open(self._pipe_path, 'rb') as pipe:
            while True:
                message = pipe.readline().rstrip()
                # NOTE: a whitespace-only line also strips down to b'' and is
                # therefore treated the same way as a genuine EOF.
                if message in (self.eof, self.stop_sequence):
                    self._stop_event.set()
                    break
                self._message_handler(message)

    def stop(self):
        """Ask the reader loop to finish and wait for the thread to exit.

        ``unblock`` is best-effort; if it could not deliver the stop
        sequence, ``join`` waits until the loop ends some other way.
        """
        self.unblock()
        self.join()

    def unblock(self):
        """Best-effort: write the stop sequence into the pipe.

        Every ``OSError`` is deliberately ignored (for instance, opening a
        FIFO write-only with ``O_NONBLOCK`` fails when no reader has it
        open).  The descriptor is always closed once it has been opened,
        even when the write itself fails.
        """
        with suppress(OSError):
            fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK)
            try:
                write(fd, self.stop_sequence + b'\n')
            finally:
                # Previously skipped when write() raised, leaking the fd.
                close(fd)