mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 12:12:55 +00:00 
			
		
		
		
	Completely rework TFWServer networking ✨🍰✨
This commit is contained in:
		@@ -32,10 +32,8 @@ class TFWServer:
 | 
			
		||||
 | 
			
		||||
        self.application = Application([(
 | 
			
		||||
            r'/ws', ZMQWebSocketProxy,{
 | 
			
		||||
                'make_eventhandler_message': self.make_eventhandler_message,
 | 
			
		||||
                'proxy_filter': self.proxy_filter,
 | 
			
		||||
                'handle_trigger': self.handle_trigger,
 | 
			
		||||
                'event_handler_connector': self._event_handler_connector
 | 
			
		||||
                'event_handler_connector': self._event_handler_connector,
 | 
			
		||||
                'message_handlers': [self.append_fsm_data, self.handle_trigger]
 | 
			
		||||
            })]
 | 
			
		||||
        )
 | 
			
		||||
        # self.controller_responder = ControllerResponder(self.fsm)
 | 
			
		||||
@@ -49,8 +47,7 @@ class TFWServer:
 | 
			
		||||
    def fsm_manager(self):
 | 
			
		||||
        return self._fsm_manager
 | 
			
		||||
 | 
			
		||||
    def make_eventhandler_message(self, message):
 | 
			
		||||
        self.trigger_fsm(message)
 | 
			
		||||
    def append_fsm_data(self, message):
 | 
			
		||||
        message['FSMUpdate'] = self._fsm_updater.get_fsm_state_and_transitions()
 | 
			
		||||
        return message
 | 
			
		||||
 | 
			
		||||
@@ -65,10 +62,6 @@ class TFWServer:
 | 
			
		||||
        except AttributeError:
 | 
			
		||||
            LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger)
 | 
			
		||||
 | 
			
		||||
    def proxy_filter(self, message):
 | 
			
		||||
        # pylint: disable=unused-argument,no-self-use
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def listen(self, port):
 | 
			
		||||
        self.application.listen(port)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -2,90 +2,109 @@
 | 
			
		||||
# All Rights Reserved. See LICENSE file for details.
 | 
			
		||||
 | 
			
		||||
import json
 | 
			
		||||
from abc import ABC, abstractmethod
 | 
			
		||||
 | 
			
		||||
from tornado.websocket import WebSocketHandler
 | 
			
		||||
 | 
			
		||||
from tfw.networking import deserialize_tfw_msg, validate_message
 | 
			
		||||
from tfw.mixins import CallbackMixin
 | 
			
		||||
from tfw.config.logs import logging
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ZMQWebSocketHandler(WebSocketHandler, ABC):
 | 
			
		||||
