diff --git a/lib/tfw/event_handler_base.py b/lib/tfw/event_handler_base.py index dbff745..1ab3192 100644 --- a/lib/tfw/event_handler_base.py +++ b/lib/tfw/event_handler_base.py @@ -1,5 +1,3 @@ -import json - from .networking.serialization import deserialize_all from .networking.server_connector import ServerConnector @@ -15,10 +13,9 @@ class EventHandlerBase: def event_handler_callback(self, msg_parts): anchor, message = deserialize_all(*msg_parts) - data_json = json.loads(message) - response = self.handle_event(anchor, data_json) if anchor != 'reset' else self.handle_reset(data_json) + response = self.handle_event(anchor, message) if anchor != 'reset' else self.handle_reset(message) if response is None: return - self.server_connector.send(anchor, json.dumps(response)) + self.server_connector.send(anchor, response) def handle_event(self, anchor, data_json): raise NotImplementedError @@ -31,7 +28,7 @@ class EventHandlerBase: 'anchor': anchor, 'data': data } - self.server_connector.send(anchor, json.dumps(message)) + self.server_connector.send(anchor, message) def subscribe(self, anchor): if anchor not in self.subscriptions: diff --git a/lib/tfw/message_sender.py b/lib/tfw/message_sender.py index 6b44f65..d5cb528 100644 --- a/lib/tfw/message_sender.py +++ b/lib/tfw/message_sender.py @@ -1,4 +1,3 @@ -import json from datetime import datetime from .networking.server_connector import ServerUplinkConnector @@ -19,4 +18,4 @@ class MessageSender: 'anchor': self.anchor, 'data': data } - self.server_connector.send(self.anchor, json.dumps(response)) + self.server_connector.send(self.anchor, response) diff --git a/lib/tfw/networking/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py index 3099ff6..c324961 100644 --- a/lib/tfw/networking/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -6,7 +6,7 @@ from .serialization import serialize_all from ..config import PUBLISHER_PORT, RECEIVER_PORT from ..config.logs import logging log = logging.getLogger(__name__) -from ..util import parse_anchor_from_message, ZMQConnectorBase +from ..util import ZMQConnectorBase ioloop.install() @@ -38,7 +38,7 @@ class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkCon def register_callback(self, callback): self._zmq_pull_stream.on_recv(callback) - def send_message(self, message: str, anchor: str = None): + def send_message(self, message: dict, anchor: str = None): if not anchor: - anchor = parse_anchor_from_message(message) + anchor = message['anchor'] self._zmq_pub_socket.send_multipart(serialize_all(anchor, message)) diff --git a/lib/tfw/networking/serialization.py b/lib/tfw/networking/serialization.py index 33120e0..7333873 100644 --- a/lib/tfw/networking/serialization.py +++ b/lib/tfw/networking/serialization.py @@ -1,3 +1,6 @@ +import json + + def encode_if_needed(value): if isinstance(value, str): value = value.encode('utf-8') @@ -10,9 +13,9 @@ def decode_if_needed(value): return value -def serialize_all(*args): - return [encode_if_needed(a) for a in args] +def serialize_all(anchor, message): + return [encode_if_needed(a) for a in (anchor, json.dumps(message))] -def deserialize_all(*args): - return [decode_if_needed(a) for a in args] +def deserialize_all(anchor, message): + return decode_if_needed(anchor), json.loads(message) diff --git a/lib/tfw/networking/zmq_websocket_handler.py b/lib/tfw/networking/zmq_websocket_handler.py index 7f58ff8..0b64b0e 100644 --- a/lib/tfw/networking/zmq_websocket_handler.py +++ b/lib/tfw/networking/zmq_websocket_handler.py @@ -1,8 +1,8 @@ import json + from tornado.websocket import WebSocketHandler from .serialization import deserialize_all -from ..util import parse_anchor_from_message from .event_handler_connector import EventHandlerConnector from ..config.logs import logging log = logging.getLogger(__name__) @@ -27,12 +27,12 @@ class ZMQWebSocketHandler(WebSocketHandler): def on_message(self, message): log.debug('Received on WebSocket: {}'.format(message)) - self.send_message(*self.make_response(message)) + self.send_message(*self.make_response(json.loads(message))) def make_response(self, message): raise NotImplementedError - def send_message(self, message: str, anchor: str = None): + def send_message(self, message: dict, anchor: str = None): self._event_handler_connector.send_message(message, anchor) def on_close(self): @@ -53,10 +53,10 @@ class FSMManagingSocketHandler(ZMQWebSocketHandler): self._event_handler_connector.send_message(message) def make_response(self, message): - self.fsm.trigger(parse_anchor_from_message(message), message=message) + self.fsm.trigger(message['anchor'], message=message) anchor = 'FSMUpdate' response = {'anchor': anchor, 'data': {'current_state': self.fsm.state, 'valid_transitions': [{'trigger': trigger} for trigger in self.fsm.machine.get_triggers()]}} - return json.dumps(response), anchor + return response, anchor diff --git a/lib/tfw/util.py b/lib/tfw/util.py index 5aa685b..c2b1446 100644 --- a/lib/tfw/util.py +++ b/lib/tfw/util.py @@ -3,11 +3,6 @@ import json, xmlrpc.client, zmq from .config.envvars import SUPERVISOR_HTTP_URI -def parse_anchor_from_message(message): - message_json = json.loads(message) - return message_json['anchor'] - - def create_source_code_response_data(filename, content, language): return { 'filename': filename,