From 9ad2a53a51a89001db9c971b94b18983592f8f0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Wed, 24 Jan 2018 18:19:49 +0100 Subject: [PATCH] Move ZMQ connection logic to a separate class --- src/event_handlers/event_handler_base.py | 32 ++++++------------- src/event_handlers/event_handler_main.py | 12 ++++--- src/event_handlers/messaging.py | 25 +++++++++++++++ .../source_code_event_handler.py | 4 +-- src/event_handlers/stateful_event_handler.py | 8 ++--- src/event_handlers/terminado_event_handler.py | 4 +-- 6 files changed, 49 insertions(+), 36 deletions(-) create mode 100644 src/event_handlers/messaging.py diff --git a/src/event_handlers/event_handler_base.py b/src/event_handlers/event_handler_base.py index 22af417..ac7d277 100644 --- a/src/event_handlers/event_handler_base.py +++ b/src/event_handlers/event_handler_base.py @@ -1,25 +1,13 @@ import json -import zmq -from zmq.eventloop import ioloop -from zmq.eventloop.zmqstream import ZMQStream - -from config import PUBLISHER_PORT, RECEIVER_PORT - -ioloop.install() class EventHandlerBase: - def __init__(self, anchor, zmq_context=None): + def __init__(self, messaging, anchor): + self.messaging = messaging self.anchor = anchor - self.zmq_context = zmq_context or zmq.Context.instance() - self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB) + self.messaging.subscribe(self.anchor) self.subscriptions = {self.anchor} - self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, self.anchor) - self.zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) - self.zmq_sub_stream = ZMQStream(self.zmq_sub_socket) - self.zmq_push_socket = self.zmq_context.socket(zmq.PUSH) - self.zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) - self.zmq_sub_stream.on_recv(self.event_handler_callback) + self.messaging.register_callback(self.event_handler_callback) def event_handler_callback(self, msg_parts): anchor, message = msg_parts @@ -27,7 +15,7 @@ class EventHandlerBase: response = self.handle_event(anchor, data_json) if anchor != b'reset' else self.handle_reset(data_json) if response is None: return encoded_response = json.dumps(response).encode('utf-8') - self.zmq_push_socket.send_multipart([anchor, encoded_response]) + self.messaging.send(anchor, encoded_response) def handle_event(self, anchor, data_json): raise NotImplementedError @@ -42,23 +30,21 @@ class EventHandlerBase: 'data': data } encoded_message = json.dumps(message).encode('utf-8') - self.zmq_push_socket.send_multipart([encoded_anchor, encoded_message]) + self.messaging.send(encoded_anchor, encoded_message) def subscribe(self, anchor): if anchor not in self.subscriptions: self.subscriptions.add(anchor) - self.zmq_sub_socket.setsockopt_string( - zmq.SUBSCRIBE, anchor - ) + self.messaging.subscribe(anchor) def unsubscribe(self, anchor): try: self.subscriptions.remove(anchor) - self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, anchor) + self.messaging.unsubscribe(anchor) except KeyError: pass def unsubscribe_all(self): for sub in self.subscriptions: - self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, sub) + self.messaging.unsubscribe(anchor=sub) self.subscriptions.clear() diff --git a/src/event_handlers/event_handler_main.py b/src/event_handlers/event_handler_main.py index 421cf21..7373d67 100644 --- a/src/event_handlers/event_handler_main.py +++ b/src/event_handlers/event_handler_main.py @@ -4,6 +4,7 @@ from functools import partial import source_code from event_handler_base import EventHandlerBase +from messaging import Messaging from source_code_event_handler import SourceCodeEventHandler from terminado_event_handler import TerminadoEventHandler from tornado.ioloop import IOLoop @@ -83,10 +84,11 @@ class LoginHandler(EventHandlerBase): if __name__ == '__main__': - anchor_a = ChangeCaseHandler('anchor_a') - anchor_b = Rot13Handler('anchor_b') - anchor_c = ReverseHandler('anchor_c') - anchor_webide = SourceCodeEventHandler('anchor_webide', 'login_component.py', 'login') - anchor_terminado = TerminadoEventHandler('anchor_terminado', 'terminado') + messaging = Messaging() + anchor_a = ChangeCaseHandler(messaging, 'anchor_a') + anchor_b = Rot13Handler(messaging, 'anchor_b') + anchor_c = ReverseHandler(messaging, 'anchor_c') + anchor_webide = SourceCodeEventHandler(messaging, 'anchor_webide', 'login_component.py', 'login') + anchor_terminado = TerminadoEventHandler(messaging, 'anchor_terminado', 'terminado') IOLoop.instance().start() diff --git a/src/event_handlers/messaging.py b/src/event_handlers/messaging.py new file mode 100644 index 0000000..2aea906 --- /dev/null +++ b/src/event_handlers/messaging.py @@ -0,0 +1,25 @@ +from functools import partial + +import zmq +from zmq.eventloop import ioloop +from zmq.eventloop.zmqstream import ZMQStream + +from config import PUBLISHER_PORT, RECEIVER_PORT + + +class Messaging: + def __init__(self): + ioloop.install() + self._zmq_context = zmq.Context.instance() + self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) + self._zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) + self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) + self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) + self._zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) + + self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) + self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) + self.register_callback = self._zmq_sub_stream.on_recv + + def send(self, anchor, response): + self._zmq_push_socket.send_multipart([anchor, response]) diff --git a/src/event_handlers/source_code_event_handler.py b/src/event_handlers/source_code_event_handler.py index a2b99d7..17c9007 100644 --- a/src/event_handlers/source_code_event_handler.py +++ b/src/event_handlers/source_code_event_handler.py @@ -9,8 +9,8 @@ from event_handler_base import EventHandlerBase class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin): - def __init__(self, anchor, filename, process_name=None, zmq_context=None): - super().__init__(anchor, zmq_context) + def __init__(self, messaging, anchor, filename, process_name=None): + super().__init__(messaging, anchor) self.working_directory = LOGIN_APP_DIR self.filename = filename self.language = map_file_extension_to_language(filename) diff --git a/src/event_handlers/stateful_event_handler.py b/src/event_handlers/stateful_event_handler.py index 2f91f4b..f76f510 100644 --- a/src/event_handlers/stateful_event_handler.py +++ b/src/event_handlers/stateful_event_handler.py @@ -4,8 +4,8 @@ from event_handler_base import EventHandlerBase class StatefulEventHandler(EventHandlerBase): - def __init__(self, anchor, event_handler_function, zmq_context=None): - super().__init__(anchor, zmq_context) + def __init__(self, messaging, anchor, event_handler_function): + super().__init__(messaging, anchor) self.event_handler_function = event_handler_function self.generator = None self.subscribe('reset') @@ -29,9 +29,9 @@ class StatefulEventHandler(EventHandlerBase): 'data': response_data, }) response = [r.encode('utf-8') for r in (response_anchor, response_data)] - self.zmq_push_socket.send_multipart(response) + self.messaging.send(*response) - self.zmq_sub_stream.on_recv(event_handler_callback) + self.messaging.register_callback(event_handler_callback) def unsubscribe_all(self): super().unsubscribe_all() diff --git a/src/event_handlers/terminado_event_handler.py b/src/event_handlers/terminado_event_handler.py index 966cb88..973c3d5 100644 --- a/src/event_handlers/terminado_event_handler.py +++ b/src/event_handlers/terminado_event_handler.py @@ -9,8 +9,8 @@ from config import TERMINADO_DIR class TerminadoEventHandler(EventHandlerBase, SupervisorMixin): - def __init__(self, anchor, process_name, zmq_context=None): - super().__init__(anchor, zmq_context) + def __init__(self, messaging, anchor, process_name): + super().__init__(messaging, anchor) self.working_directory = TERMINADO_DIR self.process_name = process_name self.setup_terminado_server()