Implement thread-safe, blocking Deque class for reopen consistency
This commit is contained in:
		@@ -1,10 +1,10 @@
 | 
			
		||||
from contextlib import suppress
 | 
			
		||||
from os import O_NONBLOCK, O_RDONLY, close
 | 
			
		||||
from os import open as osopen
 | 
			
		||||
from queue import Queue
 | 
			
		||||
from threading import Thread
 | 
			
		||||
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
from .deque import Deque
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterThread(Thread):
 | 
			
		||||
@@ -12,16 +12,16 @@ class PipeWriterThread(Thread):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._stop_event = stop_event
 | 
			
		||||
        self._write_queue = Queue()
 | 
			
		||||
        self._write_queue = Deque()
 | 
			
		||||
 | 
			
		||||
    def write(self, message):
 | 
			
		||||
        self._write_queue.put(message, block=False)
 | 
			
		||||
        self._write_queue.push(message)
 | 
			
		||||
 | 
			
		||||
    @terminate_process_on_failure
 | 
			
		||||
    def run(self):
 | 
			
		||||
        with self._open() as pipe:
 | 
			
		||||
            while True:
 | 
			
		||||
                message = self._write_queue.get(block=True)
 | 
			
		||||
                message = self._write_queue.pop()
 | 
			
		||||
                if message is None:
 | 
			
		||||
                    self._stop_event.set()
 | 
			
		||||
                    break
 | 
			
		||||
@@ -32,8 +32,7 @@ class PipeWriterThread(Thread):
 | 
			
		||||
                    try:  # pipe was reopened, close() flushed the message
 | 
			
		||||
                        pipe.close()
 | 
			
		||||
                    except BrokenPipeError:  # close() discarded the message
 | 
			
		||||
                        # TODO: a message is lost here, implement thread safe, blocking deque
 | 
			
		||||
                        pass
 | 
			
		||||
                        self._write_queue.push_front(message)
 | 
			
		||||
                    pipe = self._open()
 | 
			
		||||
 | 
			
		||||
    def _open(self):
 | 
			
		||||
@@ -44,7 +43,7 @@ class PipeWriterThread(Thread):
 | 
			
		||||
        self.join()
 | 
			
		||||
 | 
			
		||||
    def unblock(self):
 | 
			
		||||
        self._write_queue.put(None)
 | 
			
		||||
        self._write_queue.push_front(None)
 | 
			
		||||
        with suppress(OSError):
 | 
			
		||||
            fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK)
 | 
			
		||||
            close(fd)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user