import json
from abc import ABC, abstractmethod
from typing import Optional

from tornado.websocket import WebSocketClosedError, WebSocketHandler

from tfw.networking.serialization import deserialize_all
from tfw.networking.server.event_handler_connector import EventHandlerConnector
from tfw.config.logs import logging

LOG = logging.getLogger(__name__)


class ZMQWebSocketHandler(WebSocketHandler, ABC):
    """Abstract WebSocket handler bridging browser clients and the event handler connector.

    Messages received on a WebSocket are turned into a response by
    ``make_response`` (supplied by subclasses) and sent through the shared
    ``EventHandlerConnector``. Messages arriving from the connector are either
    dispatched to a key-specific handler (currently only ``'mirror'``) or
    written to every tracked WebSocket instance.
    """

    # One connector shared by all handler instances; created when the class
    # body is executed.
    _event_handler_connector = EventHandlerConnector()
    # Handler instances currently tracked for broadcasting: added in
    # ``prepare`` and dropped in ``on_close``.
    instances = set()

    def prepare(self):
        """Start tracking this handler so broadcasts reach it."""
        ZMQWebSocketHandler.instances.add(self)

    def on_close(self):
        """Stop tracking this handler.

        ``discard`` is used instead of ``remove`` so that closing a handler
        which is not (or no longer) tracked does not raise ``KeyError``.
        """
        ZMQWebSocketHandler.instances.discard(self)

    def open(self, *args, **kwargs):
        """Register ``zmq_callback`` with the connector when a connection opens."""
        LOG.debug('WebSocket connection initiated')
        self._event_handler_connector.register_callback(self.zmq_callback)

    @classmethod
    def zmq_callback(cls, msg_parts):
        """Handle a message received from the event handler connector.

        The message parts are deserialized into ``(key, data)``. If ``key``
        has a dedicated handler, that handler is called with ``data['data']``;
        otherwise ``data`` is broadcast to all tracked WebSocket instances.

        A ``KeyError`` raised while dispatching to a key handler (a missing
        ``'data'`` entry, or one raised inside the handler itself) is logged
        and the message is ignored.
        """
        keyhandlers = {'mirror': cls.mirror}
        key, data = deserialize_all(*msg_parts)
        LOG.debug('Received on pull socket: %s', data)

        handler = keyhandlers.get(key)
        if handler is None:
            cls._broadcast(data)
            return

        try:
            handler(data['data'])
        except KeyError:
            LOG.error('Invalid mirror message format! Ignoring.')

    @classmethod
    def _broadcast(cls, data):
        """Write ``data`` to every tracked WebSocket, skipping closed ones."""
        # Iterate over a snapshot so the set may change while we are writing
        # (e.g. through on_close) without breaking the loop.
        for instance in tuple(cls.instances):
            try:
                instance.write_message(data)
            except WebSocketClosedError:
                # A single closed client must not prevent delivery to the
                # remaining ones.
                LOG.debug('Skipping closed WebSocket connection')

    @classmethod
    def mirror(cls, data):
        """Send ``data`` back through the connector under the key ``data['key']``.

        Raises:
            KeyError: if ``data`` has no ``'key'`` entry.
        """
        key = data['key']
        cls._event_handler_connector.send_message({'data': data}, key)

    def on_message(self, message):
        """Forward the response built from an incoming WebSocket message."""
        LOG.debug('Received on WebSocket: %s', message)
        self.send_message(self.make_response(message))

    @abstractmethod
    def make_response(self, message):
        """Build the message to send to the connector for ``message``.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def send_message(self, message: dict, key: Optional[str] = None):
        """Send ``message`` through the connector, optionally under ``key``."""
        self._event_handler_connector.send_message(message, key)

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin.

        SECURITY: returning True unconditionally disables Tornado's
        cross-origin check for this endpoint. Restrict the accepted origins
        before exposing this handler to untrusted clients.
        """
        return True


class ZMQWebSocketProxy(ZMQWebSocketHandler):
    """Concrete handler whose response builder and message filter are injected."""

    def initialize(self, make_response, proxy_filter):
        """Store the injected callables.

        Args:
            make_response: callable invoked with the decoded message; its
                return value is sent to the connector.
            proxy_filter: callable invoked with the decoded message; the
                message is processed only when it returns a truthy value.
        """
        self._make_response = make_response
        self._proxy_filter = proxy_filter

    def on_message(self, message):
        """Decode ``message`` as JSON and process it if the filter accepts it.

        Decoding errors from ``json.loads`` are not caught here and propagate
        to the caller.
        """
        message = json.loads(message)
        if self._proxy_filter(message):
            super().on_message(message)

    def make_response(self, message):
        """Delegate to the ``make_response`` callable given to ``initialize``."""
        return self._make_response(message)