From 40914e767f7f421734ad6af9d672f57d6c8f5906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Mon, 27 Nov 2017 18:36:02 +0100 Subject: [PATCH] Create StatefulComponent which yields values from a generator instead of returning a static value --- src/components/stateful_component.py | 39 ++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/components/stateful_component.py diff --git a/src/components/stateful_component.py b/src/components/stateful_component.py new file mode 100644 index 0000000..f9e142b --- /dev/null +++ b/src/components/stateful_component.py @@ -0,0 +1,39 @@ +import json +from functools import partial + +import zmq + +from component_base import ComponentBase + + +class StatefulComponent(ComponentBase): + def __init__(self, anchor, event_handler, zmq_context=None): + super().__init__(anchor, event_handler, zmq_context) + self.generator = None + self.subscribe('reset') + + def wrapper(msg_parts, handler): + anchor, message = msg_parts + if anchor == b'reset': + self.generator = None + self.unsubscribe_all() + self.subscribe(self.anchor) + self.subscribe('reset') + return + data_json = json.loads(message) + if self.generator is None: + self.generator = handler(data_json, self) + response_anchor, response_data = next(self.generator) + if response_anchor is None: + return + if response_anchor not in self.subscriptions: + self.subscriptions.add(response_anchor) + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, response_anchor) + response_data = json.dumps({ + 'anchor': response_anchor, + 'data': response_data, + }) + response = [r.encode('utf-8') for r in (response_anchor, response_data)] + self.zmq_push_socket.send_multipart(response) + + self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))