mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-10-25 04:22:55 +00:00 
			
		
		
		
	Fork PipeIOServer to TFW
This commit is contained in:
		| @@ -10,3 +10,4 @@ from .terminal_commands import TerminalCommands | |||||||
| from .log_monitoring_event_handler import LogMonitoringEventHandler | from .log_monitoring_event_handler import LogMonitoringEventHandler | ||||||
| from .fsm_managing_event_handler import FSMManagingEventHandler | from .fsm_managing_event_handler import FSMManagingEventHandler | ||||||
| from .snapshot_provider import SnapshotProvider | from .snapshot_provider import SnapshotProvider | ||||||
|  | from .pipe_io_event_handler import PipeIOEventHandler | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								lib/tfw/components/pipe_io_server/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								lib/tfw/components/pipe_io_server/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,2 @@ | |||||||
|  | from .pipe_io_server import PipeIOServer | ||||||
|  | from .terminate_process_on_failure import terminate_process_on_failure | ||||||
							
								
								
									
										27
									
								
								lib/tfw/components/pipe_io_server/deque.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								lib/tfw/components/pipe_io_server/deque.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | |||||||
|  | from collections import deque | ||||||
|  | from threading import Lock, Condition | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Deque: | ||||||
|  |     def __init__(self): | ||||||
|  |         self._queue = deque() | ||||||
|  |  | ||||||
|  |         self._mutex = Lock() | ||||||
|  |         self._not_empty = Condition(self._mutex) | ||||||
|  |  | ||||||
|  |     def pop(self): | ||||||
|  |         with self._mutex: | ||||||
|  |             while not self._queue: | ||||||
|  |                 self._not_empty.wait() | ||||||
|  |             return self._queue.pop() | ||||||
|  |  | ||||||
|  |     def push(self, item): | ||||||
|  |         self._push(item, self._queue.appendleft) | ||||||
|  |  | ||||||
|  |     def push_front(self, item): | ||||||
|  |         self._push(item, self._queue.append) | ||||||
|  |  | ||||||
|  |     def _push(self, item, put_method): | ||||||
|  |         with self._mutex: | ||||||
|  |             put_method(item) | ||||||
|  |             self._not_empty.notify() | ||||||
							
								
								
									
										16
									
								
								lib/tfw/components/pipe_io_server/pipe.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								lib/tfw/components/pipe_io_server/pipe.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,16 @@ | |||||||
|  | from os import mkfifo, remove, chmod | ||||||
|  | from os.path import exists | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Pipe: | ||||||
|  |     def __init__(self, path): | ||||||
|  |         self.path = path | ||||||
|  |  | ||||||
|  |     def recreate(self, permissions): | ||||||
|  |         self.remove() | ||||||
|  |         mkfifo(self.path) | ||||||
|  |         chmod(self.path, permissions)  # use chmod to ignore umask | ||||||
|  |  | ||||||
|  |     def remove(self): | ||||||
|  |         if exists(self.path): | ||||||
|  |             remove(self.path) | ||||||
							
								
								
									
										73
									
								
								lib/tfw/components/pipe_io_server/pipe_io_server.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								lib/tfw/components/pipe_io_server/pipe_io_server.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,73 @@ | |||||||
