From ecbb61a862863dc2ff5e44d3f6a56de09969341f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Fri, 23 Feb 2018 12:07:30 +0100 Subject: [PATCH] Overhaul WS->ZMQ proxying --- lib/tfw/fsm_base.py | 19 ++++++------ lib/tfw/networking/server/tfw_server.py | 29 +++++++++++++++++-- .../server/zmq_websocket_handler.py | 28 +++++++----------- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/lib/tfw/fsm_base.py b/lib/tfw/fsm_base.py index fc1300f..04494e6 100644 --- a/lib/tfw/fsm_base.py +++ b/lib/tfw/fsm_base.py @@ -7,7 +7,7 @@ class FSMBase: states, transitions = [], [] def __init__(self, initial: str = None, accepted_states: List[str] = None): - self.message_handlers = [] + self.callbacks = [] self.accepted_states = accepted_states or [self.states[-1]] self.machine = Machine(model=self, states=self.states, @@ -15,18 +15,17 @@ class FSMBase: initial=initial or self.states[0], send_event=True, ignore_invalid_triggers=True, - after_state_change='forward_message') + after_state_change='execute_callbacks') - def forward_message(self, event_data): - message = event_data.kwargs.get('message') - for msghandler in self.message_handlers: - msghandler(message) + def execute_callbacks(self, event_data): + for callback in self.callbacks: + callback(event_data.kwargs) - def subscribe_message_handler(self, msghandler): - self.message_handlers.append(msghandler) + def subscribe(self, callback): + self.callbacks.append(callback) - def unsubscribe_message_handler(self, msghandler): - self.message_handlers.remove(msghandler) + def unsubscribe(self, callback): + self.callbacks.remove(callback) def is_solved(self): return self.state in self.accepted_states diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/networking/server/tfw_server.py index 0129686..f025e00 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/networking/server/tfw_server.py @@ -1,14 +1,20 @@ from tornado.web import Application from tfw.networking.server.controller_responder import ControllerResponder -from tfw.networking.server.zmq_websocket_handler import FSMManagingSocketHandler +from tfw.networking.server.zmq_websocket_handler import ZMQWebSocketProxy +from tfw.networking.event_handlers.server_connector import ServerUplinkConnector +from tfw.config.logs import logging +log = logging.getLogger(__name__) class TFWServer: def __init__(self, fsm_type): self._fsm = fsm_type() + self.uplink = ServerUplinkConnector() + self._fsm.subscribe(self.fsm_callback) self.application = Application( - [(r'/ws', FSMManagingSocketHandler, {'fsm': self.fsm})] + [(r'/ws', ZMQWebSocketProxy, {'make_response': self.make_response, + 'proxy_filter': self.proxy_filter})] ) self.controller_responder = ControllerResponder(self.fsm) @@ -16,5 +22,24 @@ class TFWServer: def fsm(self): return self._fsm + def make_response(self, message): + return message + + def proxy_filter(self, message): + try: return self.fsm.trigger(message['key'], message=message) + except AttributeError: return False + + def fsm_callback(self, kwargs_dict): + response, key = self.generate_fsm_update() # TODO: unify (key,response) order + self.uplink.send(key, response) + + def generate_fsm_update(self): + key = 'FSMUpdate' + response = {'key': key, + 'data': {'current_state': self.fsm.state, + 'valid_transitions': + [{'trigger': trigger} for trigger in self.fsm.machine.get_triggers()]}} + return response, key + def listen(self, port): self.application.listen(port) diff --git a/lib/tfw/networking/server/zmq_websocket_handler.py b/lib/tfw/networking/server/zmq_websocket_handler.py index cf79129..de2932e 100644 --- a/lib/tfw/networking/server/zmq_websocket_handler.py +++ b/lib/tfw/networking/server/zmq_websocket_handler.py @@ -30,7 +30,7 @@ class ZMQWebSocketHandler(WebSocketHandler): def on_message(self, message): log.debug('Received on WebSocket: {}'.format(message)) - self.send_message(*self.make_response(json.loads(message))) + self.send_message(self.make_response(message)) def make_response(self, message): raise NotImplementedError @@ -43,23 +43,15 @@ class ZMQWebSocketHandler(WebSocketHandler): return True -class FSMManagingSocketHandler(ZMQWebSocketHandler): - def initialize(self, fsm): - self.fsm = fsm - self.fsm.subscribe_message_handler(self.handle_fsm_message) +class ZMQWebSocketProxy(ZMQWebSocketHandler): + def initialize(self, make_response, proxy_filter): + self._make_response = make_response + self._proxy_filter = proxy_filter - def on_close(self): - super().on_close() - self.fsm.unsubscribe_message_handler(self.handle_fsm_message) - - def handle_fsm_message(self, message): - self._event_handler_connector.send_message(message) + def on_message(self, message): + message = json.loads(message) + if self._proxy_filter(message): + super().on_message(message) def make_response(self, message): - self.fsm.trigger(message['key'], message=message) - key = 'FSMUpdate' - response = {'key': key, - 'data': {'current_state': self.fsm.state, - 'valid_transitions': - [{'trigger': trigger} for trigger in self.fsm.machine.get_triggers()]}} - return response, key + return self._make_response(message)