mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-23 00:11:33 +00:00
Refactor 'mirror' message enveloping to use new serialization
This commit is contained in:
parent
aabaadc4c4
commit
9eb6717972
@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
from tfw.networking import deserialize_all
|
from tfw.networking import deserialize_tfw_msg
|
||||||
from tfw.networking.event_handlers import ServerConnector
|
from tfw.networking.event_handlers import ServerConnector
|
||||||
|
|
||||||
|
|
||||||
@ -17,7 +17,8 @@ class EventHandlerBase(ABC):
|
|||||||
self.server_connector.register_callback(self.event_handler_callback)
|
self.server_connector.register_callback(self.event_handler_callback)
|
||||||
|
|
||||||
def event_handler_callback(self, msg_parts):
|
def event_handler_callback(self, msg_parts):
|
||||||
key, message = deserialize_all(*msg_parts)
|
message = deserialize_tfw_msg(*msg_parts)
|
||||||
|
key = message['key']
|
||||||
response = self.dispatch_handling(key, message)
|
response = self.dispatch_handling(key, message)
|
||||||
response['key'] = key
|
response['key'] = key
|
||||||
if response is None:
|
if response is None:
|
||||||
|
@ -30,8 +30,9 @@ class ServerUplinkConnector(ZMQConnectorBase):
|
|||||||
self._zmq_push_socket.connect('tcp://localhost:{}'.format(TFWENV.RECEIVER_PORT))
|
self._zmq_push_socket.connect('tcp://localhost:{}'.format(TFWENV.RECEIVER_PORT))
|
||||||
|
|
||||||
def send_to_eventhandler(self, message):
|
def send_to_eventhandler(self, message):
|
||||||
message['data']['key'] = message['key']
|
nested_message = {'key': message['key'], 'data': message.pop('data')}
|
||||||
message['key'] = 'mirror'
|
message['key'] = 'mirror'
|
||||||
|
message['data'] = nested_message
|
||||||
self.send(message)
|
self.send(message)
|
||||||
|
|
||||||
def send(self, message):
|
def send(self, message):
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
import zmq
|
import zmq
|
||||||
from zmq.eventloop.zmqstream import ZMQStream
|
from zmq.eventloop.zmqstream import ZMQStream
|
||||||
|
|
||||||
from tfw.networking import ZMQConnectorBase, serialize_all
|
from tfw.networking import ZMQConnectorBase, serialize_tfw_msg
|
||||||
from tfw.config import TFWENV
|
from tfw.config import TFWENV
|
||||||
from tfw.config.logs import logging
|
from tfw.config.logs import logging
|
||||||
|
|
||||||
@ -35,6 +35,4 @@ class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkCon
|
|||||||
self._zmq_pull_stream.on_recv(callback)
|
self._zmq_pull_stream.on_recv(callback)
|
||||||
|
|
||||||
def send_message(self, message: dict, key: str = None):
|
def send_message(self, message: dict, key: str = None):
|
||||||
if not key:
|
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))
|
||||||
key = message.get('key', '')
|
|
||||||
self._zmq_pub_socket.send_multipart(serialize_all(key, message))
|
|
||||||
|
@ -40,13 +40,13 @@ class ZMQWebSocketHandler(WebSocketHandler, ABC):
|
|||||||
instance.write_message(message)
|
instance.write_message(message)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
keyhandlers[key](message['data'])
|
keyhandlers[key](message)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
LOG.error('Invalid mirror message format! Ignoring.')
|
LOG.error('Invalid mirror message format! Ignoring.')
|
||||||
|
|
||||||
def mirror(self, data):
|
def mirror(self, message):
|
||||||
key = data['key']
|
message = message['data']
|
||||||
self._event_handler_connector.send_message({'data': data}, key)
|
self._event_handler_connector.send_message(message, message['key'])
|
||||||
|
|
||||||
def on_message(self, message):
|
def on_message(self, message):
|
||||||
LOG.debug('Received on WebSocket: %s', message)
|
LOG.debug('Received on WebSocket: %s', message)
|
||||||
|
Loading…
Reference in New Issue
Block a user