|  | from abc import ABC, abstractmethod | ||||||
|  | from threading import Thread, Event | ||||||
|  | from typing import Callable | ||||||
|  |  | ||||||
|  | from .pipe_reader_thread import PipeReaderThread | ||||||
|  | from .pipe_writer_thread import PipeWriterThread | ||||||
|  | from .pipe import Pipe | ||||||
|  | from .terminate_process_on_failure import terminate_process_on_failure | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class PipeIOServer(ABC, Thread): | ||||||
|  |     def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600): | ||||||
|  |         super().__init__(daemon=True) | ||||||
|  |         self._in_pipe, self._out_pipe = in_pipe, out_pipe | ||||||
|  |         self._create_pipes(permissions) | ||||||
|  |         self._stop_event = Event() | ||||||
|  |         self._reader_thread, self._writer_thread = self._create_io_threads() | ||||||
|  |         self._io_threads = (self._reader_thread, self._writer_thread) | ||||||
|  |         self._on_stop = lambda: None | ||||||
|  |  | ||||||
|  |     def _create_pipes(self, permissions): | ||||||
|  |         Pipe(self.in_pipe).recreate(permissions) | ||||||
|  |         Pipe(self.out_pipe).recreate(permissions) | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def in_pipe(self): | ||||||
|  |         return self._in_pipe | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def out_pipe(self): | ||||||
|  |         return self._out_pipe | ||||||
|  |  | ||||||
|  |     def _create_io_threads(self): | ||||||
|  |         reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message) | ||||||
|  |         writer_thread = PipeWriterThread(self.out_pipe, self._stop_event) | ||||||
|  |         return reader_thread, writer_thread | ||||||
|  |  | ||||||
|  |     @abstractmethod | ||||||
|  |     def handle_message(self, message): | ||||||
|  |         raise NotImplementedError() | ||||||
|  |  | ||||||
|  |     def send_message(self, message): | ||||||
|  |         self._writer_thread.write(message) | ||||||
|  |  | ||||||
|  |     @terminate_process_on_failure | ||||||
|  |     def run(self): | ||||||
|  |         for thread in self._io_threads: | ||||||
|  |             thread.start() | ||||||
|  |         self._stop_event.wait() | ||||||
|  |         self._stop_threads() | ||||||
|  |  | ||||||
|  |     def stop(self): | ||||||
|  |         self._stop_event.set() | ||||||
|  |         if self.is_alive(): | ||||||
|  |             self.join() | ||||||
|  |  | ||||||
|  |     def _stop_threads(self): | ||||||
|  |         for thread in self._io_threads: | ||||||
|  |             if thread.is_alive(): | ||||||
|  |                 thread.stop() | ||||||
|  |         Pipe(self.in_pipe).remove() | ||||||
|  |         Pipe(self.out_pipe).remove() | ||||||
|  |         self._on_stop() | ||||||
|  |  | ||||||
|  |     def _set_on_stop(self, value): | ||||||
|  |         if not isinstance(value, Callable): | ||||||
|  |             raise ValueError("Supplied object is not callable!") | ||||||
|  |         self._on_stop = value | ||||||
|  |  | ||||||
|  |     on_stop = property(fset=_set_on_stop) | ||||||
|  |  | ||||||
|  |     def wait(self): | ||||||
|  |         self._stop_event.wait() | ||||||
							
								
								
									
										44
									
								
								lib/tfw/components/pipe_io_server/pipe_reader_thread.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								lib/tfw/components/pipe_io_server/pipe_reader_thread.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | |||||||
|  | from contextlib import suppress | ||||||
|  | from os import open as osopen | ||||||
|  | from os import write, close, O_WRONLY, O_NONBLOCK | ||||||
|  | from threading import Thread | ||||||
|  |  | ||||||
|  | from .terminate_process_on_failure import terminate_process_on_failure | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class PipeReaderThread(Thread): | ||||||
|  |     eof = b'' | ||||||
|  |     stop_sequence = b'stop_reading\n' | ||||||
|  |  | ||||||
|  |     def __init__(self, pipe_path, stop_event, message_handler): | ||||||
|  |         super().__init__(daemon=True) | ||||||
|  |         self._message_handler = message_handler | ||||||
|  |         self._pipe_path = pipe_path | ||||||
|  |         self._stop_event = stop_event | ||||||
|  |  | ||||||
|  |     @terminate_process_on_failure | ||||||
|  |     def run(self): | ||||||
|  |         with self._open() as pipe: | ||||||
|  |             while True: | ||||||
|  |                 message = pipe.readline() | ||||||
|  |                 if message == self.stop_sequence: | ||||||
|  |                     self._stop_event.set() | ||||||
|  |                     break | ||||||
|  |                 if message == self.eof: | ||||||
|  |                     self._open().close() | ||||||
|  |                     continue | ||||||
|  |                 self._message_handler(message[:-1]) | ||||||
|  |  | ||||||
|  |     def _open(self): | ||||||
|  |         return open(self._pipe_path, 'rb') | ||||||
|  |  | ||||||
|  |     def stop(self): | ||||||
|  |         while self.is_alive(): | ||||||
|  |             self._unblock() | ||||||
|  |         self.join() | ||||||
|  |  | ||||||
|  |     def _unblock(self): | ||||||
|  |         with suppress(OSError): | ||||||
|  |             fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK) | ||||||
|  |             write(fd, self.stop_sequence) | ||||||
|  |             close(fd) | ||||||
							
								
								
									
										50
									
								
								lib/tfw/components/pipe_io_server/pipe_writer_thread.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								lib/tfw/components/pipe_io_server/pipe_writer_thread.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,50 @@ | |||||||
