diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/networking/server/tfw_server.py index bdc5179..067bb6c 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/networking/server/tfw_server.py @@ -1,10 +1,13 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. +from abc import ABC, abstractmethod + from tornado.web import Application from tfw.networking.event_handlers import ServerUplinkConnector from tfw.networking.server import EventHandlerConnector +from tfw.networking import MessageSender from tfw.config.logs import logging from .zmq_websocket_proxy import ZMQWebSocketProxy @@ -24,10 +27,13 @@ class TFWServer: self.application = Application([( r'/ws', ZMQWebSocketProxy,{ 'event_handler_connector': self._event_handler_connector, - 'message_handlers': [self.handle_trigger] + 'message_handlers': [self.handle_trigger, self.handle_recover], + 'frontend_message_handlers': [self.save_frontend_messages] })] ) + self._frontend_messages = FrontendMessageStorage() + def handle_trigger(self, message): if 'trigger' in message: LOG.debug('Executing handler for trigger "%s"', message.get('trigger', '')) @@ -39,5 +45,43 @@ class TFWServer: } }) + def handle_recover(self, message): + if message['key'] == 'recover': + self._frontend_messages.replay_messages(self._uplink_connector) + + def save_frontend_messages(self, message): + self._frontend_messages.save_message(message) + def listen(self, port): self.application.listen(port) + + +class MessageStorage(ABC): + def __init__(self): + self.saved_messages = [] + + def save_message(self, message): + if self.filter_message(message): + self.saved_messages.extend(self.transform_message(message)) + + @abstractmethod + def filter_message(self, message): + raise NotImplementedError + + def transform_message(self, message): + yield message + + +class FrontendMessageStorage(MessageStorage): + def filter_message(self, message): + return message['key'] in ('message', 'dashboard', 'queueMessages') + + def transform_message(self, message): + if message['key'] == 'queueMessages': + yield from MessageSender.generate_messages_from_queue(message) + else: + yield message + + def replay_messages(self, connector): + for message in self.saved_messages: + connector.send(message)