diff --git a/component.py b/component.py new file mode 100644 index 0000000..a4e2a05 --- /dev/null +++ b/component.py @@ -0,0 +1,31 @@ +import json +from functools import partial + +import zmq +from zmq.eventloop.zmqstream import ZMQStream +from zmq.eventloop import ioloop + +from config import RECEIVER_PORT, PUBLISHER_PORT + +ioloop.install() + + +class Component: + def __init__(self, anchor, event_handler, zmq_context=None): + self.anchor = anchor + self.event_handler = event_handler + self.zmq_context = zmq_context or zmq.Context.instance() + self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB) + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, anchor) + self.zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) + self.zmq_sub_stream = ZMQStream(self.zmq_sub_socket) + self.zmq_push_socket = self.zmq_context.socket(zmq.PUSH) + self.zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) + + def wrapper(msg_parts, handler): + anchor, message = msg_parts + data_json = json.loads(message) + response = json.dumps(handler(data_json)).encode('utf-8') + self.zmq_push_socket.send_multipart([anchor, response]) + + self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))