diff --git a/pipe_io_server/pipe_reader_thread.py b/pipe_io_server/pipe_reader_thread.py index 4bce19d..487fa64 100644 --- a/pipe_io_server/pipe_reader_thread.py +++ b/pipe_io_server/pipe_reader_thread.py @@ -1,44 +1,50 @@ -from contextlib import suppress -from os import open as osopen -from os import write, close, O_WRONLY, O_NONBLOCK +# pylint: disable=redefined-builtin,too-many-instance-attributes +from contextlib import contextmanager +from os import O_NONBLOCK, O_RDONLY, open, fdopen, close, write, pipe from threading import Thread +from select import select from .terminate_process_on_failure import terminate_process_on_failure class PipeReaderThread(Thread): - eof = b'' - stop_sequence = b'stop_reading\n' - def __init__(self, pipe_path, stop_event, message_handler): super().__init__(daemon=True) self._message_handler = message_handler self._pipe_path = pipe_path + self._read_fd, self._read_fp = None, None + self._stop_signal_rfd, self._stop_signal_wfd = pipe() + self._msg_buf = b'' self._stop_event = stop_event @terminate_process_on_failure def run(self): - with self._open() as pipe: + with self._open(): while True: - message = pipe.readline() - if message == self.stop_sequence: + can_read, _, _ = select([self._read_fd, self._stop_signal_rfd], [], []) + if self._stop_signal_rfd in can_read: self._stop_event.set() break - if message == self.eof: - self._open().close() - continue - self._message_handler(message[:-1]) + for msg in iter(self._read_fp.readline, b''): + self._msg_buf += msg + if self._msg_buf.endswith(b'\n'): + self._message_handler(self._msg_buf[:-1]) + self._msg_buf = b'' + @contextmanager def _open(self): - return open(self._pipe_path, 'rb') + self._read_fd = open(self._pipe_path, O_RDONLY | O_NONBLOCK) + self._read_fp = fdopen(self._read_fd, 'rb') + yield + self._read_fp.close() + close(self._stop_signal_rfd) + close(self._stop_signal_wfd) + def stop(self): - while self.is_alive(): + if self.is_alive(): self._unblock() self.join() def _unblock(self): - with suppress(OSError): - fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK) - write(fd, self.stop_sequence) - close(fd) + write(self._stop_signal_wfd, b'1') diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py index 3bc3fe7..822c868 100644 --- a/pipe_io_server/pipe_writer_thread.py +++ b/pipe_io_server/pipe_writer_thread.py @@ -1,6 +1,6 @@ -from contextlib import suppress -from os import O_NONBLOCK, O_RDONLY, close -from os import open as osopen +# pylint: disable=redefined-builtin +from contextlib import suppress, contextmanager +from os import O_NONBLOCK, O_RDONLY, O_WRONLY, open, close, write, read from threading import Thread from .terminate_process_on_failure import terminate_process_on_failure @@ -11,6 +11,7 @@ class PipeWriterThread(Thread): def __init__(self, pipe_path, stop_event): super().__init__(daemon=True) self._pipe_path = pipe_path + self._write_fd, self._drain_fd = None, None self._stop_event = stop_event self._write_queue = Deque() @@ -19,24 +20,21 @@ class PipeWriterThread(Thread): @terminate_process_on_failure def run(self): - with self._open() as pipe: + with self._open(): while True: message = self._write_queue.pop() if message is None: self._stop_event.set() break - try: - pipe.write(message + b'\n') - pipe.flush() - except BrokenPipeError: - try: # pipe was reopened, close() flushed the message - pipe.close() - except BrokenPipeError: # close() discarded the message - self._write_queue.push_front(message) - pipe = self._open() + write(self._write_fd, message + b'\n') + @contextmanager def _open(self): - return open(self._pipe_path, 'wb') + self._drain_fd = open(self._pipe_path, O_RDONLY | O_NONBLOCK) + self._write_fd = open(self._pipe_path, O_WRONLY) + yield + close(self._write_fd) + close(self._drain_fd) def stop(self): while self.is_alive(): @@ -44,7 +42,7 @@ class PipeWriterThread(Thread): self.join() def _unblock(self): + self._write_queue.push_front(None) with suppress(OSError): - fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK) - self._write_queue.push_front(None) - close(fd) + while read(self._drain_fd, 65536): + pass