diff --git a/tfw/components/frontend/frontend_handler.py b/tfw/components/frontend/frontend_handler.py index 7e35bc5..3b8a76a 100644 --- a/tfw/components/frontend/frontend_handler.py +++ b/tfw/components/frontend/frontend_handler.py @@ -4,26 +4,25 @@ from .message_storage import FrontendMessageStorage class FrontendHandler: - # TODO: do not store dashboard messages like reloadFrontend - keys = [ - 'console.read', - 'console.write', - 'console.showLiveLogs', - 'console.rewriteContentWithProcessLogsOnDeploy', - 'dashboard.layout', - 'dashboard.hideMessages', - 'dashboard.terminalMenuItem', - 'dashboard.reloadFrontend', - 'dashboard.reloadIframe', - 'message.config', - 'message.queue', - 'message.send' - ] + # keys = [ + # 'console.read', + # 'console.write', + # 'console.showLiveLogs', + # 'console.rewriteContentWithProcessLogsOnDeploy', + # 'dashboard.layout', + # 'dashboard.hideMessages', + # 'dashboard.terminalMenuItem', + # 'dashboard.reloadFrontend', + # 'dashboard.reloadIframe', + # 'message.config', + # 'message.queue', + # 'message.send' + # ] + keys = ['console', 'dashboard', 'message', 'ide.read', 'recover'] def __init__(self): self.connector = None - self.keys = [*type(self).keys, 'recover'] - self._frontend_message_storage = FrontendMessageStorage(type(self).keys) + self._frontend_message_storage = FrontendMessageStorage() def send_message(self, message): self.connector.send_message(message, scope=Scope.WEBSOCKET) @@ -32,7 +31,12 @@ class FrontendHandler: self._frontend_message_storage.save_message(message) if message['key'] == 'recover': self.recover_frontend() - self.send_message(message) + if self._filter_message(message): + self.send_message(message) + + @staticmethod + def _filter_message(message): + return not message['key'].startswith('ide') def recover_frontend(self): for message in self._frontend_message_storage.messages: diff --git a/tfw/components/frontend/message_storage.py b/tfw/components/frontend/message_storage.py index 1c63ffc..2478827 100644 --- a/tfw/components/frontend/message_storage.py +++ b/tfw/components/frontend/message_storage.py @@ -29,16 +29,25 @@ class MessageStorage(ABC): class FrontendMessageStorage(MessageStorage): - def __init__(self, keys): - self._keys = keys - super().__init__() - def _filter_message(self, message): - key = message['key'] - return key in self._keys + return message['key'].startswith(( + 'console.write', + 'dashboard.layout', + 'dashboard.terminalMenuItem', + 'message.send', + 'message.config', + 'ide.read' + )) def _transform_message(self, message): - if message['key'] == 'queueMessages': - yield from MessageSender.generate_messages_from_queue(message) - else: - yield message + transformations = { + 'message.queue': MessageSender.generate_messages_from_queue, + 'ide.read': self._delete_ide_content + } + if message['key'] in transformations: + yield from transformations[message['key']](message) + + @staticmethod + def _delete_ide_content(message): + del message['content'] + yield message diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index 5d28c1f..24b8231 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__) class FSMHandler: - keys = ['fsm.step', 'fsm.announce'] + keys = ['fsm'] + def __init__(self, *, fsm_type): self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) diff --git a/tfw/components/ide/ide_handler.py b/tfw/components/ide/ide_handler.py index f7435d6..bf1307b 100644 --- a/tfw/components/ide/ide_handler.py +++ b/tfw/components/ide/ide_handler.py @@ -33,7 +33,7 @@ BUILD_ARTIFACTS = ( class IdeHandler: - keys = ['ide.read', 'ide.write'] + keys = ['ide'] def __init__(self, *, patterns, initial_file=''): self.connector = None diff --git a/tfw/components/process_management/process_handler.py b/tfw/components/process_management/process_handler.py index 80e1a4e..33e58fa 100644 --- a/tfw/components/process_management/process_handler.py +++ b/tfw/components/process_management/process_handler.py @@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__) class ProcessHandler(ProcessManager, ProcessLogManager): - keys = ['process.start', 'process.stop', 'process.restart'] + keys = ['process'] + def __init__(self, *, supervisor_uri, log_tail=0): ProcessManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri) @@ -38,4 +39,5 @@ class ProcessHandler(ProcessManager, ProcessLogManager): ) connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: - LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) + if not message['key'].startswith('process.log'): + LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/components/process_management/process_log_handler.py b/tfw/components/process_management/process_log_handler.py index 144b1be..9e3bb42 100644 --- a/tfw/components/process_management/process_log_handler.py +++ b/tfw/components/process_management/process_log_handler.py @@ -6,7 +6,7 @@ LOG = logging.getLogger(__name__) class ProcessLogHandler: - keys = ['process.log.set'] + keys = ['process.log'] def __init__(self, *, process_name, supervisor_uri, log_tail=0): self.connector, self._monitor = None, None diff --git a/tfw/components/snapshots/snapshot_handler.py b/tfw/components/snapshots/snapshot_handler.py index f992b07..0c8f797 100644 --- a/tfw/components/snapshots/snapshot_handler.py +++ b/tfw/components/snapshots/snapshot_handler.py @@ -6,8 +6,6 @@ from datetime import datetime from dateutil import parser as dateparser -from tfw.internals.networking import Scope - from .snapshot_provider import SnapshotProvider LOG = logging.getLogger(__name__) @@ -23,9 +21,9 @@ class SnapshotHandler: self.init_snapshot_providers(directories) self.command_handlers = { - 'take_snapshot': self.handle_take_snapshot, - 'restore_snapshot': self.handle_restore_snapshot, - 'exclude': self.handle_exclude + 'snapshot.take': self.handle_take_snapshot, + 'snapshot.restore': self.handle_restore_snapshot, + 'snapshot.exclude': self.handle_exclude } def init_snapshot_providers(self, directories): @@ -45,23 +43,20 @@ class SnapshotHandler: makedirs(git_dir, exist_ok=True) return git_dir - def handle_event(self, message, connector): + def handle_event(self, message, _): try: - data = message['data'] - message['data'] = self.command_handlers[data['command']](data) - connector.send_message(message, scope=Scope.WEBSOCKET) + self.command_handlers[message['key']](message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) - def handle_take_snapshot(self, data): + def handle_take_snapshot(self, _): LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys()) for provider in self.snapshot_providers.values(): provider.take_snapshot() - return data - def handle_restore_snapshot(self, data): + def handle_restore_snapshot(self, message): date = dateparser.parse( - data.get( + message.get( 'value', datetime.now().isoformat() ) @@ -73,13 +68,11 @@ class SnapshotHandler: ) for provider in self.snapshot_providers.values(): provider.restore_snapshot(date) - return data - def handle_exclude(self, data): - exclude_unix_patterns = data['value'] + def handle_exclude(self, message): + exclude_unix_patterns = message['value'] if not isinstance(exclude_unix_patterns, list): raise KeyError for provider in self.snapshot_providers.values(): provider.exclude = exclude_unix_patterns - return data diff --git a/tfw/components/terminal/terminal_handler.py b/tfw/components/terminal/terminal_handler.py index 7d3e041..dd17e97 100644 --- a/tfw/components/terminal/terminal_handler.py +++ b/tfw/components/terminal/terminal_handler.py @@ -7,7 +7,7 @@ LOG = logging.getLogger(__name__) class TerminalHandler: - keys = ['terminal.write'] + keys = ['terminal'] def __init__(self, *, port, user, working_directory, histfile): self.connector, self._historymonitor = None, None diff --git a/tfw/internals/networking/zmq_connector.py b/tfw/internals/networking/zmq_connector.py index 7dfa77c..99920c7 100644 --- a/tfw/internals/networking/zmq_connector.py +++ b/tfw/internals/networking/zmq_connector.py @@ -15,8 +15,6 @@ LOG = logging.getLogger(__name__) class ZMQDownlinkConnector: def __init__(self, connect_addr): - self.keys = [] - self._on_recv_callback = None self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.connect(connect_addr) @@ -25,19 +23,14 @@ class ZMQDownlinkConnector: def subscribe(self, *keys): for key in keys: self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key) - self.keys.append(key) def unsubscribe(self, *keys): for key in keys: self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key) - self.keys.remove(key) def register_callback(self, callback): - if callback: - self._on_recv_callback = callback - self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv)) - else: - self._zmq_sub_stream.on_recv(None) + callback = with_deserialize_tfw_msg(callback) if callback else None + self._zmq_sub_stream.on_recv(callback) def recv_message(self, *, block=True): if self._zmq_sub_stream.receiving(): @@ -48,11 +41,6 @@ class ZMQDownlinkConnector: except zmq.ZMQError: raise IOError("No data available to recv!") - def _on_recv(self, message): - key = message['key'] - if key in self.keys or '' in self.keys: - self._on_recv_callback(message) - def close(self): self._zmq_sub_stream.close()