baseimage-tutorial-framework/src/event_handlers/stateful_event_handler.py

45 lines
1.5 KiB
Python
Raw Normal View History

import json
from functools import partial
import zmq
from event_handler_base import EventHandlerBase
class StatefulEventHandler(EventHandlerBase):
def __init__(self, anchor, event_handler_function, zmq_context=None):
super().__init__(anchor, zmq_context)
self.event_handler_function = event_handler_function
self.generator = None
self.subscribe('reset')
def event_handler_callback(msg_parts):
anchor, message = msg_parts
if anchor == b'reset':
2017-12-05 17:35:22 +00:00
self.reset()
return
data_json = json.loads(message)
if self.generator is None:
self.generator = self.event_handler_function(data_json, self)
response_anchor, response_data = next(self.generator)
if response_anchor not in self.subscriptions:
self.subscriptions.add(response_anchor)
self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, response_anchor)
response_data = json.dumps({
'anchor': response_anchor,
'data': response_data,
})
response = [r.encode('utf-8') for r in (response_anchor, response_data)]
self.zmq_push_socket.send_multipart(response)
self.zmq_sub_stream.on_recv(event_handler_callback)
2017-12-05 17:35:22 +00:00
def unsubscribe_all(self):
super().unsubscribe_all()
self.subscribe('reset')
def reset(self):
self.generator = None
self.unsubscribe_all()
self.subscribe(self.anchor)