from contextlib import suppress
from os import O_NONBLOCK, O_RDONLY, close
from os import open as osopen
from threading import Thread

from .terminate_process_on_failure import terminate_process_on_failure
from .deque import Deque


class PipeWriterThread(Thread):
    """Daemon thread that writes queued messages to the file at ``pipe_path``.

    Messages are queued with :meth:`write` and written by :meth:`run`, one
    per line. A ``None`` entry in the queue is the stop sentinel.
    """

    def __init__(self, pipe_path, stop_event):
        """Store the target path and stop event and create the write queue.

        :param pipe_path: path opened for binary writing by the thread
            (presumably a named pipe / FIFO -- confirm against callers).
        :param stop_event: object whose ``set()`` is called once the stop
            sentinel has been consumed (presumably a ``threading.Event``).
        """
        super().__init__(daemon=True)
        self._pipe_path = pipe_path
        self._stop_event = stop_event
        self._write_queue = Deque()

    def write(self, message):
        """Queue ``message`` for writing.

        ``run()`` appends ``b'\\n'`` to each message, so it must be ``bytes``.
        """
        self._write_queue.push(message)

    @terminate_process_on_failure
    def run(self):
        """Write queued messages until the ``None`` sentinel is popped.

        Each message is written with a trailing newline and flushed
        immediately. On ``BrokenPipeError`` the pipe is closed and reopened;
        if closing also fails the message is put back at the front of the
        queue so it is retried on the new handle.
        """
        pipe = self._open()
        try:
            while True:
                # NOTE(review): Deque.pop() presumably blocks until an item
                # is available -- confirm in .deque.
                message = self._write_queue.pop()
                if message is None:
                    self._stop_event.set()
                    break
                try:
                    pipe.write(message + b'\n')
                    pipe.flush()
                except BrokenPipeError:
                    try:  # pipe was reopened, close() flushed the message
                        pipe.close()
                    except BrokenPipeError:  # close() discarded the message
                        self._write_queue.push_front(message)
                    pipe = self._open()
        finally:
            # Close whichever handle is current. A ``with`` block around the
            # first handle would leave a reopened pipe unclosed, because
            # rebinding ``pipe`` does not change what ``with`` closes.
            # Closing an already closed file is a no-op.
            pipe.close()

    def _open(self):
        """Open ``pipe_path`` for binary writing and return the file object."""
        return open(self._pipe_path, 'wb')

    def stop(self):
        """Keep unblocking the thread until it has exited, then join it."""
        while self.is_alive():
            self._unblock()
        self.join()

    def _unblock(self):
        """Briefly open the pipe for reading and push the stop sentinel.

        Assuming ``pipe_path`` is a FIFO, opening a read end lets a writer
        blocked in ``open()`` proceed. Any ``OSError`` is suppressed; if the
        open fails, the sentinel is not pushed on this attempt.
        """
        with suppress(OSError):
            fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK)
            self._write_queue.push_front(None)
            close(fd)