# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

from functools import partial

import zmq
from zmq.eventloop.zmqstream import ZMQStream

from tfw.networking.zmq_connector_base import ZMQConnectorBase
from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging

LOG = logging.getLogger(__name__)


class ServerDownlinkConnector(ZMQConnectorBase):
    """
    Receiving side of the connection: a ZMQ SUB socket connected to
    the local port given by TFWENV.PUBLISHER_PORT, wrapped in a ZMQStream.

    Instances expose ``subscribe(topic)`` and ``unsubscribe(topic)``,
    which set the zmq.SUBSCRIBE / zmq.UNSUBSCRIBE string options
    on the underlying SUB socket.
    """
    def __init__(self, zmq_context=None):
        """
        :param zmq_context: forwarded unchanged to ZMQConnectorBase
        """
        super().__init__(zmq_context)
        self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB)
        publisher_endpoint = f'tcp://localhost:{TFWENV.PUBLISHER_PORT}'
        self._zmq_sub_socket.connect(publisher_endpoint)
        self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)

        # Both helpers are the same socket-option setter with the
        # option constant pre-bound; callers only supply the topic string.
        set_string_option = self._zmq_sub_socket.setsockopt_string
        self.subscribe = partial(set_string_option, zmq.SUBSCRIBE)
        self.unsubscribe = partial(set_string_option, zmq.UNSUBSCRIBE)

    def register_callback(self, callback):
        """
        Install a receive handler on the SUB stream.

        The callback is wrapped with with_deserialize_tfw_msg before
        being registered (presumably so it is handed deserialized
        TFW messages rather than raw frames -- see tfw.networking.serialization).

        :param callback: callable to invoke for incoming messages
        """
        self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(callback))


class ServerUplinkConnector(ZMQConnectorBase):
    """
    Class capable of sending messages to the TFW server and event handlers.

    Uses a ZMQ PUSH socket connected to the local port given
    by TFWENV.RECEIVER_PORT.
    """
    def __init__(self, zmq_context=None):
        """
        :param zmq_context: forwarded unchanged to ZMQConnectorBase
        """
        super().__init__(zmq_context)
        self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH)
        receiver_endpoint = f'tcp://localhost:{TFWENV.RECEIVER_PORT}'
        self._zmq_push_socket.connect(receiver_endpoint)

    def send_to_eventhandler(self, message):
        """
        Send a message to an event handler through the TFW server.

        The message is placed in the 'data' field of an envelope whose
        'key' is 'mirror'; TFWServer mirrors such messages to event handlers.

        :param message: JSON message you want to send
        """
        mirror_envelope = {
            'key': 'mirror',
            'data': message
        }
        self.send(mirror_envelope)

    def send(self, message):
        """
        Send a message to the frontend through the TFW server.

        The message is serialized with serialize_tfw_msg and pushed
        as a multipart ZMQ message.

        :param message: JSON message you want to send
        """
        frames = serialize_tfw_msg(message)
        self._zmq_push_socket.send_multipart(frames)

    def broadcast(self, message):
        """
        Broadcast a message through the TFW server.

        The message is placed in the 'data' field of an envelope whose
        'key' is 'broadcast'; TFWServer broadcasts such messages.

        :param message: JSON message you want to send
        """
        broadcast_envelope = {
            'key': 'broadcast',
            'data': message
        }
        self.send(broadcast_envelope)


class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector):
    """
    Combined connector offering both the uplink (PUSH) and
    downlink (SUB) interfaces; adds nothing beyond its two bases.
    """