From d214caff3d0a0c39cef0d659f0e44eed4776b117 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Fri, 17 Nov 2017 15:55:48 +0100 Subject: [PATCH] Add Component class describing an event handler --- component.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 component.py diff --git a/component.py b/component.py new file mode 100644 index 0000000..a4e2a05 --- /dev/null +++ b/component.py @@ -0,0 +1,31 @@ +import json +from functools import partial + +import zmq +from zmq.eventloop.zmqstream import ZMQStream +from zmq.eventloop import ioloop + +from config import RECEIVER_PORT, PUBLISHER_PORT + +ioloop.install() + + +class Component: + def __init__(self, anchor, event_handler, zmq_context=None): + self.anchor = anchor + self.event_handler = event_handler + self.zmq_context = zmq_context or zmq.Context.instance() + self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB) + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, anchor) + self.zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) + self.zmq_sub_stream = ZMQStream(self.zmq_sub_socket) + self.zmq_push_socket = self.zmq_context.socket(zmq.PUSH) + self.zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) + + def wrapper(msg_parts, handler): + anchor, message = msg_parts + data_json = json.loads(message) + response = json.dumps(handler(data_json)).encode('utf-8') + self.zmq_push_socket.send_multipart([anchor, response]) + + self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))