from contextlib import suppress
from os import O_NONBLOCK, O_RDONLY, close
from os import open as osopen
from queue import Queue
from threading import Thread

from .terminate_process_on_failure import terminate_process_on_failure


class PipeWriterThread(Thread):
    """Thread that drains a queue of messages into a pipe, one per line.

    Messages handed to :meth:`write` are queued and written by the thread
    itself, so callers never touch the pipe directly. A ``None`` entry in
    the queue acts as the shutdown sentinel.
    """

    def __init__(self, pipe_path, stop_event):
        """Store the pipe location and the event used to signal shutdown.

        Args:
            pipe_path: Path passed to ``open()`` when the thread runs.
            stop_event: Object whose ``set()`` is called when the thread
                stops writing (sentinel received or pipe broken).
        """
        super().__init__()
        self._write_queue = Queue()
        self._stop_event = stop_event
        self._pipe_path = pipe_path

    def write(self, message):
        """Queue *message* to be written to the pipe by the thread.

        ``run`` appends ``b'\\n'`` to each message, so messages are
        expected to be ``bytes``. Passing ``None`` is equivalent to asking
        the thread to stop.
        """
        # Queue.put blocks by default; the queue is unbounded, so this
        # returns as soon as the item is enqueued.
        self._write_queue.put(message)

    @terminate_process_on_failure
    def run(self):
        """Write queued messages to the pipe until told to stop.

        Each message is written followed by a newline and flushed
        immediately. The stop event is set when the ``None`` sentinel is
        dequeued or when the pipe raises ``BrokenPipeError``.
        """
        # NOTE(review): terminate_process_on_failure is defined elsewhere;
        # presumably it handles exceptions other than the one caught here.
        next_message = self._write_queue.get
        try:
            with open(self._pipe_path, 'wb') as pipe:
                message = next_message()
                while message is not None:
                    pipe.write(message + b'\n')
                    # Flush per message so nothing lingers in the buffer.
                    pipe.flush()
                    message = next_message()
                # Sentinel received: signal shutdown before the pipe is
                # closed on leaving the ``with`` block.
                self._stop_event.set()
        except BrokenPipeError:
            self._stop_event.set()

    def stop(self):
        """Ask the thread to finish and wait until it has exited."""
        self.unblock()
        self.join()

    def unblock(self):
        """Release the thread from whatever it is currently blocked on.

        Enqueues the ``None`` sentinel to end the write loop, then briefly
        opens the pipe for reading in non-blocking mode and closes it
        again. Any ``OSError`` from that open/close is ignored.
        """
        self._write_queue.put(None)
        # Presumably pipe_path is a FIFO, where the writer's open() waits
        # for a reader; a momentary reader lets it proceed -- verify
        # against how the pipe is created.
        with suppress(OSError):
            close(osopen(self._pipe_path, O_RDONLY | O_NONBLOCK))