diff --git a/src/components/stateful_component.py b/src/components/stateful_component.py new file mode 100644 index 0000000..f9e142b --- /dev/null +++ b/src/components/stateful_component.py @@ -0,0 +1,39 @@ +import json +from functools import partial + +import zmq + +from component_base import ComponentBase + + +class StatefulComponent(ComponentBase): + def __init__(self, anchor, event_handler, zmq_context=None): + super().__init__(anchor, event_handler, zmq_context) + self.generator = None + self.subscribe('reset') + + def wrapper(msg_parts, handler): + anchor, message = msg_parts + if anchor == b'reset': + self.generator = None + self.unsubscribe_all() + self.subscribe(self.anchor) + self.subscribe('reset') + return + data_json = json.loads(message) + if self.generator is None: + self.generator = handler(data_json, self) + response_anchor, response_data = next(self.generator) + if response_anchor is None: + return + if response_anchor not in self.subscriptions: + self.subscriptions.add(response_anchor) + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, response_anchor) + response_data = json.dumps({ + 'anchor': response_anchor, + 'data': response_data, + }) + response = [r.encode('utf-8') for r in (response_anchor, response_data)] + self.zmq_push_socket.send_multipart(response) + + self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))