import json from event_handler_base import EventHandlerBase class StatefulEventHandler(EventHandlerBase): def __init__(self, anchor, event_handler_function, zmq_context=None): super().__init__(anchor, zmq_context) self.event_handler_function = event_handler_function self.generator = None self.subscribe('reset') def event_handler_callback(msg_parts): anchor, message = msg_parts if anchor == b'reset': self.reset() return data_json = json.loads(message) if self.generator is None: self.generator = self.event_handler_function(data_json) try: response_anchor, response_data = next(self.generator) except StopIteration: self.reset() return self.subscribe(response_anchor) response_data = json.dumps({ 'anchor': response_anchor, 'data': response_data, }) response = [r.encode('utf-8') for r in (response_anchor, response_data)] self.zmq_push_socket.send_multipart(response) self.zmq_sub_stream.on_recv(event_handler_callback) def unsubscribe_all(self): super().unsubscribe_all() self.subscribe('reset') def reset(self): self.generator = None self.unsubscribe_all() self.subscribe(self.anchor)