# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. from abc import ABC, abstractmethod from contextlib import suppress from tornado.web import Application from tfw.networking.server.zmq_websocket_proxy import ZMQWebSocketProxy from tfw.networking.event_handlers.server_connector import ServerUplinkConnector from tfw.networking.server.event_handler_connector import EventHandlerConnector from tfw.networking.message_sender import MessageSender from tfw.networking.fsm_aware import FSMAware from tfw.crypto import KeyManager, verify_message, sign_message from tfw.config.logs import logging LOG = logging.getLogger(__name__) class TFWServer(FSMAware): """ This class handles the proxying of messages between the frontend and event handers. It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ SUB socket. """ def __init__(self): super().__init__() self._event_handler_connector = EventHandlerConnector() self._uplink_connector = ServerUplinkConnector() self._auth_key = KeyManager().auth_key self.application = Application([( r'/ws', ZMQWebSocketProxy, { 'event_handler_connector': self._event_handler_connector, 'proxy_filters_and_callbacks': { 'message_handlers': [ self.handle_trigger, self.handle_recover, self.handle_fsm_update ], 'frontend_message_handlers': [self.save_frontend_messages] } } )]) self._frontend_messages = FrontendMessageStorage() def handle_trigger(self, message): if 'trigger' in message: LOG.debug('Executing handler for trigger "%s"', message.get('trigger', '')) fsm_eh_command = { 'key': 'fsm', 'data': { 'command': 'trigger', 'value': message['trigger'] } } if verify_message(self._auth_key, message): sign_message(self._auth_key, fsm_eh_command) self._uplink_connector.send_to_eventhandler(fsm_eh_command) def handle_recover(self, message): if message['key'] == 'recover': self._frontend_messages.replay_messages(self._uplink_connector) self._frontend_messages.clear() def handle_fsm_update(self, message): self.update_fsm_data(message) def save_frontend_messages(self, message): self._frontend_messages.save_message(message) def listen(self, port): self.application.listen(port) class MessageStorage(ABC): def __init__(self): self.saved_messages = [] def save_message(self, message): with suppress(KeyError, AttributeError): if self.filter_message(message): self.saved_messages.extend(self.transform_message(message)) @abstractmethod def filter_message(self, message): raise NotImplementedError def transform_message(self, message): # pylint: disable=no-self-use yield message def clear(self): self.saved_messages.clear() class FrontendMessageStorage(MessageStorage): def filter_message(self, message): key = message['key'] command = message.get('data', {}).get('command') return ( key in ('message', 'dashboard', 'queueMessages') or key == 'ide' and command in ('select', 'read') ) def transform_message(self, message): if message['key'] == 'queueMessages': yield from MessageSender.generate_messages_from_queue(message) else: yield message def replay_messages(self, connector): for message in self.saved_messages: connector.send(message)