mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2025-01-22 22:01:55 +00:00
Refactor handling of shared EventHandlerConnector ownership to nicen imports
This commit is contained in:
parent
1f2e1f0489
commit
64bd2f1ba0
@ -5,3 +5,5 @@ from .serialization import decode_if_needed, encode_if_needed, serialize_all, de
|
||||
from .zmq_connector_base import ZMQConnectorBase
|
||||
from .controller_connector import ControllerConnector
|
||||
from .message_sender import MessageSender
|
||||
from .event_handlers.server_connector import ServerUplinkConnector
|
||||
from .server.tfw_server import TFWServer
|
||||
|
@ -7,6 +7,7 @@ from tornado.web import Application
|
||||
|
||||
from tfw.networking import MessageSender
|
||||
from tfw.networking.event_handlers import ServerUplinkConnector
|
||||
from tfw.networking.server import EventHandlerConnector
|
||||
from tfw.config.logs import logging
|
||||
from .zmq_websocket_handler import ZMQWebSocketProxy
|
||||
|
||||
@ -19,10 +20,12 @@ class TFWServer:
|
||||
self._fsm_updater = FSMUpdater(self._fsm)
|
||||
self._fsm_manager = FSMManager(self._fsm)
|
||||
self._fsm.subscribe_callback(self._fsm_updater.update)
|
||||
self._event_handler_connector = EventHandlerConnector()
|
||||
|
||||
self.application = Application(
|
||||
[(r'/ws', ZMQWebSocketProxy, {'make_response': self.make_response,
|
||||
'proxy_filter': self.proxy_filter})]
|
||||
'proxy_filter': self.proxy_filter,
|
||||
'event_handler_connector': self._event_handler_connector})]
|
||||
)
|
||||
#self.controller_responder = ControllerResponder(self.fsm) TODO: add this once controller stuff is resolved
|
||||
|
||||
|
@ -8,15 +8,16 @@ from tornado.websocket import WebSocketHandler
|
||||
|
||||
from tfw.networking import deserialize_all
|
||||
from tfw.config.logs import logging
|
||||
from .event_handler_connector import EventHandlerConnector
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ZMQWebSocketHandler(WebSocketHandler, ABC):
|
||||
_event_handler_connector = EventHandlerConnector()
|
||||
instances = set()
|
||||
|
||||
def initialize(self, **kwargs):
|
||||
self._event_handler_connector = kwargs['event_handler_connector']
|
||||
|
||||
def prepare(self):
|
||||
ZMQWebSocketHandler.instances.add(self)
|
||||
|
||||
@ -27,14 +28,13 @@ class ZMQWebSocketHandler(WebSocketHandler, ABC):
|
||||
LOG.debug('WebSocket connection initiated')
|
||||
self._event_handler_connector.register_callback(self.zmq_callback)
|
||||
|
||||
@classmethod
|
||||
def zmq_callback(cls, msg_parts):
|
||||
keyhandlers = {'mirror': cls.mirror}
|
||||
def zmq_callback(self, msg_parts):
|
||||
keyhandlers = {'mirror': self.mirror}
|
||||
|
||||
key, data = deserialize_all(*msg_parts)
|
||||
LOG.debug('Received on pull socket: %s', data)
|
||||
if key not in keyhandlers:
|
||||
for instance in cls.instances:
|
||||
for instance in ZMQWebSocketHandler.instances:
|
||||
instance.write_message(data)
|
||||
else:
|
||||
try:
|
||||
@ -42,10 +42,9 @@ class ZMQWebSocketHandler(WebSocketHandler, ABC):
|
||||
except KeyError:
|
||||
LOG.error('Invalid mirror message format! Ignoring.')
|
||||
|
||||
@classmethod
|
||||
def mirror(cls, data):
|
||||
def mirror(self, data):
|
||||
key = data['key']
|
||||
cls._event_handler_connector.send_message({'data': data}, key)
|
||||
self._event_handler_connector.send_message({'data': data}, key)
|
||||
|
||||
def on_message(self, message):
|
||||
LOG.debug('Received on WebSocket: %s', message)
|
||||
@ -65,10 +64,11 @@ class ZMQWebSocketHandler(WebSocketHandler, ABC):
|
||||
|
||||
class ZMQWebSocketProxy(ZMQWebSocketHandler):
|
||||
# pylint: disable=abstract-method
|
||||
def initialize(self, make_response, proxy_filter):
|
||||
def initialize(self, **kwargs):
|
||||
# pylint: disable=arguments-differ
|
||||
self._make_response = make_response
|
||||
self._proxy_filter = proxy_filter
|
||||
super(ZMQWebSocketProxy, self).initialize(**kwargs)
|
||||
self._make_response = kwargs['make_response']
|
||||
self._proxy_filter = kwargs['proxy_filter']
|
||||
|
||||
def on_message(self, message):
|
||||
message = json.loads(message)
|
||||
|
Loading…
Reference in New Issue
Block a user