diff --git a/tfw/components/pipe_io/pipe_io_server/__init__.py b/tfw/components/pipe_io/pipe_io_server/__init__.py index cab334b..b0d450a 100644 --- a/tfw/components/pipe_io/pipe_io_server/__init__.py +++ b/tfw/components/pipe_io/pipe_io_server/__init__.py @@ -1,2 +1,4 @@ from .pipe_io_server import PipeIOServer +from .pipe_reader_server import PipeReaderServer +from .pipe_writer_server import PipeWriterServer from .terminate_process_on_failure import terminate_process_on_failure diff --git a/tfw/components/pipe_io/pipe_io_server/pipe.py b/tfw/components/pipe_io/pipe_io_server/pipe.py index eb021ae..f83664a 100644 --- a/tfw/components/pipe_io/pipe_io_server/pipe.py +++ b/tfw/components/pipe_io/pipe_io_server/pipe.py @@ -1,6 +1,8 @@ from os import mkfifo, remove, chmod from os.path import exists +DEFAULT_PERMISSIONS = 0o600 + class Pipe: def __init__(self, path): diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py index 2715f40..8204adb 100644 --- a/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py +++ b/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py @@ -1,73 +1,25 @@ -from abc import ABC, abstractmethod -from threading import Thread, Event -from typing import Callable - -from .pipe_reader_thread import PipeReaderThread -from .pipe_writer_thread import PipeWriterThread -from .pipe import Pipe -from .terminate_process_on_failure import terminate_process_on_failure +from .pipe import DEFAULT_PERMISSIONS +from .pipe_reader_server import PipeReaderServer +from .pipe_writer_server import PipeWriterServer -class PipeIOServer(ABC, Thread): - def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600): - super().__init__(daemon=True) - self._in_pipe, self._out_pipe = in_pipe, out_pipe - self._create_pipes(permissions) - self._stop_event = Event() - self._reader_thread, self._writer_thread = self._create_io_threads() - self._io_threads = (self._reader_thread, self._writer_thread) - self._on_stop = lambda: None +class PipeIOServer(PipeReaderServer, PipeWriterServer): + # pylint: disable=abstract-method + def __init__( + self, + in_pipe, + out_pipe, + permissions=DEFAULT_PERMISSIONS, + manage_pipes=True + ): + super().__init__( + in_pipe=in_pipe, + out_pipe=out_pipe, + permissions=permissions, + manage_pipes=manage_pipes + ) - def _create_pipes(self, permissions): - Pipe(self.in_pipe).recreate(permissions) - Pipe(self.out_pipe).recreate(permissions) - - @property - def in_pipe(self): - return self._in_pipe - - @property - def out_pipe(self): - return self._out_pipe - - def _create_io_threads(self): - reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message) - writer_thread = PipeWriterThread(self.out_pipe, self._stop_event) - return reader_thread, writer_thread - - @abstractmethod - def handle_message(self, message): - raise NotImplementedError() - - def send_message(self, message): - self._writer_thread.write(message) - - @terminate_process_on_failure - def run(self): - for thread in self._io_threads: - thread.start() - self._stop_event.wait() - self._stop_threads() - - def stop(self): - self._stop_event.set() - if self.is_alive(): - self.join() - - def _stop_threads(self): - for thread in self._io_threads: - if thread.is_alive(): - thread.stop() - Pipe(self.in_pipe).remove() - Pipe(self.out_pipe).remove() - self._on_stop() - - def _set_on_stop(self, value): - if not isinstance(value, Callable): - raise ValueError("Supplied object is not callable!") - self._on_stop = value - - on_stop = property(fset=_set_on_stop) - - def wait(self): - self._stop_event.wait() + def _io_threads(self): + # pylint: disable=no-member + yield from PipeReaderServer._io_threads(self) + yield from PipeWriterServer._io_threads(self) diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py b/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py new file mode 100644 index 0000000..91a50b4 --- /dev/null +++ b/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py @@ -0,0 +1,45 @@ +from threading import Thread, Event + +from .terminate_process_on_failure import terminate_process_on_failure + + +class PipeIOThread(Thread): + def __init__(self): + super().__init__(daemon=True) + self._stop_event = Event() + self.__io_threads = [] + + @terminate_process_on_failure + def run(self): + self.__io_threads.extend(self._io_threads()) + for thread in self.__io_threads: + thread.start() + self._stop_event.wait() + self._stop_threads() + + def _io_threads(self): + raise NotImplementedError() + + def _stop_threads(self): + for thread in self.__io_threads: + if thread.is_alive(): + thread.stop() + self.on_stop() + + def on_stop(self): + pass + + def stop(self): + self._stop_event.set() + if self.is_alive(): + self.join() + + def wait(self): + self._stop_event.wait() + + def __enter__(self): + self.start() + return self + + def __exit__(self, type_, value, tb): + self.stop() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py new file mode 100644 index 0000000..e26bb87 --- /dev/null +++ b/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py @@ -0,0 +1,39 @@ +from .pipe import Pipe, DEFAULT_PERMISSIONS +from .pipe_io_thread import PipeIOThread +from .pipe_reader_thread import PipeReaderThread + + +class PipeReaderServer(PipeIOThread): + def __init__( + self, + in_pipe, + permissions=DEFAULT_PERMISSIONS, + manage_pipes=True, + **kwargs + ): + super().__init__(**kwargs) + self._reader_thread = None + self._manage_pipes = manage_pipes + self._in_pipe = in_pipe + if self._manage_pipes: + Pipe(self.in_pipe).recreate(permissions) + + @property + def in_pipe(self): + return self._in_pipe + + def _io_threads(self): + self._reader_thread = PipeReaderThread( + self.in_pipe, + self._stop_event, + self.handle_message + ) + yield self._reader_thread + + def handle_message(self, message): + raise NotImplementedError() + + def stop(self): + super().stop() + if self._manage_pipes: + Pipe(self.in_pipe).remove() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py new file mode 100644 index 0000000..1161917 --- /dev/null +++ b/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py @@ -0,0 +1,38 @@ +from .pipe import Pipe, DEFAULT_PERMISSIONS +from .pipe_io_thread import PipeIOThread +from .pipe_writer_thread import PipeWriterThread + + +class PipeWriterServer(PipeIOThread): + def __init__( + self, + out_pipe, + permissions=DEFAULT_PERMISSIONS, + manage_pipes=True, + **kwargs + ): + super().__init__(**kwargs) + self._writer_thread = None + self._manage_pipes = manage_pipes + self._out_pipe = out_pipe + if self._manage_pipes: + Pipe(self.out_pipe).recreate(permissions) + + @property + def out_pipe(self): + return self._out_pipe + + def _io_threads(self): + self._writer_thread = PipeWriterThread( + self.out_pipe, + self._stop_event + ) + yield self._writer_thread + + def send_message(self, message): + self._writer_thread.write(message) + + def stop(self): + super().stop() + if self._manage_pipes: + Pipe(self.out_pipe).remove() diff --git a/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py b/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py new file mode 100644 index 0000000..6a08847 --- /dev/null +++ b/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py @@ -0,0 +1,187 @@ +# pylint: disable=redefined-outer-name +from os import stat, urandom +from os.path import exists, dirname, realpath, join +from stat import S_ISFIFO +from secrets import token_urlsafe +from random import randint, getrandbits, uniform +from threading import Thread +from json import dumps, loads + +import pytest + +from .pipe_io_server import PipeIOServer + + +class EchoPipeIOServer(PipeIOServer): + def handle_message(self, message): + self.send_message(message) + + +@pytest.fixture +def io_pipes(): + with EchoPipeIOServer(*get_test_init_params()) as pipe_io: + with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: + yield io_pipes + + +def get_test_init_params(): + here = dirname(realpath(__file__)) + return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests') + + +def raise_if_thread_blocks(*, target, unblock_function): + thread = Thread(target=target) + thread.start() + unblock_function() + thread.join(timeout=1) + if thread.is_alive(): + raise RuntimeError('PipeIOServer failed to shut down!') + + +class IOPipes: + def __init__(self, in_pipe_path, out_pipe_path): + self.in_pipe_path = in_pipe_path + self.out_pipe_path = out_pipe_path + + def __enter__(self): + # pylint: disable=attribute-defined-outside-init + self.in_pipe = open(self.in_pipe_path, 'wb') + self.out_pipe = open(self.out_pipe_path, 'rb') + return self + + def __exit__(self, type_, value, traceback): + self.close() + + def close(self): + self.in_pipe.close() + self.out_pipe.close() + + def send_message(self, message): + self.in_pipe.write(message + b'\n') + self.in_pipe.flush() + + def recv(self): + return self.out_pipe.readline().rstrip(b'\n') + + +def pipes_exist(*paths): + predicate = lambda path: exists(path) and S_ISFIFO(stat(path).st_mode) + return all(predicate(path) for path in paths) + + +def test_manage_pipes(): + pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=True) + assert pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) + pipe_io.stop() + assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) + + +def test_no_manage_pipes(): + pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=False) + assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) + pipe_io.stop() + assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) + + +def test_stop(): + pipe_io = EchoPipeIOServer(*get_test_init_params()) + pipe_io.start() + raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) + + pipe_io = EchoPipeIOServer(*get_test_init_params()) + pipe_io.start() + with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes: + raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) + + pipe_io = EchoPipeIOServer(*get_test_init_params()) + pipe_io.start() + with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes: + test_message = token_urlsafe(randint(128, 256)) + iopipes.send_message(test_message.encode()) + assert test_message == iopipes.recv().decode() + iopipes.send_message(test_message.encode()) + raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) + + +@pytest.mark.parametrize( + 'test_data', [ + 'Cats and cheese', + 'You ever wonder why we are here?', + 'Lorem ipsum dolor sit amet', + 'You always have a plan, Dutch!', + ] +) +def test_io(io_pipes, test_data): + io_pipes.send_message(test_data.encode()) + assert io_pipes.recv().decode() == test_data + + +def test_io_random(io_pipes): + test_data = token_urlsafe(512) + for _ in range(100): + io_pipes.send_message(test_data.encode()) + assert io_pipes.recv().decode() == test_data + +@pytest.mark.parametrize( + 'test_data_size', [ + 1024, + 1024*1024, + 2*1024*1024, + 4*1024*1024, + 8*1024*1024, + 16*1024*1024, + 32*1024*1024 + ] +) +def test_io_large_data(io_pipes, test_data_size): + test_data = urandom(test_data_size).replace(b'\n', b'') + io_pipes.send_message(test_data) + received_data = io_pipes.recv() + assert received_data == test_data + + +def test_io_stress(io_pipes): + for _ in range(2222): + test_data = urandom(randint(1, 1024)).replace(b'\n', b'') + io_pipes.send_message(test_data) + assert io_pipes.recv() == test_data + + +def test_io_newlines(io_pipes): + times = randint(1, 512) + io_pipes.send_message(b'\n' * times) + for _ in range(times + 1): # IOPipes.send appends +1 + assert io_pipes.recv() == b'' + + +def test_json_io(io_pipes): + for _ in range(10): + test_data = { + f'{token_urlsafe(8)}': randint(1, 2 ** 20), + f'{token_urlsafe(9)}': [randint(1, 2 **10) for i in range(10)], + f'{token_urlsafe(4)}': f'{token_urlsafe(8)}\\\n{token_urlsafe(8)}\n{randint(1, 2 ** 10)}', + f'{token_urlsafe(11)}': { + f'{token_urlsafe(8)}': '', + f'{token_urlsafe(3)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}\n\\n{token_urlsafe(8)}', + f'{token_urlsafe(44)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)} {token_urlsafe(8)}', + f'{token_urlsafe(6)}\n{token_urlsafe(4)}': bool(getrandbits(1)), + f'{token_urlsafe(8)}': None, + f'{token_urlsafe(21)} {token_urlsafe(4)}': None, + f'{token_urlsafe(3)}': uniform(randint(1, 100), randint(1, 100)), + f'{token_urlsafe(8)}': [token_urlsafe(4) for i in range(10)], + } + } + io_pipes.send_message(dumps(test_data).encode()) + assert loads(io_pipes.recv()) == test_data + + +def test_assign_message_handler(): + pipe_io = PipeIOServer(*get_test_init_params()) + pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2) + pipe_io.start() + with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: + for _ in range(100): + test_data = token_urlsafe(32).encode() + io_pipes.send_message(test_data) + assert io_pipes.recv() == test_data * 2 + pipe_io.stop()