From e6d277752094292a41de73cda03a078431ecd019 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 6 Aug 2019 11:33:26 +0200 Subject: [PATCH] Replace pipe-io-server source with the new pip package --- requirements.txt | 1 + tfw/components/pipe_io/pipe_io_handler.py | 2 +- .../pipe_io/pipe_io_server/__init__.py | 4 - .../pipe_io/pipe_io_server/deque.py | 27 --- tfw/components/pipe_io/pipe_io_server/pipe.py | 18 -- .../pipe_io/pipe_io_server/pipe_io_server.py | 25 --- .../pipe_io/pipe_io_server/pipe_io_thread.py | 45 ----- .../pipe_io_server/pipe_reader_server.py | 39 ---- .../pipe_io_server/pipe_reader_thread.py | 44 ----- .../pipe_io_server/pipe_writer_server.py | 38 ---- .../pipe_io_server/pipe_writer_thread.py | 50 ----- .../terminate_process_on_failure.py | 15 -- .../pipe_io_server/test_pipe_io_server.py | 187 ------------------ 13 files changed, 2 insertions(+), 493 deletions(-) delete mode 100644 tfw/components/pipe_io/pipe_io_server/__init__.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/deque.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_io_server.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_reader_thread.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/pipe_writer_thread.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/terminate_process_on_failure.py delete mode 100644 tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py diff --git a/requirements.txt b/requirements.txt index 3c34b45..be5fcd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ python-dateutil>=2.0.0,<3.0.0 pytest>=5.0.0,<6.0.0 pylint>=2.0.0,<3.0.0 rope>=0.0.0,<1.0.0 +pipe-io-server>=1.0.0,<2.0.0 diff --git a/tfw/components/pipe_io/pipe_io_handler.py b/tfw/components/pipe_io/pipe_io_handler.py index 74aabe8..671f1db 100644 --- a/tfw/components/pipe_io/pipe_io_handler.py +++ b/tfw/components/pipe_io/pipe_io_handler.py @@ -10,7 +10,7 @@ from secrets import token_urlsafe from threading import Thread from contextlib import suppress -from .pipe_io_server import PipeIOServer, terminate_process_on_failure +from pipe_io_server import PipeIOServer, terminate_process_on_failure LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 diff --git a/tfw/components/pipe_io/pipe_io_server/__init__.py b/tfw/components/pipe_io/pipe_io_server/__init__.py deleted file mode 100644 index b0d450a..0000000 --- a/tfw/components/pipe_io/pipe_io_server/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .pipe_io_server import PipeIOServer -from .pipe_reader_server import PipeReaderServer -from .pipe_writer_server import PipeWriterServer -from .terminate_process_on_failure import terminate_process_on_failure diff --git a/tfw/components/pipe_io/pipe_io_server/deque.py b/tfw/components/pipe_io/pipe_io_server/deque.py deleted file mode 100644 index b2f1ab4..0000000 --- a/tfw/components/pipe_io/pipe_io_server/deque.py +++ /dev/null @@ -1,27 +0,0 @@ -from collections import deque -from threading import Lock, Condition - - -class Deque: - def __init__(self): - self._queue = deque() - - self._mutex = Lock() - self._not_empty = Condition(self._mutex) - - def pop(self): - with self._mutex: - while not self._queue: - self._not_empty.wait() - return self._queue.pop() - - def push(self, item): - self._push(item, self._queue.appendleft) - - def push_front(self, item): - self._push(item, self._queue.append) - - def _push(self, item, put_method): - with self._mutex: - put_method(item) - self._not_empty.notify() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe.py b/tfw/components/pipe_io/pipe_io_server/pipe.py deleted file mode 100644 index f83664a..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe.py +++ /dev/null @@ -1,18 +0,0 @@ -from os import mkfifo, remove, chmod -from os.path import exists - -DEFAULT_PERMISSIONS = 0o600 - - -class Pipe: - def __init__(self, path): - self.path = path - - def recreate(self, permissions): - self.remove() - mkfifo(self.path) - chmod(self.path, permissions) # use chmod to ignore umask - - def remove(self): - if exists(self.path): - remove(self.path) diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py deleted file mode 100644 index 8204adb..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_io_server.py +++ /dev/null @@ -1,25 +0,0 @@ -from .pipe import DEFAULT_PERMISSIONS -from .pipe_reader_server import PipeReaderServer -from .pipe_writer_server import PipeWriterServer - - -class PipeIOServer(PipeReaderServer, PipeWriterServer): - # pylint: disable=abstract-method - def __init__( - self, - in_pipe, - out_pipe, - permissions=DEFAULT_PERMISSIONS, - manage_pipes=True - ): - super().__init__( - in_pipe=in_pipe, - out_pipe=out_pipe, - permissions=permissions, - manage_pipes=manage_pipes - ) - - def _io_threads(self): - # pylint: disable=no-member - yield from PipeReaderServer._io_threads(self) - yield from PipeWriterServer._io_threads(self) diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py b/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py deleted file mode 100644 index 91a50b4..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py +++ /dev/null @@ -1,45 +0,0 @@ -from threading import Thread, Event - -from .terminate_process_on_failure import terminate_process_on_failure - - -class PipeIOThread(Thread): - def __init__(self): - super().__init__(daemon=True) - self._stop_event = Event() - self.__io_threads = [] - - @terminate_process_on_failure - def run(self): - self.__io_threads.extend(self._io_threads()) - for thread in self.__io_threads: - thread.start() - self._stop_event.wait() - self._stop_threads() - - def _io_threads(self): - raise NotImplementedError() - - def _stop_threads(self): - for thread in self.__io_threads: - if thread.is_alive(): - thread.stop() - self.on_stop() - - def on_stop(self): - pass - - def stop(self): - self._stop_event.set() - if self.is_alive(): - self.join() - - def wait(self): - self._stop_event.wait() - - def __enter__(self): - self.start() - return self - - def __exit__(self, type_, value, tb): - self.stop() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py deleted file mode 100644 index e26bb87..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py +++ /dev/null @@ -1,39 +0,0 @@ -from .pipe import Pipe, DEFAULT_PERMISSIONS -from .pipe_io_thread import PipeIOThread -from .pipe_reader_thread import PipeReaderThread - - -class PipeReaderServer(PipeIOThread): - def __init__( - self, - in_pipe, - permissions=DEFAULT_PERMISSIONS, - manage_pipes=True, - **kwargs - ): - super().__init__(**kwargs) - self._reader_thread = None - self._manage_pipes = manage_pipes - self._in_pipe = in_pipe - if self._manage_pipes: - Pipe(self.in_pipe).recreate(permissions) - - @property - def in_pipe(self): - return self._in_pipe - - def _io_threads(self): - self._reader_thread = PipeReaderThread( - self.in_pipe, - self._stop_event, - self.handle_message - ) - yield self._reader_thread - - def handle_message(self, message): - raise NotImplementedError() - - def stop(self): - super().stop() - if self._manage_pipes: - Pipe(self.in_pipe).remove() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_reader_thread.py b/tfw/components/pipe_io/pipe_io_server/pipe_reader_thread.py deleted file mode 100644 index 4bce19d..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_reader_thread.py +++ /dev/null @@ -1,44 +0,0 @@ -from contextlib import suppress -from os import open as osopen -from os import write, close, O_WRONLY, O_NONBLOCK -from threading import Thread - -from .terminate_process_on_failure import terminate_process_on_failure - - -class PipeReaderThread(Thread): - eof = b'' - stop_sequence = b'stop_reading\n' - - def __init__(self, pipe_path, stop_event, message_handler): - super().__init__(daemon=True) - self._message_handler = message_handler - self._pipe_path = pipe_path - self._stop_event = stop_event - - @terminate_process_on_failure - def run(self): - with self._open() as pipe: - while True: - message = pipe.readline() - if message == self.stop_sequence: - self._stop_event.set() - break - if message == self.eof: - self._open().close() - continue - self._message_handler(message[:-1]) - - def _open(self): - return open(self._pipe_path, 'rb') - - def stop(self): - while self.is_alive(): - self._unblock() - self.join() - - def _unblock(self): - with suppress(OSError): - fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK) - write(fd, self.stop_sequence) - close(fd) diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py b/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py deleted file mode 100644 index 1161917..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py +++ /dev/null @@ -1,38 +0,0 @@ -from .pipe import Pipe, DEFAULT_PERMISSIONS -from .pipe_io_thread import PipeIOThread -from .pipe_writer_thread import PipeWriterThread - - -class PipeWriterServer(PipeIOThread): - def __init__( - self, - out_pipe, - permissions=DEFAULT_PERMISSIONS, - manage_pipes=True, - **kwargs - ): - super().__init__(**kwargs) - self._writer_thread = None - self._manage_pipes = manage_pipes - self._out_pipe = out_pipe - if self._manage_pipes: - Pipe(self.out_pipe).recreate(permissions) - - @property - def out_pipe(self): - return self._out_pipe - - def _io_threads(self): - self._writer_thread = PipeWriterThread( - self.out_pipe, - self._stop_event - ) - yield self._writer_thread - - def send_message(self, message): - self._writer_thread.write(message) - - def stop(self): - super().stop() - if self._manage_pipes: - Pipe(self.out_pipe).remove() diff --git a/tfw/components/pipe_io/pipe_io_server/pipe_writer_thread.py b/tfw/components/pipe_io/pipe_io_server/pipe_writer_thread.py deleted file mode 100644 index 3bc3fe7..0000000 --- a/tfw/components/pipe_io/pipe_io_server/pipe_writer_thread.py +++ /dev/null @@ -1,50 +0,0 @@ -from contextlib import suppress -from os import O_NONBLOCK, O_RDONLY, close -from os import open as osopen -from threading import Thread - -from .terminate_process_on_failure import terminate_process_on_failure -from .deque import Deque - - -class PipeWriterThread(Thread): - def __init__(self, pipe_path, stop_event): - super().__init__(daemon=True) - self._pipe_path = pipe_path - self._stop_event = stop_event - self._write_queue = Deque() - - def write(self, message): - self._write_queue.push(message) - - @terminate_process_on_failure - def run(self): - with self._open() as pipe: - while True: - message = self._write_queue.pop() - if message is None: - self._stop_event.set() - break - try: - pipe.write(message + b'\n') - pipe.flush() - except BrokenPipeError: - try: # pipe was reopened, close() flushed the message - pipe.close() - except BrokenPipeError: # close() discarded the message - self._write_queue.push_front(message) - pipe = self._open() - - def _open(self): - return open(self._pipe_path, 'wb') - - def stop(self): - while self.is_alive(): - self._unblock() - self.join() - - def _unblock(self): - with suppress(OSError): - fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK) - self._write_queue.push_front(None) - close(fd) diff --git a/tfw/components/pipe_io/pipe_io_server/terminate_process_on_failure.py b/tfw/components/pipe_io/pipe_io_server/terminate_process_on_failure.py deleted file mode 100644 index 7a0804c..0000000 --- a/tfw/components/pipe_io/pipe_io_server/terminate_process_on_failure.py +++ /dev/null @@ -1,15 +0,0 @@ -from functools import wraps -from os import kill, getpid -from signal import SIGTERM -from traceback import print_exc - - -def terminate_process_on_failure(fun): - @wraps(fun) - def wrapper(*args, **kwargs): - try: - return fun(*args, **kwargs) - except: # pylint: disable=bare-except - print_exc() - kill(getpid(), SIGTERM) - return wrapper diff --git a/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py b/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py deleted file mode 100644 index 6a08847..0000000 --- a/tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py +++ /dev/null @@ -1,187 +0,0 @@ -# pylint: disable=redefined-outer-name -from os import stat, urandom -from os.path import exists, dirname, realpath, join -from stat import S_ISFIFO -from secrets import token_urlsafe -from random import randint, getrandbits, uniform -from threading import Thread -from json import dumps, loads - -import pytest - -from .pipe_io_server import PipeIOServer - - -class EchoPipeIOServer(PipeIOServer): - def handle_message(self, message): - self.send_message(message) - - -@pytest.fixture -def io_pipes(): - with EchoPipeIOServer(*get_test_init_params()) as pipe_io: - with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: - yield io_pipes - - -def get_test_init_params(): - here = dirname(realpath(__file__)) - return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests') - - -def raise_if_thread_blocks(*, target, unblock_function): - thread = Thread(target=target) - thread.start() - unblock_function() - thread.join(timeout=1) - if thread.is_alive(): - raise RuntimeError('PipeIOServer failed to shut down!') - - -class IOPipes: - def __init__(self, in_pipe_path, out_pipe_path): - self.in_pipe_path = in_pipe_path - self.out_pipe_path = out_pipe_path - - def __enter__(self): - # pylint: disable=attribute-defined-outside-init - self.in_pipe = open(self.in_pipe_path, 'wb') - self.out_pipe = open(self.out_pipe_path, 'rb') - return self - - def __exit__(self, type_, value, traceback): - self.close() - - def close(self): - self.in_pipe.close() - self.out_pipe.close() - - def send_message(self, message): - self.in_pipe.write(message + b'\n') - self.in_pipe.flush() - - def recv(self): - return self.out_pipe.readline().rstrip(b'\n') - - -def pipes_exist(*paths): - predicate = lambda path: exists(path) and S_ISFIFO(stat(path).st_mode) - return all(predicate(path) for path in paths) - - -def test_manage_pipes(): - pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=True) - assert pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) - pipe_io.stop() - assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) - - -def test_no_manage_pipes(): - pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=False) - assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) - pipe_io.stop() - assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe) - - -def test_stop(): - pipe_io = EchoPipeIOServer(*get_test_init_params()) - pipe_io.start() - raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) - - pipe_io = EchoPipeIOServer(*get_test_init_params()) - pipe_io.start() - with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes: - raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) - - pipe_io = EchoPipeIOServer(*get_test_init_params()) - pipe_io.start() - with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes: - test_message = token_urlsafe(randint(128, 256)) - iopipes.send_message(test_message.encode()) - assert test_message == iopipes.recv().decode() - iopipes.send_message(test_message.encode()) - raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) - - -@pytest.mark.parametrize( - 'test_data', [ - 'Cats and cheese', - 'You ever wonder why we are here?', - 'Lorem ipsum dolor sit amet', - 'You always have a plan, Dutch!', - ] -) -def test_io(io_pipes, test_data): - io_pipes.send_message(test_data.encode()) - assert io_pipes.recv().decode() == test_data - - -def test_io_random(io_pipes): - test_data = token_urlsafe(512) - for _ in range(100): - io_pipes.send_message(test_data.encode()) - assert io_pipes.recv().decode() == test_data - -@pytest.mark.parametrize( - 'test_data_size', [ - 1024, - 1024*1024, - 2*1024*1024, - 4*1024*1024, - 8*1024*1024, - 16*1024*1024, - 32*1024*1024 - ] -) -def test_io_large_data(io_pipes, test_data_size): - test_data = urandom(test_data_size).replace(b'\n', b'') - io_pipes.send_message(test_data) - received_data = io_pipes.recv() - assert received_data == test_data - - -def test_io_stress(io_pipes): - for _ in range(2222): - test_data = urandom(randint(1, 1024)).replace(b'\n', b'') - io_pipes.send_message(test_data) - assert io_pipes.recv() == test_data - - -def test_io_newlines(io_pipes): - times = randint(1, 512) - io_pipes.send_message(b'\n' * times) - for _ in range(times + 1): # IOPipes.send appends +1 - assert io_pipes.recv() == b'' - - -def test_json_io(io_pipes): - for _ in range(10): - test_data = { - f'{token_urlsafe(8)}': randint(1, 2 ** 20), - f'{token_urlsafe(9)}': [randint(1, 2 **10) for i in range(10)], - f'{token_urlsafe(4)}': f'{token_urlsafe(8)}\\\n{token_urlsafe(8)}\n{randint(1, 2 ** 10)}', - f'{token_urlsafe(11)}': { - f'{token_urlsafe(8)}': '', - f'{token_urlsafe(3)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}\n\\n{token_urlsafe(8)}', - f'{token_urlsafe(44)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)} {token_urlsafe(8)}', - f'{token_urlsafe(6)}\n{token_urlsafe(4)}': bool(getrandbits(1)), - f'{token_urlsafe(8)}': None, - f'{token_urlsafe(21)} {token_urlsafe(4)}': None, - f'{token_urlsafe(3)}': uniform(randint(1, 100), randint(1, 100)), - f'{token_urlsafe(8)}': [token_urlsafe(4) for i in range(10)], - } - } - io_pipes.send_message(dumps(test_data).encode()) - assert loads(io_pipes.recv()) == test_data - - -def test_assign_message_handler(): - pipe_io = PipeIOServer(*get_test_init_params()) - pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2) - pipe_io.start() - with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: - for _ in range(100): - test_data = token_urlsafe(32).encode() - io_pipes.send_message(test_data) - assert io_pipes.recv() == test_data * 2 - pipe_io.stop()