Create StatefulComponent which yields values from a generator instead of returning a static value

This commit is contained in:
Bálint Bokros 2017-11-27 18:36:02 +01:00
parent 726fc165b0
commit 40914e767f

View File

@ -0,0 +1,39 @@
import json
from functools import partial
import zmq
from component_base import ComponentBase
class StatefulComponent(ComponentBase):
def __init__(self, anchor, event_handler, zmq_context=None):
super().__init__(anchor, event_handler, zmq_context)
self.generator = None
self.subscribe('reset')
def wrapper(msg_parts, handler):
anchor, message = msg_parts
if anchor == b'reset':
self.generator = None
self.unsubscribe_all()
self.subscribe(self.anchor)
self.subscribe('reset')
return
data_json = json.loads(message)
if self.generator is None:
self.generator = handler(data_json, self)
response_anchor, response_data = next(self.generator)
if response_anchor is None:
return
if response_anchor not in self.subscriptions:
self.subscriptions.add(response_anchor)
self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, response_anchor)
response_data = json.dumps({
'anchor': response_anchor,
'data': response_data,
})
response = [r.encode('utf-8') for r in (response_anchor, response_data)]
self.zmq_push_socket.send_multipart(response)
self.zmq_sub_stream.on_recv(partial(wrapper, handler=event_handler))