class TFWProxy:
 | 
			
		||||
    def __init__(self, to_source, to_destination):
 | 
			
		||||
        self.to_source = to_source
 | 
			
		||||
        self.to_destination = to_destination
 | 
			
		||||
 | 
			
		||||
        self.proxy_filters = CallbackMixin()
 | 
			
		||||
        self.proxy_callbacks = CallbackMixin()
 | 
			
		||||
 | 
			
		||||
        self.proxy_filters.subscribe_callback(validate_message)
 | 
			
		||||
 | 
			
		||||
        self.keyhandlers = {
 | 
			
		||||
            'mirror': self.mirror
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def __call__(self, message):
 | 
			
		||||
        try:
 | 
			
		||||
            self.proxy_filters._execute_callbacks(message)
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            LOG.exception('Invalid TFW message received!')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self.proxy_callbacks._execute_callbacks(message)
 | 
			
		||||
 | 
			
		||||
        if message['key'] not in self.keyhandlers:
 | 
			
		||||
            self.to_destination(message)
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                self.keyhandlers[message['key']](message)
 | 
			
		||||
            except KeyError:
 | 
			
		||||
                LOG.error('Invalid mirror message format! Ignoring.')
 | 
			
		||||
 | 
			
		||||
    def mirror(self, message):
 | 
			
		||||
        message = message['data']
 | 
			
		||||
        LOG.debug('Mirroring message: %s', message)
 | 
			
		||||
        self.to_source(message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ZMQWebSocketProxy(WebSocketHandler):
 | 
			
		||||
    instances = set()
 | 
			
		||||
 | 
			
		||||
    def initialize(self, **kwargs): # pylint: disable=arguments-differ
 | 
			
		||||
        self._event_handler_connector = kwargs['event_handler_connector']
 | 
			
		||||
        self._message_handlers = kwargs.get('message_handlers', [])
 | 
			
		||||
        self._proxy_filters = kwargs.get('proxy_filters', [])
 | 
			
		||||
 | 
			
		||||
        self.proxy_eventhandler_to_websocket = TFWProxy(
 | 
			
		||||
            self.send_eventhandler_message,
 | 
			
		||||
            self.send_websocket_message
 | 
			
		||||
        )
 | 
			
		||||
        self.proxy_websocket_to_eventhandler = TFWProxy(
 | 
			
		||||
            self.send_websocket_message,
 | 
			
		||||
            self.send_eventhandler_message
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        proxies = (self.proxy_eventhandler_to_websocket, self.proxy_websocket_to_eventhandler)
 | 
			
		||||
        for proxy in proxies:
 | 
			
		||||
            proxy.proxy_filters.subscribe_callbacks(*self._proxy_filters)
 | 
			
		||||
            proxy.proxy_callbacks.subscribe_callbacks(*self._message_handlers)
 | 
			
		||||
 | 
			
		||||
    def prepare(self):
 | 
			
		||||
        ZMQWebSocketHandler.instances.add(self)
 | 
			
		||||
        ZMQWebSocketProxy.instances.add(self)
 | 
			
		||||
 | 
			
		||||
    def on_close(self):
 | 
			
		||||
        ZMQWebSocketHandler.instances.remove(self)
 | 
			
		||||
        ZMQWebSocketProxy.instances.remove(self)
 | 
			
		||||
 | 
			
		||||
    def open(self, *args, **kwargs):
 | 
			
		||||
        LOG.debug('WebSocket connection initiated')
 | 
			
		||||
        self._event_handler_connector.register_callback(self.zmq_callback)
 | 
			
		||||
 | 
			
		||||
    def zmq_callback(self, msg_parts):
 | 
			
		||||
        keyhandlers = {'mirror': self.mirror}
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        Invoked on ZMQ message.
 | 
			
		||||
        """
 | 
			
		||||
        message = deserialize_tfw_msg(*msg_parts)
 | 
			
		||||
        LOG.debug('Received on pull socket: %s', message)
 | 
			
		||||
        if not validate_message(message):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self.handle_trigger(message)
 | 
			
		||||
        if message['key'] not in keyhandlers:
 | 
			
		||||
            for instance in ZMQWebSocketHandler.instances:
 | 
			
		||||
                instance.write_message(message)
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                keyhandlers[message['key']](message)
 | 
			
		||||
            except KeyError:
 | 
			
		||||
                LOG.error('Invalid mirror message format! Ignoring.')
 | 
			
		||||
 | 
			
		||||
    def mirror(self, message):
 | 
			
		||||
        message = message['data']
 | 
			
		||||
        self._event_handler_connector.send_message(message)
 | 
			
		||||
        self.proxy_eventhandler_to_websocket(message)
 | 
			
		||||
 | 
			
		||||
    def on_message(self, message):
 | 
			
		||||
        """
 | 
			
		||||
        Invoked on WS message.
 | 
			
		||||
        """
 | 
			
		||||
        message = json.loads(message)
 | 
			
		||||
        LOG.debug('Received on WebSocket: %s', message)
 | 
			
		||||
        if validate_message(message):
 | 
			
		||||
            self.send_message(self.make_eventhandler_message(message))
 | 
			
		||||
        self.proxy_websocket_to_eventhandler(message)
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def make_eventhandler_message(self, message):
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
 | 
			
		||||
    def send_message(self, message: dict):
 | 
			
		||||
    def send_eventhandler_message(self, message):
 | 
			
		||||
        self._event_handler_connector.send_message(message)
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def handle_trigger(self, message):
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def send_websocket_message(message):
 | 
			
		||||
        for instance in ZMQWebSocketProxy.instances:
 | 
			
		||||
            instance.write_message(message)
 | 
			
		||||
 | 
			
		||||
    # much secure, very cors, wow
 | 
			
		||||
    def check_origin(self, origin):
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ZMQWebSocketProxy(ZMQWebSocketHandler):
 | 
			
		||||
    # pylint: disable=abstract-method
 | 
			
		||||
    def initialize(self, **kwargs): # pylint: disable=arguments-differ
 | 
			
		||||
        super(ZMQWebSocketProxy, self).initialize(**kwargs)
 | 
			
		||||
        self._make_eventhandler_message = kwargs['make_eventhandler_message']
 | 
			
		||||
        self._proxy_filter = kwargs['proxy_filter']
 | 
			
		||||
        self._handle_trigger = kwargs['handle_trigger']
 | 
			
		||||
 | 
			
		||||
    def on_message(self, message):
 | 
			
		||||
        message = json.loads(message)
 | 
			
		||||
        if self._proxy_filter(message):
 | 
			
		||||
            super().on_message(message)
 | 
			
		||||
 | 
			
		||||
    def make_eventhandler_message(self, message):
 | 
			
		||||
        return self._make_eventhandler_message(message)
 | 
			
		||||
 | 
			
		||||
    def handle_trigger(self, message):
 | 
			
		||||
        self._handle_trigger(message)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user