import logging import zmq from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream from config import PUBLISHER_PORT, RECEIVER_PORT from util import parse_anchor_from_message ioloop.install() _zmq_context = zmq.Context.instance() _zmq_pull_socket = _zmq_context.socket(zmq.PULL) _zmq_pull_stream = ZMQStream(_zmq_pull_socket) _zmq_pub_socket = _zmq_context.socket(zmq.PUB) pub_socket_address = 'tcp://*:{}'.format(PUBLISHER_PORT) _zmq_pub_socket.bind(pub_socket_address) logging.debug('Pub socket bound to {}'.format(pub_socket_address)) pull_socket_address = 'tcp://*:{}'.format(RECEIVER_PORT) _zmq_pull_socket.bind(pull_socket_address) logging.debug('Pull socket bound to {}'.format(pull_socket_address)) def register_callback(callback): _zmq_pull_stream.on_recv(callback) def send_message(message: str, anchor: str = None): if not anchor: anchor = parse_anchor_from_message(message) encoded_message = [part.encode('utf-8') for part in (anchor, message)] _zmq_pub_socket.send_multipart(encoded_message)