import logging import zmq from zmq.eventloop.zmqstream import ZMQStream from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg LOG = logging.getLogger(__name__) class ZMQDownlinkListener: def __init__(self, bind_addr): self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) self._zmq_pull_socket.bind(bind_addr) LOG.debug('Pull socket bound to %s', bind_addr) def register_callback(self, callback): callback = with_deserialize_tfw_msg(callback) self._zmq_pull_stream.on_recv(callback) def close(self): self._zmq_pull_stream.close() class ZMQUplinkListener: def __init__(self, bind_addr): self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_pub_socket.bind(bind_addr) LOG.debug('Pub socket bound to %s', bind_addr) def send_message(self, message: dict): self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) def close(self): self._zmq_pub_socket.close() class ZMQListener(ZMQDownlinkListener, ZMQUplinkListener): def __init__(self, downlink_bind_addr, uplink_bind_addr): ZMQDownlinkListener.__init__(self, downlink_bind_addr) ZMQUplinkListener.__init__(self, uplink_bind_addr) def close(self): ZMQDownlinkListener.close(self) ZMQUplinkListener.close(self)