From 7d50ee607afb00662993cd26f5ed5c63ccfa4705 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Mon, 20 May 2019 11:06:57 +0200 Subject: [PATCH] Close ZMQ sockets gracefully --- lib/tfw/event_handler_base/event_handler_base.py | 4 ++++ .../networking/event_handlers/server_connector.py | 12 +++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/tfw/event_handler_base/event_handler_base.py b/lib/tfw/event_handler_base/event_handler_base.py index c013a8d..18541ee 100644 --- a/lib/tfw/event_handler_base/event_handler_base.py +++ b/lib/tfw/event_handler_base/event_handler_base.py @@ -106,6 +106,10 @@ class EventHandlerBase(ABC): self.server_connector.unsubscribe(key) self.keys.remove(key) + def stop(self): + self.server_connector.close() + self.cleanup() + def cleanup(self): """ Perform cleanup actions such as releasing database diff --git a/lib/tfw/networking/event_handlers/server_connector.py b/lib/tfw/networking/event_handlers/server_connector.py index 7d24803..4c2a2b9 100644 --- a/lib/tfw/networking/event_handlers/server_connector.py +++ b/lib/tfw/networking/event_handlers/server_connector.py @@ -19,6 +19,7 @@ class ServerDownlinkConnector(ZMQConnectorBase): super(ServerDownlinkConnector, self).__init__(zmq_context) self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') + self._zmq_sub_socket.setsockopt(zmq.RCVHWM,0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) @@ -28,6 +29,9 @@ class ServerDownlinkConnector(ZMQConnectorBase): callback = with_deserialize_tfw_msg(callback) self._zmq_sub_stream.on_recv(callback) + def close(self): + self._zmq_sub_stream.close() + class ServerUplinkConnector(ZMQConnectorBase): """ @@ -37,6 +41,7 @@ class ServerUplinkConnector(ZMQConnectorBase): super(ServerUplinkConnector, self).__init__(zmq_context) self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') + self._zmq_push_socket.setsockopt(zmq.SNDHWM,0) def send_to_eventhandler(self, message): """ @@ -74,6 +79,11 @@ class ServerUplinkConnector(ZMQConnectorBase): 'data': message }) + def close(self): + self._zmq_push_socket.close() + class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): - pass + def close(self): + ServerUplinkConnector.close(self) + ServerDownlinkConnector.close(self)