diff --git a/pipe_io_server/__init__.py b/pipe_io_server/__init__.py index 234d387..3de15dc 100644 --- a/pipe_io_server/__init__.py +++ b/pipe_io_server/__init__.py @@ -1 +1,3 @@ from .pipe_io_server import PipeIOServer +from .pipe_reader_server import PipeReaderServer +from .pipe_writer_server import PipeWriterServer diff --git a/pipe_io_server/pipe.py b/pipe_io_server/pipe.py index eb021ae..f83664a 100644 --- a/pipe_io_server/pipe.py +++ b/pipe_io_server/pipe.py @@ -1,6 +1,8 @@ from os import mkfifo, remove, chmod from os.path import exists +DEFAULT_PERMISSIONS = 0o600 + class Pipe: def __init__(self, path): diff --git a/pipe_io_server/pipe_io_server.py b/pipe_io_server/pipe_io_server.py index 9d28083..02fff8b 100644 --- a/pipe_io_server/pipe_io_server.py +++ b/pipe_io_server/pipe_io_server.py @@ -1,80 +1,17 @@ -from threading import Thread, Event - -from .pipe_reader_thread import PipeReaderThread -from .pipe_writer_thread import PipeWriterThread -from .pipe import Pipe -from .terminate_process_on_failure import terminate_process_on_failure +from .pipe import DEFAULT_PERMISSIONS +from .pipe_reader_server import PipeReaderServer +from .pipe_writer_server import PipeWriterServer -class PipeIOServer(Thread): - def __init__(self, in_pipe, out_pipe, permissions=0o600): - super().__init__(daemon=True) - self._in_pipe, self._out_pipe = in_pipe, out_pipe - self._create_pipes(permissions) - self._stop_event = Event() - self._reader_thread, self._writer_thread = None, None - self._io_threads = None - - def _create_pipes(self, permissions): - Pipe(self.in_pipe).recreate(permissions) - Pipe(self.out_pipe).recreate(permissions) - - @property - def in_pipe(self): - return self._in_pipe - - @property - def out_pipe(self): - return self._out_pipe - - def handle_message(self, message): - raise NotImplementedError() - - def send_message(self, message): - self._writer_thread.write(message) - - @terminate_process_on_failure - def run(self): - self._init_io_threads() - for thread in self._io_threads: - thread.start() - self._stop_event.wait() - self._stop_threads() - - def _init_io_threads(self): - self._reader_thread = PipeReaderThread( - self.in_pipe, - self._stop_event, - self.handle_message +class PipeIOServer(PipeReaderServer, PipeWriterServer): + # pylint: disable=abstract-method + def __init__(self, in_pipe, out_pipe, permissions=DEFAULT_PERMISSIONS): + super().__init__( + in_pipe=in_pipe, + out_pipe=out_pipe, + permissions=permissions ) - self._writer_thread = PipeWriterThread( - self.out_pipe, - self._stop_event - ) - self._io_threads = (self._reader_thread, self._writer_thread) - def stop(self): - self._stop_event.set() - if self.is_alive(): - self.join() - - def _stop_threads(self): - for thread in self._io_threads: - if thread.is_alive(): - thread.stop() - Pipe(self.in_pipe).remove() - Pipe(self.out_pipe).remove() - self.on_stop() - - def on_stop(self): - pass - - def wait(self): - self._stop_event.wait() - - def __enter__(self): - self.start() - return self - - def __exit__(self, type_, value, tb): - self.stop() + def _init_io_thread(self): + PipeReaderServer._init_io_thread(self) + PipeWriterServer._init_io_thread(self) diff --git a/pipe_io_server/pipe_io_thread.py b/pipe_io_server/pipe_io_thread.py new file mode 100644 index 0000000..868cb79 --- /dev/null +++ b/pipe_io_server/pipe_io_thread.py @@ -0,0 +1,45 @@ +from threading import Thread, Event + +from .terminate_process_on_failure import terminate_process_on_failure + + +class PipeIOThread(Thread): + def __init__(self): + super().__init__(daemon=True) + self._stop_event = Event() + self._io_threads = [] + + @terminate_process_on_failure + def run(self): + self._init_io_thread() + for thread in self._io_threads: + thread.start() + self._stop_event.wait() + self._stop_threads() + + def _init_io_thread(self): + raise NotImplementedError() + + def _stop_threads(self): + for thread in self._io_threads: + if thread.is_alive(): + thread.stop() + self.on_stop() + + def on_stop(self): + pass + + def stop(self): + self._stop_event.set() + if self.is_alive(): + self.join() + + def wait(self): + self._stop_event.wait() + + def __enter__(self): + self.start() + return self + + def __exit__(self, type_, value, tb): + self.stop() diff --git a/pipe_io_server/pipe_reader_server.py b/pipe_io_server/pipe_reader_server.py new file mode 100644 index 0000000..c0b8312 --- /dev/null +++ b/pipe_io_server/pipe_reader_server.py @@ -0,0 +1,30 @@ +from .pipe import Pipe, DEFAULT_PERMISSIONS +from .pipe_io_thread import PipeIOThread +from .pipe_reader_thread import PipeReaderThread + + +class PipeReaderServer(PipeIOThread): + def __init__(self, in_pipe, permissions=DEFAULT_PERMISSIONS, **kwargs): + super().__init__(**kwargs) + self._in_pipe = in_pipe + Pipe(self.in_pipe).recreate(permissions) + self._reader_thread = None + + @property + def in_pipe(self): + return self._in_pipe + + def _init_io_thread(self): + self._reader_thread = PipeReaderThread( + self.in_pipe, + self._stop_event, + self.handle_message + ) + self._io_threads.append(self._reader_thread) + + def handle_message(self, message): + raise NotImplementedError() + + def stop(self): + super().stop() + Pipe(self.in_pipe).remove() diff --git a/pipe_io_server/pipe_writer_server.py b/pipe_io_server/pipe_writer_server.py new file mode 100644 index 0000000..5c483ca --- /dev/null +++ b/pipe_io_server/pipe_writer_server.py @@ -0,0 +1,29 @@ +from .pipe import Pipe, DEFAULT_PERMISSIONS +from .pipe_io_thread import PipeIOThread +from .pipe_writer_thread import PipeWriterThread + + +class PipeWriterServer(PipeIOThread): + def __init__(self, out_pipe, permissions=DEFAULT_PERMISSIONS, **kwargs): + super().__init__(**kwargs) + self._out_pipe = out_pipe + Pipe(self.out_pipe).recreate(permissions) + self._writer_thread = None + + @property + def out_pipe(self): + return self._out_pipe + + def _init_io_thread(self): + self._writer_thread = PipeWriterThread( + self.out_pipe, + self._stop_event + ) + self._io_threads.append(self._writer_thread) + + def send_message(self, message): + self._writer_thread.write(message) + + def stop(self): + super().stop() + Pipe(self.out_pipe).remove()