""" TFW JSON message format message: { "key": string, # addressing "data": {...}, # payload "trigger": string # FSM trigger } ZeroMQ's sub-pub sockets use enveloped messages (http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes) and TFW also uses them internally. This means that on ZMQ sockets we always send the messages key separately and then the actual message (which contains the key as well) like so: socket.send_multipart([message['key'], message]) The purpose of this module is abstracting away this low level behaviour. """ import json from functools import wraps def serialize_tfw_msg(message): """ Create TFW multipart data from message dict """ return _serialize_all(message['key'], message) def with_deserialize_tfw_msg(fun): @wraps(fun) def wrapper(message_parts): message = deserialize_tfw_msg(*message_parts) return fun(message) return wrapper def deserialize_tfw_msg(*args): """ Return message from TFW multipart data """ envelope = _deserialize_all(*args) return _repair_if_needed(envelope) def _serialize_all(*args): return tuple( _serialize_single(arg) for arg in args ) def _deserialize_all(*args): return tuple( _deserialize_single(arg) for arg in args ) def _serialize_single(data): """ Return input as bytes (serialize input if it is JSON) """ if not isinstance(data, str): data = message_bytes(data) return _encode_if_needed(data) def message_bytes(message): return json.dumps(message, sort_keys=True).encode() def _deserialize_single(data): """ Try parsing input as JSON, return it as string if parsing fails. """ try: return json.loads(data) except ValueError: return _decode_if_needed(data) def _repair_if_needed(envelope): """ Quick fix for broken messages received from separate processes. """ if len(envelope) == 2: return envelope[1] for part in envelope: if isinstance(part, dict): return part return {} def _encode_if_needed(value): """ Return input as bytes (encode if input is string) """ if isinstance(value, str): value = value.encode('utf-8') return value def _decode_if_needed(value): """ Return input as string (decode if input is bytes) """ if isinstance(value, (bytes, bytearray)): value = value.decode('utf-8') return value