# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

import json

from tornado.websocket import WebSocketClosedError, WebSocketHandler

from tfw.networking import validate_message
from tfw.mixins import CallbackMixin
from tfw.config.logs import logging

LOG = logging.getLogger(__name__)


class TFWProxy:
    """
    Callable that filters a message, notifies callbacks and then routes it.

    Routing is decided by ``message['key']``:

    * ``'mirror'``: ``message['data']`` is sent back through ``to_source``
    * ``'broadcast'``: ``message['data']`` is sent through both
      ``to_source`` and ``to_destination``
    * anything else: the whole message is passed to ``to_destination``
    """

    def __init__(self, to_source, to_destination):
        """
        :param to_source: callable taking one message; delivers it back
                          towards the side the message came from
        :param to_destination: callable taking one message; delivers it
                               to the opposite side
        """
        self.to_source = to_source
        self.to_destination = to_destination

        # Filters run first; a filter raising ValueError drops the message.
        self.proxy_filters = CallbackMixin()
        # Callbacks run on every message that passed the filters.
        self.proxy_callbacks = CallbackMixin()

        self.proxy_filters.subscribe_callback(validate_message)

        # Special message keys handled by the proxy itself.
        self.keyhandlers = {
            'mirror': self.mirror,
            'broadcast': self.broadcast
        }

    def __call__(self, message):
        """
        Run filters and callbacks on ``message``, then route it.

        A ValueError raised by any filter is logged and the message is
        dropped. A KeyError raised while running a key handler is logged
        and the message is ignored.
        """
        try:
            self.proxy_filters._execute_callbacks(message)
        except ValueError:
            LOG.exception('Invalid TFW message received!')
            return

        self.proxy_callbacks._execute_callbacks(message)

        if message['key'] not in self.keyhandlers:
            self.to_destination(message)
        else:
            handler = self.keyhandlers[message['key']]
            try:
                handler(message)
            except KeyError:
                LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__)

    def mirror(self, message):
        """
        Send the payload under ``message['data']`` back to the source.

        :raises KeyError: if ``message`` has no ``'data'`` key
        """
        message = message['data']
        LOG.debug('Mirroring message: %s', message)
        self.to_source(message)

    def broadcast(self, message):
        """
        Send the payload under ``message['data']`` to both sides.

        :raises KeyError: if ``message`` has no ``'data'`` key
        """
        message = message['data']
        LOG.debug('Broadcasting message: %s', message)
        self.to_source(message)
        self.to_destination(message)


class ZMQWebSocketProxy(WebSocketHandler):
    """
    WebSocket handler relaying messages between WebSocket connections
    and an event handler connector, in both directions, via two
    TFWProxy instances.
    """

    # Handler instances registered in prepare(); shared by the whole class
    # so that send_websocket_message() can reach every connection.
    instances = set()

    def initialize(self, **kwargs):  # pylint: disable=arguments-differ
        """
        Store the connector and build one TFWProxy per direction.

        Recognised keyword arguments:

        * ``event_handler_connector`` (required): object providing
          ``register_callback`` and ``send_message``
        * ``message_handlers`` (optional): callbacks subscribed to both
          proxies' ``proxy_callbacks``
        * ``proxy_filters`` (optional): callbacks subscribed to both
          proxies' ``proxy_filters``

        :raises KeyError: if ``event_handler_connector`` is missing
        """
        self._event_handler_connector = kwargs['event_handler_connector']
        self._message_handlers = kwargs.get('message_handlers', [])
        self._proxy_filters = kwargs.get('proxy_filters', [])

        # Source: event handlers, destination: WebSocket connections.
        self.proxy_eventhandler_to_websocket = TFWProxy(
            self.send_eventhandler_message,
            self.send_websocket_message
        )
        # Source: WebSocket connections, destination: event handlers.
        self.proxy_websocket_to_eventhandler = TFWProxy(
            self.send_websocket_message,
            self.send_eventhandler_message
        )

        proxies = (self.proxy_eventhandler_to_websocket, self.proxy_websocket_to_eventhandler)
        for proxy in proxies:
            proxy.proxy_filters.subscribe_callbacks(*self._proxy_filters)
            proxy.proxy_callbacks.subscribe_callbacks(*self._message_handlers)

    def prepare(self):
        """Register this handler in the class-level instance set."""
        ZMQWebSocketProxy.instances.add(self)

    def on_close(self):
        """Unregister this handler from the class-level instance set."""
        # discard() instead of remove(): unregistering a handler that is
        # not (or no longer) in the set must not raise KeyError.
        ZMQWebSocketProxy.instances.discard(self)

    def open(self, *args, **kwargs):
        """Register eventhander_callback with the event handler connector."""
        LOG.debug('WebSocket connection initiated')
        self._event_handler_connector.register_callback(self.eventhander_callback)

    def eventhander_callback(self, message):
        """
        Invoked on ZMQ messages from event handlers.
        """
        LOG.debug('Received on pull socket: %s', message)
        self.proxy_eventhandler_to_websocket(message)

    def on_message(self, message):
        """
        Invoked on WS messages from frontend.

        Payloads that are not valid JSON are logged and ignored.
        """
        try:
            message = json.loads(message)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass. Invalid TFW
            # messages are logged and dropped by TFWProxy; malformed JSON
            # is treated the same way here instead of propagating.
            LOG.exception('Invalid JSON received on WebSocket! Ignoring.')
            return
        LOG.debug('Received on WebSocket: %s', message)
        self.proxy_websocket_to_eventhandler(message)

    def send_eventhandler_message(self, message):
        """Pass ``message`` to the event handler connector's send_message."""
        self._event_handler_connector.send_message(message)

    @staticmethod
    def send_websocket_message(message):
        """
        Write ``message`` to every registered handler instance.

        Instances whose connection is closed are skipped, so one dead
        connection does not stop delivery to the others.
        """
        # Iterate over a snapshot so the set may change during the loop.
        for instance in tuple(ZMQWebSocketProxy.instances):
            try:
                instance.write_message(message)
            except WebSocketClosedError:
                # The instance stays registered: on_close() is what
                # unregisters handlers.
                LOG.debug('Skipping closed WebSocket connection')

    # much secure, very cors, wow
    # NOTE(review): returning True accepts connections from every origin;
    # confirm this is intended for the deployment environment.
    def check_origin(self, origin):
        """Accept every origin unconditionally."""
        return True