Add support for reopening closed pipes with consistency guarantees
This commit is contained in:
		@@ -18,14 +18,21 @@ class PipeReaderThread(Thread):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @terminate_process_on_failure
 | 
					    @terminate_process_on_failure
 | 
				
			||||||
    def run(self):
 | 
					    def run(self):
 | 
				
			||||||
        with open(self._pipe_path, 'rb') as pipe:
 | 
					        with self._open() as pipe:
 | 
				
			||||||
            while True:
 | 
					            while True:
 | 
				
			||||||
                message = pipe.readline()
 | 
					                message = pipe.readline()
 | 
				
			||||||
                if message in (self.eof, self.stop_sequence):
 | 
					                if message == self.stop_sequence:
 | 
				
			||||||
                    self._stop_event.set()
 | 
					                    self._stop_event.set()
 | 
				
			||||||
                    break
 | 
					                    break
 | 
				
			||||||
 | 
					                if message == self.eof:
 | 
				
			||||||
 | 
					                    pipe.close()
 | 
				
			||||||
 | 
					                    pipe = self._open()
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
                self._message_handler(message[:-1])
 | 
					                self._message_handler(message[:-1])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _open(self):
 | 
				
			||||||
 | 
					        return open(self._pipe_path, 'rb')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def stop(self):
 | 
					    def stop(self):
 | 
				
			||||||
        self.unblock()
 | 
					        self.unblock()
 | 
				
			||||||
        self.join()
 | 
					        self.join()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -15,21 +15,29 @@ class PipeWriterThread(Thread):
 | 
				
			|||||||
        self._write_queue = Queue()
 | 
					        self._write_queue = Queue()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def write(self, message):
 | 
					    def write(self, message):
 | 
				
			||||||
        self._write_queue.put(message, block=True)
 | 
					        self._write_queue.put(message, block=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @terminate_process_on_failure
 | 
					    @terminate_process_on_failure
 | 
				
			||||||
    def run(self):
 | 
					    def run(self):
 | 
				
			||||||
        try:
 | 
					        with self._open() as pipe:
 | 
				
			||||||
            with open(self._pipe_path, 'wb') as pipe:
 | 
					            while True:
 | 
				
			||||||
                while True:
 | 
					                message = self._write_queue.get(block=True)
 | 
				
			||||||
                    message = self._write_queue.get(block=True)
 | 
					                if message is None:
 | 
				
			||||||
                    if message is None:
 | 
					                    self._stop_event.set()
 | 
				
			||||||
                        self._stop_event.set()
 | 
					                    break
 | 
				
			||||||
                        break
 | 
					                try:
 | 
				
			||||||
                    pipe.write(message + b'\n')
 | 
					                    pipe.write(message + b'\n')
 | 
				
			||||||
                    pipe.flush()
 | 
					                    pipe.flush()
 | 
				
			||||||
        except BrokenPipeError:
 | 
					                except BrokenPipeError:
 | 
				
			||||||
            self._stop_event.set()
 | 
					                    try:  # pipe was reopened, close() flushed the message
 | 
				
			||||||
 | 
					                        pipe.close()
 | 
				
			||||||
 | 
					                    except BrokenPipeError:  # close() discarded the message
 | 
				
			||||||
 | 
					                        # TODO: a message is lost here, implement thread safe, blocking deque
 | 
				
			||||||
 | 
					                        pass
 | 
				
			||||||
 | 
					                    pipe = self._open()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _open(self):
 | 
				
			||||||
 | 
					        return open(self._pipe_path, 'wb')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def stop(self):
 | 
					    def stop(self):
 | 
				
			||||||
        self.unblock()
 | 
					        self.unblock()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user