Rework PipeIOServer to use "keep pipes open" model with robust unblocking
This commit is contained in:
		@@ -2,13 +2,13 @@ from threading import Thread
 | 
			
		||||
from queue import Queue
 | 
			
		||||
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
from .pipe import Pipe
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterThread(Thread):
 | 
			
		||||
    def __init__(self, pipe_path):
 | 
			
		||||
    def __init__(self, pipe_path, stop_event):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._stop_event = stop_event
 | 
			
		||||
        self._write_queue = Queue()
 | 
			
		||||
 | 
			
		||||
    def write(self, message):
 | 
			
		||||
@@ -16,12 +16,22 @@ class PipeWriterThread(Thread):
 | 
			
		||||
 | 
			
		||||
    @terminate_process_on_failure
 | 
			
		||||
    def run(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            message = self._write_queue.get(block=True)
 | 
			
		||||
            if message is None:
 | 
			
		||||
                break
 | 
			
		||||
            Pipe(self._pipe_path).write(message)
 | 
			
		||||
        try:
 | 
			
		||||
            with open(self._pipe_path, 'w') as pipe:
 | 
			
		||||
                while True:
 | 
			
		||||
                    message = self._write_queue.get(block=True)
 | 
			
		||||
                    if message is None:
 | 
			
		||||
                        self._stop_event.set()
 | 
			
		||||
                        break
 | 
			
		||||
                    pipe.write(f'{message}\n')
 | 
			
		||||
                    pipe.flush()
 | 
			
		||||
        except BrokenPipeError:
 | 
			
		||||
            self._stop_event.set()
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self._write_queue.put(None)
 | 
			
		||||
        self.unblock()
 | 
			
		||||
        self.join()
 | 
			
		||||
 | 
			
		||||
    def unblock(self):
 | 
			
		||||
        self._write_queue.put(None)
 | 
			
		||||
        open(self._pipe_path, 'r').close()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user