import json
from functools import partial

import zmq

from event_handler_base import EventHandlerBase


class StatefulEventHandler(EventHandlerBase):
    """Event handler that drives a generator-based handler across messages.

    The first non-reset message creates a generator by calling
    ``event_handler(data_json, self)``. That message and every following
    one advance the generator by a single step with ``next()``. Each step
    is expected to yield a ``(response_anchor, response_data)`` pair; a
    ``None`` anchor means "send nothing for this message".

    A message whose anchor frame is ``b'reset'`` discards the generator
    and restores the subscriptions to ``self.anchor`` and ``'reset'``, so
    the next message starts the handler from scratch.

    NOTE(review): ``subscribe``, ``unsubscribe_all``, ``anchor``,
    ``subscriptions``, ``zmq_sub_socket``, ``zmq_push_socket`` and
    ``zmq_sub_stream`` are not defined in this class; they are presumably
    provided by ``EventHandlerBase`` -- confirm against the base class.
    """

    def __init__(self, anchor, event_handler, zmq_context=None):
        """Set up the handler and register the receive callback.

        Args:
            anchor: Forwarded unchanged to ``EventHandlerBase.__init__``.
            event_handler: Callable invoked as
                ``event_handler(data_json, self)``; its return value is
                advanced with ``next()`` and must therefore be an iterator
                (typically a generator) yielding
                ``(response_anchor, response_data)`` pairs.
            zmq_context: Forwarded unchanged to
                ``EventHandlerBase.__init__``.
        """
        super().__init__(anchor, event_handler, zmq_context)
        self.generator = None
        self.subscribe('reset')

        def wrapper(msg_parts, handler):
            # Messages are expected to be two-frame multiparts:
            # (anchor, JSON payload).
            anchor, message = msg_parts

            if anchor == b'reset':
                # Drop the in-progress generator and fall back to the
                # initial subscription set.
                self.generator = None
                self.unsubscribe_all()
                self.subscribe(self.anchor)
                self.subscribe('reset')
                return

            data_json = json.loads(message)
            if self.generator is None:
                # Only the message that creates the generator has its
                # payload handed to the handler; later messages merely
                # advance it.
                self.generator = handler(data_json, self)

            try:
                response_anchor, response_data = next(self.generator)
            except StopIteration:
                # The handler ran to completion. Finishing is not an
                # error, so do not let StopIteration escape into the
                # stream callback on every subsequent message. The
                # exhausted generator is kept on purpose: the handler
                # stays idle until a 'reset' message arrives.
                return

            if response_anchor is None:
                return

            if response_anchor not in self.subscriptions:
                self.subscriptions.add(response_anchor)
                self.zmq_sub_socket.setsockopt_string(
                    zmq.SUBSCRIBE, response_anchor
                )

            response_data = json.dumps({
                'anchor': response_anchor,
                'data': response_data,
            })
            response = [
                r.encode('utf-8') for r in (response_anchor, response_data)
            ]
            self.zmq_push_socket.send_multipart(response)

        self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))