# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. from functools import partial import zmq from zmq.eventloop.zmqstream import ZMQStream from tfw.config import TFWENV from tfw.config.logs import logging from .scope import Scope from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg from .zmq_connector_base import ZMQConnectorBase LOG = logging.getLogger(__name__) class ServerDownlinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(ServerDownlinkConnector, self).__init__(zmq_context) self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) def register_callback(self, callback): callback = with_deserialize_tfw_msg(callback) self._zmq_sub_stream.on_recv(callback) def close(self): self._zmq_sub_stream.close() class ServerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(ServerUplinkConnector, self).__init__(zmq_context) self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) def send_message(self, message, scope=Scope.ZMQ): message['scope'] = scope.value self._zmq_push_socket.send_multipart(serialize_tfw_msg(message)) def close(self): self._zmq_push_socket.close() class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): def close(self): ServerUplinkConnector.close(self) ServerDownlinkConnector.close(self)