diff --git a/echo_server.py b/echo_server.py index b5c339c..a8f5a3a 100644 --- a/echo_server.py +++ b/echo_server.py @@ -12,7 +12,7 @@ if __name__ == "__main__": pipe_io = EchoPipeIOServer() signal(SIGTERM, lambda a, b: pipe_io.stop()) signal(SIGINT, lambda a, b: pipe_io.stop()) - pipe_io.run() print('Running pipe IO server with named pipes:') print(f'Input: {pipe_io.in_pipe}') print(f'Output: {pipe_io.out_pipe}') + pipe_io.run() diff --git a/pipe_io_server/pipe.py b/pipe_io_server/pipe.py index b1b2578..82af022 100644 --- a/pipe_io_server/pipe.py +++ b/pipe_io_server/pipe.py @@ -16,11 +16,3 @@ class Pipe: def create(self): mkfifo(self.path) - - def write(self, data): - with open(self.path, 'wb') as pipe: - pipe.write(data) - - def read(self): - with open(self.path, 'rb') as pipe: - return pipe.read() diff --git a/pipe_io_server/pipe_io_server.py b/pipe_io_server/pipe_io_server.py index 33d6772..546de0e 100644 --- a/pipe_io_server/pipe_io_server.py +++ b/pipe_io_server/pipe_io_server.py @@ -1,7 +1,7 @@ from os.path import join from secrets import token_urlsafe -from collections import namedtuple from abc import ABC, abstractmethod +from threading import Event from .pipe_reader_thread import PipeReaderThread from .pipe_writer_thread import PipeWriterThread @@ -11,9 +11,10 @@ from .pipe import Pipe class PipeIOServer(ABC): def __init__(self, in_pipe=None, out_pipe=None): self._in_pipe, self._out_pipe = in_pipe, out_pipe - self._io_threads = None self._create_pipes() - self._init_io_threads() + self._stop_event = Event() + self._reader_thread, self._writer_thread = self._create_io_threads() + self._io_threads = (self._reader_thread, self._writer_thread) def _create_pipes(self): if not self.in_pipe or not self.out_pipe: @@ -31,28 +32,30 @@ class PipeIOServer(ABC): def out_pipe(self): return self._out_pipe - def _init_io_threads(self): - io_threads_dict = { - 'reader': PipeReaderThread(self.in_pipe, self.handle_message), - 'writer': PipeWriterThread(self.out_pipe) - } - IOThreadsTuple = namedtuple('IOThreadsTuple', sorted(io_threads_dict.keys())) - self._io_threads = IOThreadsTuple(**io_threads_dict) + def _create_io_threads(self): + reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message) + writer_thread = PipeWriterThread(self.out_pipe, self._stop_event) + return reader_thread, writer_thread @abstractmethod def handle_message(self, message): raise NotImplementedError() def send(self, message): - self._io_threads.writer.write(message) + self._writer_thread.write(message) def run(self): for thread in self._io_threads: thread.start() + self._stop_event.wait() + self._stop() def stop(self): + self._stop_event.set() + + def _stop(self): for thread in self._io_threads: - if thread.isAlive(): + if thread.is_alive(): thread.stop() Pipe(self.in_pipe).remove() Pipe(self.out_pipe).remove() diff --git a/pipe_io_server/pipe_reader_thread.py b/pipe_io_server/pipe_reader_thread.py index f5bdcf6..70df33e 100644 --- a/pipe_io_server/pipe_reader_thread.py +++ b/pipe_io_server/pipe_reader_thread.py @@ -1,25 +1,37 @@ +from contextlib import suppress +from os import open as osopen +from os import write, close, O_WRONLY, O_NONBLOCK from threading import Thread from .terminate_process_on_failure import terminate_process_on_failure -from .pipe import Pipe class PipeReaderThread(Thread): - _stop_sequence = b'stop_reading' + eof = '' + stop_sequence = 'stop_reading' - def __init__(self, pipe_path, message_handler): + def __init__(self, pipe_path, stop_event, message_handler): super().__init__() self._message_handler = message_handler self._pipe_path = pipe_path + self._stop_event = stop_event @terminate_process_on_failure def run(self): - while True: - message = Pipe(self._pipe_path).read() - if message == self._stop_sequence: - break - self._message_handler(message) + with open(self._pipe_path, 'r') as pipe: + while True: + message = pipe.readline().rstrip() + if message in (self.eof, self.stop_sequence): + self._stop_event.set() + break + self._message_handler(message) def stop(self): - Pipe(self._pipe_path).write(self._stop_sequence) + self.unblock() self.join() + + def unblock(self): + with suppress(OSError): + fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK) + write(fd, f'{self.stop_sequence}\n'.encode()) + close(fd) diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py index 3fa96d0..d0e61c4 100644 --- a/pipe_io_server/pipe_writer_thread.py +++ b/pipe_io_server/pipe_writer_thread.py @@ -2,13 +2,13 @@ from threading import Thread from queue import Queue from .terminate_process_on_failure import terminate_process_on_failure -from .pipe import Pipe class PipeWriterThread(Thread): - def __init__(self, pipe_path): + def __init__(self, pipe_path, stop_event): super().__init__() self._pipe_path = pipe_path + self._stop_event = stop_event self._write_queue = Queue() def write(self, message): @@ -16,12 +16,22 @@ class PipeWriterThread(Thread): @terminate_process_on_failure def run(self): - while True: - message = self._write_queue.get(block=True) - if message is None: - break - Pipe(self._pipe_path).write(message) + try: + with open(self._pipe_path, 'w') as pipe: + while True: + message = self._write_queue.get(block=True) + if message is None: + self._stop_event.set() + break + pipe.write(f'{message}\n') + pipe.flush() + except BrokenPipeError: + self._stop_event.set() def stop(self): - self._write_queue.put(None) + self.unblock() self.join() + + def unblock(self): + self._write_queue.put(None) + open(self._pipe_path, 'r').close()