import zmq from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream from .serialization import serialize_all from ..config import PUBLISHER_PORT, RECEIVER_PORT from ..config.logs import logging log = logging.getLogger(__name__) from ..util import parse_anchor_from_message, ZMQConnectorBase ioloop.install() class EventHandlerDownlinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(EventHandlerDownlinkConnector, self).__init__(zmq_context) self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) address = 'tcp://*:{}'.format(RECEIVER_PORT) self._zmq_pull_socket.bind(address) log.debug('Pull socket bound to {}'.format(address)) class EventHandlerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(EventHandlerUplinkConnector, self).__init__(zmq_context) self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) address = 'tcp://*:{}'.format(PUBLISHER_PORT) self._zmq_pub_socket.bind(address) log.debug('Pub socket bound to {}'.format(address)) class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): def __init__(self, zmq_context=None): super(EventHandlerConnector, self).__init__(zmq_context) def register_callback(self, callback): self._zmq_pull_stream.on_recv(callback) def send_message(self, message: str, anchor: str = None): if not anchor: anchor = parse_anchor_from_message(message) self._zmq_pub_socket.send_multipart(serialize_all(anchor, message))