From 6ea0967a21618227e7cd5e9c26d03871cc61011d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 2 May 2019 14:53:59 +0200 Subject: [PATCH] Fork PipeIOServer to TFW --- lib/tfw/components/__init__.py | 1 + lib/tfw/components/pipe_io_server/__init__.py | 2 + lib/tfw/components/pipe_io_server/deque.py | 27 +++++++ lib/tfw/components/pipe_io_server/pipe.py | 16 ++++ .../pipe_io_server/pipe_io_server.py | 73 +++++++++++++++++++ .../pipe_io_server/pipe_reader_thread.py | 44 +++++++++++ .../pipe_io_server/pipe_writer_thread.py | 50 +++++++++++++ .../terminate_process_on_failure.py | 15 ++++ 8 files changed, 228 insertions(+) create mode 100644 lib/tfw/components/pipe_io_server/__init__.py create mode 100644 lib/tfw/components/pipe_io_server/deque.py create mode 100644 lib/tfw/components/pipe_io_server/pipe.py create mode 100644 lib/tfw/components/pipe_io_server/pipe_io_server.py create mode 100644 lib/tfw/components/pipe_io_server/pipe_reader_thread.py create mode 100644 lib/tfw/components/pipe_io_server/pipe_writer_thread.py create mode 100644 lib/tfw/components/pipe_io_server/terminate_process_on_failure.py diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 4c0475c..5251a4e 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -10,3 +10,4 @@ from .terminal_commands import TerminalCommands from .log_monitoring_event_handler import LogMonitoringEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler from .snapshot_provider import SnapshotProvider +from .pipe_io_event_handler import PipeIOEventHandler diff --git a/lib/tfw/components/pipe_io_server/__init__.py b/lib/tfw/components/pipe_io_server/__init__.py new file mode 100644 index 0000000..cab334b --- /dev/null +++ b/lib/tfw/components/pipe_io_server/__init__.py @@ -0,0 +1,2 @@ +from .pipe_io_server import PipeIOServer +from .terminate_process_on_failure import terminate_process_on_failure diff --git a/lib/tfw/components/pipe_io_server/deque.py b/lib/tfw/components/pipe_io_server/deque.py new file mode 100644 index 0000000..b2f1ab4 --- /dev/null +++ b/lib/tfw/components/pipe_io_server/deque.py @@ -0,0 +1,27 @@ +from collections import deque +from threading import Lock, Condition + + +class Deque: + def __init__(self): + self._queue = deque() + + self._mutex = Lock() + self._not_empty = Condition(self._mutex) + + def pop(self): + with self._mutex: + while not self._queue: + self._not_empty.wait() + return self._queue.pop() + + def push(self, item): + self._push(item, self._queue.appendleft) + + def push_front(self, item): + self._push(item, self._queue.append) + + def _push(self, item, put_method): + with self._mutex: + put_method(item) + self._not_empty.notify() diff --git a/lib/tfw/components/pipe_io_server/pipe.py b/lib/tfw/components/pipe_io_server/pipe.py new file mode 100644 index 0000000..eb021ae --- /dev/null +++ b/lib/tfw/components/pipe_io_server/pipe.py @@ -0,0 +1,16 @@ +from os import mkfifo, remove, chmod +from os.path import exists + + +class Pipe: + def __init__(self, path): + self.path = path + + def recreate(self, permissions): + self.remove() + mkfifo(self.path) + chmod(self.path, permissions) # use chmod to ignore umask + + def remove(self): + if exists(self.path): + remove(self.path) diff --git a/lib/tfw/components/pipe_io_server/pipe_io_server.py b/lib/tfw/components/pipe_io_server/pipe_io_server.py new file mode 100644 index 0000000..2715f40 --- /dev/null +++ b/lib/tfw/components/pipe_io_server/pipe_io_server.py @@ -0,0 +1,73 @@ +from abc import ABC, abstractmethod +from threading import Thread, Event +from typing import Callable + +from .pipe_reader_thread import PipeReaderThread +from .pipe_writer_thread import PipeWriterThread +from .pipe import Pipe +from .terminate_process_on_failure import terminate_process_on_failure + + +class PipeIOServer(ABC, Thread): + def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600): + super().__init__(daemon=True) + self._in_pipe, self._out_pipe = in_pipe, out_pipe + self._create_pipes(permissions) + self._stop_event = Event() + self._reader_thread, self._writer_thread = self._create_io_threads() + self._io_threads = (self._reader_thread, self._writer_thread) + self._on_stop = lambda: None + + def _create_pipes(self, permissions): + Pipe(self.in_pipe).recreate(permissions) + Pipe(self.out_pipe).recreate(permissions) + + @property + def in_pipe(self): + return self._in_pipe + + @property + def out_pipe(self): + return self._out_pipe + + def _create_io_threads(self): + reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message) + writer_thread = PipeWriterThread(self.out_pipe, self._stop_event) + return reader_thread, writer_thread + + @abstractmethod + def handle_message(self, message): + raise NotImplementedError() + + def send_message(self, message): + self._writer_thread.write(message) + + @terminate_process_on_failure + def run(self): + for thread in self._io_threads: + thread.start() + self._stop_event.wait() + self._stop_threads() + + def stop(self): + self._stop_event.set() + if self.is_alive(): + self.join() + + def _stop_threads(self): + for thread in self._io_threads: + if thread.is_alive(): + thread.stop() + Pipe(self.in_pipe).remove() + Pipe(self.out_pipe).remove() + self._on_stop() + + def _set_on_stop(self, value): + if not isinstance(value, Callable): + raise ValueError("Supplied object is not callable!") + self._on_stop = value + + on_stop = property(fset=_set_on_stop) + + def wait(self): + self._stop_event.wait() diff --git a/lib/tfw/components/pipe_io_server/pipe_reader_thread.py b/lib/tfw/components/pipe_io_server/pipe_reader_thread.py new file mode 100644 index 0000000..4bce19d --- /dev/null +++ b/lib/tfw/components/pipe_io_server/pipe_reader_thread.py @@ -0,0 +1,44 @@ +from contextlib import suppress +from os import open as osopen +from os import write, close, O_WRONLY, O_NONBLOCK +from threading import Thread + +from .terminate_process_on_failure import terminate_process_on_failure + + +class PipeReaderThread(Thread): + eof = b'' + stop_sequence = b'stop_reading\n' + + def __init__(self, pipe_path, stop_event, message_handler): + super().__init__(daemon=True) + self._message_handler = message_handler + self._pipe_path = pipe_path + self._stop_event = stop_event + + @terminate_process_on_failure + def run(self): + with self._open() as pipe: + while True: + message = pipe.readline() + if message == self.stop_sequence: + self._stop_event.set() + break + if message == self.eof: + self._open().close() + continue + self._message_handler(message[:-1]) + + def _open(self): + return open(self._pipe_path, 'rb') + + def stop(self): + while self.is_alive(): + self._unblock() + self.join() + + def _unblock(self): + with suppress(OSError): + fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK) + write(fd, self.stop_sequence) + close(fd) diff --git a/lib/tfw/components/pipe_io_server/pipe_writer_thread.py b/lib/tfw/components/pipe_io_server/pipe_writer_thread.py new file mode 100644 index 0000000..3bc3fe7 --- /dev/null +++ b/lib/tfw/components/pipe_io_server/pipe_writer_thread.py @@ -0,0 +1,50 @@ +from contextlib import suppress +from os import O_NONBLOCK, O_RDONLY, close +from os import open as osopen +from threading import Thread + +from .terminate_process_on_failure import terminate_process_on_failure +from .deque import Deque + + +class PipeWriterThread(Thread): + def __init__(self, pipe_path, stop_event): + super().__init__(daemon=True) + self._pipe_path = pipe_path + self._stop_event = stop_event + self._write_queue = Deque() + + def write(self, message): + self._write_queue.push(message) + + @terminate_process_on_failure + def run(self): + with self._open() as pipe: + while True: + message = self._write_queue.pop() + if message is None: + self._stop_event.set() + break + try: + pipe.write(message + b'\n') + pipe.flush() + except BrokenPipeError: + try: # pipe was reopened, close() flushed the message + pipe.close() + except BrokenPipeError: # close() discarded the message + self._write_queue.push_front(message) + pipe = self._open() + + def _open(self): + return open(self._pipe_path, 'wb') + + def stop(self): + while self.is_alive(): + self._unblock() + self.join() + + def _unblock(self): + with suppress(OSError): + fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK) + self._write_queue.push_front(None) + close(fd) diff --git a/lib/tfw/components/pipe_io_server/terminate_process_on_failure.py b/lib/tfw/components/pipe_io_server/terminate_process_on_failure.py new file mode 100644 index 0000000..7a0804c --- /dev/null +++ b/lib/tfw/components/pipe_io_server/terminate_process_on_failure.py @@ -0,0 +1,15 @@ +from functools import wraps +from os import kill, getpid +from signal import SIGTERM +from traceback import print_exc + + +def terminate_process_on_failure(fun): + @wraps(fun) + def wrapper(*args, **kwargs): + try: + return fun(*args, **kwargs) + except: # pylint: disable=bare-except + print_exc() + kill(getpid(), SIGTERM) + return wrapper