# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

import zmq
from zmq.eventloop.zmqstream import ZMQStream

from tfw.networking import ZMQConnectorBase, serialize_all
from tfw.config import TFWENV
from tfw.config.logs import logging

LOG = logging.getLogger(__name__)


class EventHandlerDownlinkConnector(ZMQConnectorBase):
    """Receiving half of the connector: a bound ZMQ PULL socket.

    The socket is wrapped in a ``ZMQStream`` and bound on every interface
    to the TCP port given by ``TFWENV.RECEIVER_PORT``.
    """

    def __init__(self, zmq_context=None):
        """Create the PULL socket and its stream, then bind the socket.

        :param zmq_context: forwarded unchanged to the parent initializer;
            this class then reads ``self._zmq_context`` (presumably set
            there -- verify against ``ZMQConnectorBase``).
        """
        super().__init__(zmq_context)
        pull_socket = self._zmq_context.socket(zmq.PULL)
        self._zmq_pull_socket = pull_socket
        self._zmq_pull_stream = ZMQStream(pull_socket)
        endpoint = 'tcp://*:{}'.format(TFWENV.RECEIVER_PORT)
        pull_socket.bind(endpoint)
        LOG.debug('Pull socket bound to %s', endpoint)


class EventHandlerUplinkConnector(ZMQConnectorBase):
    """Sending half of the connector: a bound ZMQ PUB socket.

    The socket is bound on every interface to the TCP port given by
    ``TFWENV.PUBLISHER_PORT``.
    """

    def __init__(self, zmq_context=None):
        """Create the PUB socket and bind it.

        :param zmq_context: forwarded unchanged to the parent initializer;
            this class then reads ``self._zmq_context`` (presumably set
            there -- verify against ``ZMQConnectorBase``).
        """
        super().__init__(zmq_context)
        pub_socket = self._zmq_context.socket(zmq.PUB)
        self._zmq_pub_socket = pub_socket
        endpoint = 'tcp://*:{}'.format(TFWENV.PUBLISHER_PORT)
        pub_socket.bind(endpoint)
        LOG.debug('Pub socket bound to %s', endpoint)


class EventHandlerConnector(EventHandlerDownlinkConnector,
                            EventHandlerUplinkConnector):
    """Bidirectional connector combining the PULL and the PUB socket.

    Neither parent is initialised explicitly here: each parent's
    ``__init__`` calls ``super().__init__``, so following the MRO runs both
    of them and creates both sockets.
    """

    def register_callback(self, callback):
        """Register ``callback`` via ``on_recv`` on the pull stream.

        :param callback: callable handed to ``ZMQStream.on_recv``.
        """
        self._zmq_pull_stream.on_recv(callback)

    def send_message(self, message: dict, key: str = None):
        """Send ``message`` as a multipart message on the PUB socket.

        :param message: the message to send; also consulted for a ``'key'``
            entry when no usable ``key`` argument is given.
        :param key: key passed to ``serialize_all`` together with the
            message. Any falsy value (``None``, ``''``) is replaced by
            ``message['key']``, or by ``''`` if that entry is absent.
        """
        # A falsy key means "not given": fall back to the message's own key.
        effective_key = key or message.get('key', '')
        frames = serialize_all(effective_key, message)
        self._zmq_pub_socket.send_multipart(frames)