diff --git a/lib/tfw/networking/serialization.py b/lib/tfw/networking/serialization.py index 6a6a3e7..c21fc47 100644 --- a/lib/tfw/networking/serialization.py +++ b/lib/tfw/networking/serialization.py @@ -25,7 +25,8 @@ import json def validate_message(message): - return 'key' in message + if 'key' not in message: + raise ValueError('Invalid TFW message format!') def serialize_tfw_msg(message): diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/networking/server/tfw_server.py index 78cc8ee..46c9006 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/networking/server/tfw_server.py @@ -32,10 +32,8 @@ class TFWServer: self.application = Application([( r'/ws', ZMQWebSocketProxy,{ - 'make_eventhandler_message': self.make_eventhandler_message, - 'proxy_filter': self.proxy_filter, - 'handle_trigger': self.handle_trigger, - 'event_handler_connector': self._event_handler_connector + 'event_handler_connector': self._event_handler_connector, + 'message_handlers': [self.append_fsm_data, self.handle_trigger] })] ) # self.controller_responder = ControllerResponder(self.fsm) @@ -49,8 +47,7 @@ class TFWServer: def fsm_manager(self): return self._fsm_manager - def make_eventhandler_message(self, message): - self.trigger_fsm(message) + def append_fsm_data(self, message): message['FSMUpdate'] = self._fsm_updater.get_fsm_state_and_transitions() return message @@ -65,10 +62,6 @@ class TFWServer: except AttributeError: LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger) - def proxy_filter(self, message): - # pylint: disable=unused-argument,no-self-use - return True - def listen(self, port): self.application.listen(port) diff --git a/lib/tfw/networking/server/zmq_websocket_handler.py b/lib/tfw/networking/server/zmq_websocket_handler.py index c842205..7eafe53 100644 --- a/lib/tfw/networking/server/zmq_websocket_handler.py +++ b/lib/tfw/networking/server/zmq_websocket_handler.py @@ -2,90 +2,109 @@ # All Rights Reserved. See LICENSE file for details. import json -from abc import ABC, abstractmethod from tornado.websocket import WebSocketHandler from tfw.networking import deserialize_tfw_msg, validate_message +from tfw.mixins import CallbackMixin from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class ZMQWebSocketHandler(WebSocketHandler, ABC): +class TFWProxy: + def __init__(self, to_source, to_destination): + self.to_source = to_source + self.to_destination = to_destination + + self.proxy_filters = CallbackMixin() + self.proxy_callbacks = CallbackMixin() + + self.proxy_filters.subscribe_callback(validate_message) + + self.keyhandlers = { + 'mirror': self.mirror + } + + def __call__(self, message): + try: + self.proxy_filters._execute_callbacks(message) + except ValueError: + LOG.exception('Invalid TFW message received!') + return + + self.proxy_callbacks._execute_callbacks(message) + + if message['key'] not in self.keyhandlers: + self.to_destination(message) + else: + try: + self.keyhandlers[message['key']](message) + except KeyError: + LOG.error('Invalid mirror message format! Ignoring.') + + def mirror(self, message): + message = message['data'] + LOG.debug('Mirroring message: %s', message) + self.to_source(message) + + +class ZMQWebSocketProxy(WebSocketHandler): instances = set() def initialize(self, **kwargs): # pylint: disable=arguments-differ self._event_handler_connector = kwargs['event_handler_connector'] + self._message_handlers = kwargs.get('message_handlers', []) + self._proxy_filters = kwargs.get('proxy_filters', []) + + self.proxy_eventhandler_to_websocket = TFWProxy( + self.send_eventhandler_message, + self.send_websocket_message + ) + self.proxy_websocket_to_eventhandler = TFWProxy( + self.send_websocket_message, + self.send_eventhandler_message + ) + + proxies = (self.proxy_eventhandler_to_websocket, self.proxy_websocket_to_eventhandler) + for proxy in proxies: + proxy.proxy_filters.subscribe_callbacks(*self._proxy_filters) + proxy.proxy_callbacks.subscribe_callbacks(*self._message_handlers) def prepare(self): - ZMQWebSocketHandler.instances.add(self) + ZMQWebSocketProxy.instances.add(self) def on_close(self): - ZMQWebSocketHandler.instances.remove(self) + ZMQWebSocketProxy.instances.remove(self) def open(self, *args, **kwargs): LOG.debug('WebSocket connection initiated') self._event_handler_connector.register_callback(self.zmq_callback) def zmq_callback(self, msg_parts): - keyhandlers = {'mirror': self.mirror} - + """ + Invoked on ZMQ message. + """ message = deserialize_tfw_msg(*msg_parts) LOG.debug('Received on pull socket: %s', message) - if not validate_message(message): - return - - self.handle_trigger(message) - if message['key'] not in keyhandlers: - for instance in ZMQWebSocketHandler.instances: - instance.write_message(message) - else: - try: - keyhandlers[message['key']](message) - except KeyError: - LOG.error('Invalid mirror message format! Ignoring.') - - def mirror(self, message): - message = message['data'] - self._event_handler_connector.send_message(message) + self.proxy_eventhandler_to_websocket(message) def on_message(self, message): + """ + Invoked on WS message. + """ + message = json.loads(message) LOG.debug('Received on WebSocket: %s', message) - if validate_message(message): - self.send_message(self.make_eventhandler_message(message)) + self.proxy_websocket_to_eventhandler(message) - @abstractmethod - def make_eventhandler_message(self, message): - raise NotImplementedError - - def send_message(self, message: dict): + def send_eventhandler_message(self, message): self._event_handler_connector.send_message(message) - @abstractmethod - def handle_trigger(self, message): - raise NotImplementedError + @staticmethod + def send_websocket_message(message): + for instance in ZMQWebSocketProxy.instances: + instance.write_message(message) # much secure, very cors, wow def check_origin(self, origin): return True - - -class ZMQWebSocketProxy(ZMQWebSocketHandler): - # pylint: disable=abstract-method - def initialize(self, **kwargs): # pylint: disable=arguments-differ - super(ZMQWebSocketProxy, self).initialize(**kwargs) - self._make_eventhandler_message = kwargs['make_eventhandler_message'] - self._proxy_filter = kwargs['proxy_filter'] - self._handle_trigger = kwargs['handle_trigger'] - - def on_message(self, message): - message = json.loads(message) - if self._proxy_filter(message): - super().on_message(message) - - def make_eventhandler_message(self, message): - return self._make_eventhandler_message(message) - - def handle_trigger(self, message): - self._handle_trigger(message)