From daabc6dd8e56385c9897f8d6402636ab0cd3e52a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 2 May 2019 23:07:32 +0200 Subject: [PATCH] Add support for reopening closed pipes with consistency guarantees --- pipe_io_server/pipe_reader_thread.py | 11 +++++++++-- pipe_io_server/pipe_writer_thread.py | 28 ++++++++++++++++++---------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pipe_io_server/pipe_reader_thread.py b/pipe_io_server/pipe_reader_thread.py index 3ba6bdc..85e9a2e 100644 --- a/pipe_io_server/pipe_reader_thread.py +++ b/pipe_io_server/pipe_reader_thread.py @@ -18,14 +18,21 @@ class PipeReaderThread(Thread): @terminate_process_on_failure def run(self): - with open(self._pipe_path, 'rb') as pipe: + with self._open() as pipe: while True: message = pipe.readline() - if message in (self.eof, self.stop_sequence): + if message == self.stop_sequence: self._stop_event.set() break + if message == self.eof: + pipe.close() + pipe = self._open() + continue self._message_handler(message[:-1]) + def _open(self): + return open(self._pipe_path, 'rb') + def stop(self): self.unblock() self.join() diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py index c73a057..0a16e3d 100644 --- a/pipe_io_server/pipe_writer_thread.py +++ b/pipe_io_server/pipe_writer_thread.py @@ -15,21 +15,29 @@ class PipeWriterThread(Thread): self._write_queue = Queue() def write(self, message): - self._write_queue.put(message, block=True) + self._write_queue.put(message, block=False) @terminate_process_on_failure def run(self): - try: - with open(self._pipe_path, 'wb') as pipe: - while True: - message = self._write_queue.get(block=True) - if message is None: - self._stop_event.set() - break + with self._open() as pipe: + while True: + message = self._write_queue.get(block=True) + if message is None: + self._stop_event.set() + break + try: pipe.write(message + b'\n') pipe.flush() - except BrokenPipeError: - self._stop_event.set() + except BrokenPipeError: + try: # pipe was reopened, close() flushed the message + pipe.close() + except BrokenPipeError: # close() discarded the message + # TODO: a message is lost here, implement thread safe, blocking deque + pass + pipe = self._open() + + def _open(self): + return open(self._pipe_path, 'wb') def stop(self): self.unblock()