# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

from abc import ABC, abstractmethod

from tornado.web import Application

from tfw.networking.event_handlers import ServerUplinkConnector
from tfw.networking.server import EventHandlerConnector
from tfw.networking import MessageSender
from tfw.config.logs import logging

from .zmq_websocket_proxy import ZMQWebSocketProxy

LOG = logging.getLogger(__name__)


class TFWServer:
    """
    This class handles the proxying of messages between the frontend and
    event handlers.
    It proxies messages from the "/ws" route to all event handlers subscribed
    to a ZMQ SUB socket.
    """

    def __init__(self):
        """
        Create the connectors, the frontend message storage and the tornado
        application serving the "/ws" route through ZMQWebSocketProxy.
        """
        self._event_handler_connector = EventHandlerConnector()
        self._uplink_connector = ServerUplinkConnector()
        # Created before the application so that every attribute used by the
        # registered handlers exists by the time they are handed out.
        self._frontend_messages = FrontendMessageStorage()

        # NOTE(review): how ZMQWebSocketProxy invokes the two handler lists is
        # defined in zmq_websocket_proxy, not in this module.
        self.application = Application([(
            r'/ws',
            ZMQWebSocketProxy,
            {
                'event_handler_connector': self._event_handler_connector,
                'message_handlers': [
                    self.handle_trigger,
                    self.handle_recover,
                ],
                'frontend_message_handlers': [self.save_frontend_messages],
            },
        )])

    def handle_trigger(self, message):
        """
        Forward a trigger request to the 'fsm' event handler.

        Messages that have no 'trigger' field are ignored.

        :param message: message dict, optionally holding a 'trigger' field
        """
        if 'trigger' not in message:
            return

        trigger = message['trigger']
        LOG.debug('Executing handler for trigger "%s"', trigger)
        self._uplink_connector.send_to_eventhandler({
            'key': 'fsm',
            'data': {
                'command': 'trigger',
                'value': trigger,
            },
        })

    def handle_recover(self, message):
        """
        Replay every stored frontend message when a 'recover' message arrives.

        Messages without a 'key' field are ignored instead of raising
        KeyError, matching how handle_trigger treats absent fields.

        :param message: message dict, optionally holding a 'key' field
        """
        if message.get('key') == 'recover':
            self._frontend_messages.replay_messages(self._uplink_connector)

    def save_frontend_messages(self, message):
        """
        Offer a message to the frontend message storage, which decides
        whether it is kept.

        :param message: message dict to store
        """
        self._frontend_messages.save_message(message)

    def listen(self, port):
        """
        Start listening with the tornado application on the given port.

        :param port: port passed on to Application.listen
        """
        self.application.listen(port)


class MessageStorage(ABC):
    """
    Base class for collecting messages in memory.

    Subclasses decide which messages are kept (filter_message) and may
    change what is actually stored for a kept message (transform_message).
    """

    def __init__(self):
        # Stored messages, in the order they were saved.
        self.saved_messages = []

    def save_message(self, message):
        """
        Store the message (after transformation) if it passes the filter.

        :param message: message to store
        """
        if self.filter_message(message):
            self.saved_messages.extend(self.transform_message(message))

    @abstractmethod
    def filter_message(self, message):
        """
        Tell whether the message should be stored.

        :param message: message to inspect
        :return: truthy value if the message should be saved
        """
        raise NotImplementedError

    def transform_message(self, message):  # pylint: disable=no-self-use
        """
        Yield the message(s) to store for an accepted message.

        The default implementation yields the message unchanged.

        :param message: message accepted by filter_message
        """
        yield message


class FrontendMessageStorage(MessageStorage):
    """
    Storage for the frontend messages that can be replayed later on.
    """

    def filter_message(self, message):
        """
        Accept 'message', 'dashboard' and 'queueMessages' messages, plus
        'ide' messages whose command is 'select' or 'read'.

        Messages without a 'key', or with a 'data' payload that is not a
        dict, are handled without raising.

        :param message: message dict to inspect
        :return: True if the message should be saved, False otherwise
        """
        key = message.get('key')
        if key in ('message', 'dashboard', 'queueMessages'):
            return True
        if key != 'ide':
            return False

        # The command only matters for 'ide' messages, so 'data' is examined
        # here rather than up front.
        data = message.get('data', {})
        command = data.get('command') if isinstance(data, dict) else None
        return command in ('select', 'read')

    def transform_message(self, message):
        """
        Yield the message(s) to store for an accepted message.

        'queueMessages' messages are replaced by whatever
        MessageSender.generate_messages_from_queue yields for them, every
        other message is yielded unchanged.

        :param message: message accepted by filter_message
        """
        if message['key'] == 'queueMessages':
            yield from MessageSender.generate_messages_from_queue(message)
        else:
            yield message

    def replay_messages(self, connector):
        """
        Send every stored message through the connector, in saved order.

        :param connector: object exposing a send(message) method
        """
        for message in self.saved_messages:
            connector.send(message)