diff --git a/pipe_io_server/deque.py b/pipe_io_server/deque.py new file mode 100644 index 0000000..b2f1ab4 --- /dev/null +++ b/pipe_io_server/deque.py @@ -0,0 +1,27 @@ +from collections import deque +from threading import Lock, Condition + + +class Deque: + def __init__(self): + self._queue = deque() + + self._mutex = Lock() + self._not_empty = Condition(self._mutex) + + def pop(self): + with self._mutex: + while not self._queue: + self._not_empty.wait() + return self._queue.pop() + + def push(self, item): + self._push(item, self._queue.appendleft) + + def push_front(self, item): + self._push(item, self._queue.append) + + def _push(self, item, put_method): + with self._mutex: + put_method(item) + self._not_empty.notify() diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py index 0a16e3d..3be90f9 100644 --- a/pipe_io_server/pipe_writer_thread.py +++ b/pipe_io_server/pipe_writer_thread.py @@ -1,10 +1,10 @@ from contextlib import suppress from os import O_NONBLOCK, O_RDONLY, close from os import open as osopen -from queue import Queue from threading import Thread from .terminate_process_on_failure import terminate_process_on_failure +from .deque import Deque class PipeWriterThread(Thread): @@ -12,16 +12,16 @@ class PipeWriterThread(Thread): super().__init__() self._pipe_path = pipe_path self._stop_event = stop_event - self._write_queue = Queue() + self._write_queue = Deque() def write(self, message): - self._write_queue.put(message, block=False) + self._write_queue.push(message) @terminate_process_on_failure def run(self): with self._open() as pipe: while True: - message = self._write_queue.get(block=True) + message = self._write_queue.pop() if message is None: self._stop_event.set() break @@ -32,8 +32,7 @@ class PipeWriterThread(Thread): try: # pipe was reopened, close() flushed the message pipe.close() except BrokenPipeError: # close() discarded the message - # TODO: a message is lost here, implement thread safe, blocking deque - pass + self._write_queue.push_front(message) pipe = self._open() def _open(self): @@ -44,7 +43,7 @@ class PipeWriterThread(Thread): self.join() def unblock(self): - self._write_queue.put(None) + self._write_queue.push_front(None) with suppress(OSError): fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK) close(fd)