diff --git a/lib/tfw/builtins/__init__.py b/lib/tfw/builtins/__init__.py index af322b8..c8ab18b 100644 --- a/lib/tfw/builtins/__init__.py +++ b/lib/tfw/builtins/__init__.py @@ -1,13 +1,12 @@ -from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector -from .event_handler import EventHandler -from .fsm_aware_event_handler import FSMAwareEventHandler from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler +from .event_handler import EventHandler from .frontend_event_handler import FrontendEventHandler +from .fsm_aware_event_handler import FSMAwareEventHandler from .fsm_managing_event_handler import FSMManagingEventHandler from .ide_event_handler import IdeEventHandler from .log_monitoring_event_handler import LogMonitoringEventHandler -from .message_sender import MessageSender from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler from .process_managing_event_handler import ProcessManagingEventHandler from .terminal_event_handler import TerminalEventHandler +from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector diff --git a/lib/tfw/builtins/frontend_event_handler.py b/lib/tfw/builtins/frontend_event_handler.py index 1099564..ba39c27 100644 --- a/lib/tfw/builtins/frontend_event_handler.py +++ b/lib/tfw/builtins/frontend_event_handler.py @@ -1,9 +1,6 @@ -from abc import ABC, abstractmethod -from contextlib import suppress - from tfw.networking import Scope +from tfw.components import FrontendMessageStorage -from .message_sender import MessageSender from .event_handler import EventHandler @@ -22,43 +19,3 @@ class FrontendEventHandler(EventHandler): def recover_frontend(self): for message in self._frontend_message_storage.messages: self.send_message(message) - - -class MessageStorage(ABC): - def __init__(self): - self._messages = [] - - def save_message(self, message): - with suppress(KeyError, AttributeError): - if self._filter_message(message): - self._messages.extend(self._transform_message(message)) - - @abstractmethod - def _filter_message(self, message): - raise NotImplementedError - - def _transform_message(self, message): # pylint: disable=no-self-use - yield message - - def clear(self): - self._messages.clear() - - @property - def messages(self): - yield from self._messages - - -class FrontendMessageStorage(MessageStorage): - def __init__(self, keys): - self._keys = keys - super().__init__() - - def _filter_message(self, message): - key = message['key'] - return key in self._keys - - def _transform_message(self, message): - if message['key'] == 'queueMessages': - yield from MessageSender.generate_messages_from_queue(message) - else: - yield message diff --git a/lib/tfw/builtins/fsm_managing_event_handler.py b/lib/tfw/builtins/fsm_managing_event_handler.py index fc3bcda..ccbb7de 100644 --- a/lib/tfw/builtins/fsm_managing_event_handler.py +++ b/lib/tfw/builtins/fsm_managing_event_handler.py @@ -2,6 +2,7 @@ import logging from tfw.crypto import KeyManager, sign_message, verify_message from tfw.networking import Scope +from tfw.components import FSMUpdater from .event_handler import EventHandler @@ -69,30 +70,3 @@ class FSMManagingEventHandler(EventHandler): """ # pylint: disable=no-self-use return message - - -class FSMUpdater: - def __init__(self, fsm): - self.fsm = fsm - - @property - def fsm_update(self): - return { - 'key': 'fsm_update', - 'data': self.fsm_update_data - } - - @property - def fsm_update_data(self): - valid_transitions = [ - {'trigger': trigger} - for trigger in self.fsm.get_triggers(self.fsm.state) - ] - last_fsm_event = self.fsm.event_log[-1] - last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat() - return { - 'current_state': self.fsm.state, - 'valid_transitions': valid_transitions, - 'in_accepted_state': self.fsm.in_accepted_state, - 'last_event': last_fsm_event - } diff --git a/lib/tfw/builtins/log_monitoring_event_handler.py b/lib/tfw/builtins/log_monitoring_event_handler.py index 9b9a900..eebade5 100644 --- a/lib/tfw/builtins/log_monitoring_event_handler.py +++ b/lib/tfw/builtins/log_monitoring_event_handler.py @@ -1,52 +1,14 @@ import logging +from tfw.config import TFWENV from tfw.networking import Scope -from tfw.components.inotify import InotifyObserver -from tfw.mixins.supervisor_mixin import SupervisorLogMixin +from tfw.components import LogInotifyObserver from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class LogInotifyObserver(InotifyObserver, SupervisorLogMixin): - def __init__(self, server_connector, process_name, log_tail=0): - self._prevent_log_recursion() - self._server_connector = server_connector - self._process_name = process_name - self.log_tail = log_tail - self._procinfo = None - InotifyObserver.__init__(self, self._get_logfiles()) - - @staticmethod - def _prevent_log_recursion(): - # This is done to prevent inotify event logs triggering themselves (infinite log recursion) - logging.getLogger('watchdog.observers.inotify_buffer').propagate = False - - def _get_logfiles(self): - self._procinfo = self.supervisor.getProcessInfo(self._process_name) - return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile'] - - @property - def process_name(self): - return self._process_name - - @process_name.setter - def process_name(self, process_name): - self._process_name = process_name - self.paths = self._get_logfiles() - - def on_modified(self, event): - self._server_connector.send_message({ - 'key': 'processlog', - 'data': { - 'command': 'new_log', - 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), - 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) - } - }, Scope.BROADCAST) - - class LogMonitoringEventHandler(EventHandler): """ Monitors the output of a supervisor process (stdout, stderr) and @@ -60,7 +22,12 @@ class LogMonitoringEventHandler(EventHandler): def __init__(self, key, process_name, log_tail=0): super().__init__(key, scope=Scope.WEBSOCKET) self.process_name = process_name - self._monitor = LogInotifyObserver(self.server_connector, process_name, log_tail) + self._monitor = LogInotifyObserver( + server_connector=self.server_connector, + supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI, + process_name=process_name, + log_tail=log_tail + ) self._monitor.start() self.command_handlers = { diff --git a/lib/tfw/builtins/process_managing_event_handler.py b/lib/tfw/builtins/process_managing_event_handler.py index abaec9d..2a1f968 100644 --- a/lib/tfw/builtins/process_managing_event_handler.py +++ b/lib/tfw/builtins/process_managing_event_handler.py @@ -1,27 +1,16 @@ import logging from xmlrpc.client import Fault as SupervisorFault +from tfw.config import TFWENV from tfw.networking import Scope -from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin +from tfw.components import ProcessManager, LogManager from .event_handler import EventHandler LOG = logging.getLogger(__name__) -class ProcessManager(SupervisorMixin, SupervisorLogMixin): - def __init__(self): - self.commands = { - 'start': self.start_process, - 'stop': self.stop_process, - 'restart': self.restart_process - } - - def __call__(self, command, process_name): - return self.commands[command](process_name) - - -class ProcessManagingEventHandler(EventHandler): +class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): """ Event handler that can manage processes managed by supervisor. @@ -35,23 +24,29 @@ class ProcessManagingEventHandler(EventHandler): (the names are as self-documenting as it gets) """ def __init__(self, key, log_tail=0): - super().__init__(key, scope=Scope.WEBSOCKET) - self.processmanager = ProcessManager() + EventHandler.__init__(self, key, scope=Scope.WEBSOCKET) + ProcessManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) + LogManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) self.log_tail = log_tail + self.commands = { + 'start': self.start_process, + 'stop': self.stop_process, + 'restart': self.restart_process + } def handle_event(self, message): try: data = message['data'] try: - self.processmanager(data['command'], data['process_name']) + self.commands[data['command']](data['process_name']) except SupervisorFault as fault: message['data']['error'] = fault.faultString finally: - message['data']['stdout'] = self.processmanager.read_stdout( + message['data']['stdout'] = self.read_stdout( data['process_name'], self.log_tail ) - message['data']['stderr'] = self.processmanager.read_stderr( + message['data']['stderr'] = self.read_stderr( data['process_name'], self.log_tail ) diff --git a/lib/tfw/builtins/terminal_event_handler.py b/lib/tfw/builtins/terminal_event_handler.py index 2d45417..1f7bf5f 100644 --- a/lib/tfw/builtins/terminal_event_handler.py +++ b/lib/tfw/builtins/terminal_event_handler.py @@ -1,8 +1,7 @@ import logging from tfw.networking import Scope -from tfw.components import BashMonitor -from tfw.components.terminado_mini_server import TerminadoMiniServer +from tfw.components import BashMonitor, TerminadoMiniServer from tfw.config import TFWENV from tao.config import TAOENV diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 99c5149..a1355d7 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -1,6 +1,12 @@ from .commands_equal import CommandsEqual from .file_manager import FileManager -from .history_monitor import HistoryMonitor, BashMonitor, GDBMonitor -from .snapshot_provider import SnapshotProvider -from .terminal_commands import TerminalCommands from .fsm_aware import FSMAware +from .fsm_updater import FSMUpdater +from .history_monitor import BashMonitor, GDBMonitor +from .log_inotify_observer import LogInotifyObserver +from .message_sender import MessageSender +from .message_storage import FrontendMessageStorage +from .snapshot_provider import SnapshotProvider +from .supervisor import ProcessManager, LogManager +from .terminado_mini_server import TerminadoMiniServer +from .terminal_commands import TerminalCommands diff --git a/lib/tfw/components/fsm_updater.py b/lib/tfw/components/fsm_updater.py new file mode 100644 index 0000000..6bff16a --- /dev/null +++ b/lib/tfw/components/fsm_updater.py @@ -0,0 +1,25 @@ +class FSMUpdater: + def __init__(self, fsm): + self.fsm = fsm + + @property + def fsm_update(self): + return { + 'key': 'fsm_update', + 'data': self.fsm_update_data + } + + @property + def fsm_update_data(self): + valid_transitions = [ + {'trigger': trigger} + for trigger in self.fsm.get_triggers(self.fsm.state) + ] + last_fsm_event = self.fsm.event_log[-1] + last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat() + return { + 'current_state': self.fsm.state, + 'valid_transitions': valid_transitions, + 'in_accepted_state': self.fsm.in_accepted_state, + 'last_event': last_fsm_event + } diff --git a/lib/tfw/components/log_inotify_observer.py b/lib/tfw/components/log_inotify_observer.py new file mode 100644 index 0000000..4a21f08 --- /dev/null +++ b/lib/tfw/components/log_inotify_observer.py @@ -0,0 +1,45 @@ +import logging + +from tfw.networking import Scope + +from .inotify import InotifyObserver +from .supervisor import LogManager + + +class LogInotifyObserver(InotifyObserver, LogManager): + def __init__(self, server_connector, supervisor_uri, process_name, log_tail=0): + self._prevent_log_recursion() + self._server_connector = server_connector + self._process_name = process_name + self.log_tail = log_tail + self._procinfo = None + LogManager.__init__(self, supervisor_uri) + InotifyObserver.__init__(self, self._get_logfiles()) + + @staticmethod + def _prevent_log_recursion(): + # This is done to prevent inotify event logs triggering themselves (infinite log recursion) + logging.getLogger('watchdog.observers.inotify_buffer').propagate = False + + def _get_logfiles(self): + self._procinfo = self.supervisor.getProcessInfo(self._process_name) + return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile'] + + @property + def process_name(self): + return self._process_name + + @process_name.setter + def process_name(self, process_name): + self._process_name = process_name + self.paths = self._get_logfiles() + + def on_modified(self, event): + self._server_connector.send_message({ + 'key': 'processlog', + 'data': { + 'command': 'new_log', + 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), + 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) + } + }, Scope.BROADCAST) diff --git a/lib/tfw/builtins/message_sender.py b/lib/tfw/components/message_sender.py similarity index 91% rename from lib/tfw/builtins/message_sender.py rename to lib/tfw/components/message_sender.py index 821b811..72915c2 100644 --- a/lib/tfw/builtins/message_sender.py +++ b/lib/tfw/components/message_sender.py @@ -1,12 +1,9 @@ -from .tfw_server_connector import TFWServerUplinkConnector - - class MessageSender: """ Provides mechanisms to send messages to our frontend messaging component. """ - def __init__(self): - self.uplink = TFWServerUplinkConnector() + def __init__(self, uplink): + self.uplink = uplink self.key = 'message' self.queue_key = 'queueMessages' diff --git a/lib/tfw/components/message_storage.py b/lib/tfw/components/message_storage.py new file mode 100644 index 0000000..1c63ffc --- /dev/null +++ b/lib/tfw/components/message_storage.py @@ -0,0 +1,44 @@ +from abc import ABC, abstractmethod +from contextlib import suppress + +from .message_sender import MessageSender + + +class MessageStorage(ABC): + def __init__(self): + self._messages = [] + + def save_message(self, message): + with suppress(KeyError, AttributeError): + if self._filter_message(message): + self._messages.extend(self._transform_message(message)) + + @abstractmethod + def _filter_message(self, message): + raise NotImplementedError + + def _transform_message(self, message): # pylint: disable=no-self-use + yield message + + def clear(self): + self._messages.clear() + + @property + def messages(self): + yield from self._messages + + +class FrontendMessageStorage(MessageStorage): + def __init__(self, keys): + self._keys = keys + super().__init__() + + def _filter_message(self, message): + key = message['key'] + return key in self._keys + + def _transform_message(self, message): + if message['key'] == 'queueMessages': + yield from MessageSender.generate_messages_from_queue(message) + else: + yield message diff --git a/lib/tfw/mixins/supervisor_mixin.py b/lib/tfw/components/supervisor.py similarity index 73% rename from lib/tfw/mixins/supervisor_mixin.py rename to lib/tfw/components/supervisor.py index 216f34b..83e8bc4 100644 --- a/lib/tfw/mixins/supervisor_mixin.py +++ b/lib/tfw/components/supervisor.py @@ -1,20 +1,15 @@ +from os import remove +from contextlib import suppress import xmlrpc.client from xmlrpc.client import Fault as SupervisorFault -from contextlib import suppress -from os import remove - -from tfw.decorators.lazy_property import lazy_property -from tfw.config import TFWENV -class SupervisorBaseMixin: - @lazy_property - def supervisor(self): - # pylint: disable=no-self-use - return xmlrpc.client.ServerProxy(TFWENV.SUPERVISOR_HTTP_URI).supervisor +class SupervisorBase: + def __init__(self, supervisor_uri): + self.supervisor = xmlrpc.client.ServerProxy(supervisor_uri).supervisor -class SupervisorMixin(SupervisorBaseMixin): +class ProcessManager(SupervisorBase): def stop_process(self, process_name): with suppress(SupervisorFault): self.supervisor.stopProcess(process_name) @@ -27,7 +22,7 @@ class SupervisorMixin(SupervisorBaseMixin): self.start_process(process_name) -class SupervisorLogMixin(SupervisorBaseMixin): +class LogManager(SupervisorBase): def read_stdout(self, process_name, tail=0): return self.supervisor.readProcessStdoutLog(process_name, -tail, 0)