From 726fc165b0c6f8b41e1ea13ccdbc3a2c7432e7ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Mon, 27 Nov 2017 18:34:26 +0100 Subject: [PATCH] Add subscription management to ComponentBase --- src/components/component_base.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/components/component_base.py b/src/components/component_base.py index 16626ae..3139e95 100644 --- a/src/components/component_base.py +++ b/src/components/component_base.py @@ -14,7 +14,8 @@ class ComponentBase: self.event_handler = event_handler self.zmq_context = zmq_context or zmq.Context.instance() self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB) - self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, anchor) + self.subscriptions = {self.anchor} + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, self.anchor) self.zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) self.zmq_sub_stream = ZMQStream(self.zmq_sub_socket) self.zmq_push_socket = self.zmq_context.socket(zmq.PUSH) @@ -29,3 +30,19 @@ class ComponentBase: encoded_message = json.dumps(message).encode('utf-8') self.zmq_push_socket.send_multipart([encoded_anchor, encoded_message]) + def subscribe(self, anchor): + if anchor not in self.subscriptions: + self.subscriptions.add(anchor) + self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, anchor) + + def unsubscribe(self, anchor): + try: + self.subscriptions.remove(anchor) + self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, anchor) + except KeyError: + pass + + def unsubscribe_all(self): + for sub in self.subscriptions: + self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, sub) + self.subscriptions.clear()