|  | from contextlib import suppress | ||||||
|  | from os import O_NONBLOCK, O_RDONLY, close | ||||||
|  | from os import open as osopen | ||||||
|  | from threading import Thread | ||||||
|  |  | ||||||
|  | from .terminate_process_on_failure import terminate_process_on_failure | ||||||
|  | from .deque import Deque | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class PipeWriterThread(Thread): | ||||||
|  |     def __init__(self, pipe_path, stop_event): | ||||||
|  |         super().__init__(daemon=True) | ||||||
|  |         self._pipe_path = pipe_path | ||||||
|  |         self._stop_event = stop_event | ||||||
|  |         self._write_queue = Deque() | ||||||
|  |  | ||||||
|  |     def write(self, message): | ||||||
|  |         self._write_queue.push(message) | ||||||
|  |  | ||||||
|  |     @terminate_process_on_failure | ||||||
|  |     def run(self): | ||||||
|  |         with self._open() as pipe: | ||||||
|  |             while True: | ||||||
|  |                 message = self._write_queue.pop() | ||||||
|  |                 if message is None: | ||||||
|  |                     self._stop_event.set() | ||||||
|  |                     break | ||||||
|  |                 try: | ||||||
|  |                     pipe.write(message + b'\n') | ||||||
|  |                     pipe.flush() | ||||||
|  |                 except BrokenPipeError: | ||||||
|  |                     try:  # pipe was reopened, close() flushed the message | ||||||
|  |                         pipe.close() | ||||||
|  |                     except BrokenPipeError:  # close() discarded the message | ||||||
|  |                         self._write_queue.push_front(message) | ||||||
|  |                     pipe = self._open() | ||||||
|  |  | ||||||
|  |     def _open(self): | ||||||
|  |         return open(self._pipe_path, 'wb') | ||||||
|  |  | ||||||
|  |     def stop(self): | ||||||
|  |         while self.is_alive(): | ||||||
|  |             self._unblock() | ||||||
|  |         self.join() | ||||||
|  |  | ||||||
|  |     def _unblock(self): | ||||||
|  |         with suppress(OSError): | ||||||
|  |             fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK) | ||||||
|  |             self._write_queue.push_front(None) | ||||||
|  |             close(fd) | ||||||
| @@ -0,0 +1,15 @@ | |||||||
|  | from functools import wraps | ||||||
|  | from os import kill, getpid | ||||||
|  | from signal import SIGTERM | ||||||
|  | from traceback import print_exc | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def terminate_process_on_failure(fun): | ||||||
|  |     @wraps(fun) | ||||||
|  |     def wrapper(*args, **kwargs): | ||||||
|  |         try: | ||||||
|  |             return fun(*args, **kwargs) | ||||||
|  |         except:  # pylint: disable=bare-except | ||||||
|  |             print_exc() | ||||||
|  |             kill(getpid(), SIGTERM) | ||||||
|  |     return wrapper | ||||||
		Reference in New Issue
	
	Block a user