Close ZMQ sockets gracefully

This commit is contained in:
R. Richard 2019-05-20 11:06:57 +02:00
parent 2a3d29f080
commit 7d50ee607a
2 changed files with 15 additions and 1 deletions

View File

@ -106,6 +106,10 @@ class EventHandlerBase(ABC):
self.server_connector.unsubscribe(key) self.server_connector.unsubscribe(key)
self.keys.remove(key) self.keys.remove(key)
def stop(self):
self.server_connector.close()
self.cleanup()
def cleanup(self): def cleanup(self):
""" """
Perform cleanup actions such as releasing database Perform cleanup actions such as releasing database

View File

@ -19,6 +19,7 @@ class ServerDownlinkConnector(ZMQConnectorBase):
super(ServerDownlinkConnector, self).__init__(zmq_context) super(ServerDownlinkConnector, self).__init__(zmq_context)
self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB)
self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}')
self._zmq_sub_socket.setsockopt(zmq.RCVHWM,0)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE)
@ -28,6 +29,9 @@ class ServerDownlinkConnector(ZMQConnectorBase):
callback = with_deserialize_tfw_msg(callback) callback = with_deserialize_tfw_msg(callback)
self._zmq_sub_stream.on_recv(callback) self._zmq_sub_stream.on_recv(callback)
def close(self):
self._zmq_sub_stream.close()
class ServerUplinkConnector(ZMQConnectorBase): class ServerUplinkConnector(ZMQConnectorBase):
""" """
@ -37,6 +41,7 @@ class ServerUplinkConnector(ZMQConnectorBase):
super(ServerUplinkConnector, self).__init__(zmq_context) super(ServerUplinkConnector, self).__init__(zmq_context)
self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH)
self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}')
self._zmq_push_socket.setsockopt(zmq.SNDHWM,0)
def send_to_eventhandler(self, message): def send_to_eventhandler(self, message):
""" """
@ -74,6 +79,11 @@ class ServerUplinkConnector(ZMQConnectorBase):
'data': message 'data': message
}) })
def close(self):
self._zmq_push_socket.close()
class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector):
pass def close(self):
ServerUplinkConnector.close(self)
ServerDownlinkConnector.close(self)