From 6e6d775a056ec7de2bf0e9f0b2e70c3b88ee7160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 29 Jan 2018 15:28:51 +0100 Subject: [PATCH] OO Refactor event_handler_connector.py temporary solution --- lib/util.py | 10 ++++- src/app/event_handler_connector.py | 53 +++++++++++++++-------- src/app/fsm_base.py | 4 +- src/app/handlers/zmq_websocket_handler.py | 7 ++- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/lib/util.py b/lib/util.py index e7f0c6b..0717101 100644 --- a/lib/util.py +++ b/lib/util.py @@ -1,4 +1,4 @@ -import json, xmlrpc.client +import json, xmlrpc.client, zmq from config.envvars import SUPERVISOR_HTTP_URI @@ -15,5 +15,11 @@ def create_source_code_response_data(filename, content, language): 'language': language } + class SupervisorMixin: - supervisor = xmlrpc.client.ServerProxy(SUPERVISOR_HTTP_URI).supervisor \ No newline at end of file + supervisor = xmlrpc.client.ServerProxy(SUPERVISOR_HTTP_URI).supervisor + + +class ZMQConnectorBase: + def __init__(self, zmq_context=None): + self._zmq_context = zmq_context or zmq.Context.instance() diff --git a/src/app/event_handler_connector.py b/src/app/event_handler_connector.py index 501e1d7..6c29039 100644 --- a/src/app/event_handler_connector.py +++ b/src/app/event_handler_connector.py @@ -5,30 +5,45 @@ from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream from config import PUBLISHER_PORT, RECEIVER_PORT -from util import parse_anchor_from_message +from util import parse_anchor_from_message, ZMQConnectorBase ioloop.install() -_zmq_context = zmq.Context.instance() -_zmq_pull_socket = _zmq_context.socket(zmq.PULL) -_zmq_pull_stream = ZMQStream(_zmq_pull_socket) -_zmq_pub_socket = _zmq_context.socket(zmq.PUB) -pub_socket_address = 'tcp://*:{}'.format(PUBLISHER_PORT) -_zmq_pub_socket.bind(pub_socket_address) -logging.debug('Pub socket bound to {}'.format(pub_socket_address)) - -pull_socket_address = 'tcp://*:{}'.format(RECEIVER_PORT) -_zmq_pull_socket.bind(pull_socket_address) -logging.debug('Pull socket bound to {}'.format(pull_socket_address)) +class EventHandlerDownlinkConnector(ZMQConnectorBase): + def __init__(self, zmq_context=None): + super().__init__(zmq_context) + self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) + self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) + address = 'tcp://*:{}'.format(RECEIVER_PORT) + self._zmq_pull_socket.bind(address) + logging.debug('Pull socket bound to {}'.format(address)) -def register_callback(callback): - _zmq_pull_stream.on_recv(callback) +class EventHandlerUplinkConnector(ZMQConnectorBase): + def __init__(self, zmq_context=None): + super().__init__(zmq_context) + self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) + address = 'tcp://*:{}'.format(PUBLISHER_PORT) + self._zmq_pub_socket.bind(address) + logging.debug('Pub socket bound to {}'.format(address)) -def send_message(message: str, anchor: str = None): - if not anchor: - anchor = parse_anchor_from_message(message) - encoded_message = [part.encode('utf-8') for part in (anchor, message)] - _zmq_pub_socket.send_multipart(encoded_message) +class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): + def __init__(self, zmq_context=None): + self.downlink = EventHandlerDownlinkConnector(zmq_context) + self.uplink = EventHandlerUplinkConnector(zmq_context) + #EventHandlerDownlinkConnector.__init__(self, zmq_context) # TODO: solve this with multiple inheritance + #EventHandlerUplinkConnector.__init__(self, zmq_context) + + def register_callback(self, callback): + self.downlink._zmq_pull_stream.on_recv(callback) + + def send_message(self, message: str, anchor: str = None): + if not anchor: + anchor = parse_anchor_from_message(message) + encoded_message = [part.encode('utf-8') for part in (anchor, message)] + self.uplink._zmq_pub_socket.send_multipart(encoded_message) + + +ehc = EventHandlerConnector() diff --git a/src/app/fsm_base.py b/src/app/fsm_base.py index f22b06d..e1ac5f0 100644 --- a/src/app/fsm_base.py +++ b/src/app/fsm_base.py @@ -1,6 +1,6 @@ from transitions import Machine -import event_handler_connector +from event_handler_connector import ehc class FSMBase: @@ -17,4 +17,4 @@ class FSMBase: def forward_message(self, event_data): message = event_data.kwargs.get('message') - event_handler_connector.send_message(message) + ehc.send_message(message) diff --git a/src/app/handlers/zmq_websocket_handler.py b/src/app/handlers/zmq_websocket_handler.py index d2e4ce5..30edcc5 100644 --- a/src/app/handlers/zmq_websocket_handler.py +++ b/src/app/handlers/zmq_websocket_handler.py @@ -3,8 +3,7 @@ import json from tornado.websocket import WebSocketHandler from util import parse_anchor_from_message -import event_handler_connector - +from event_handler_connector import ehc class ZMQWebSocketHandler(WebSocketHandler): def __init__(self, application, request, **kwargs): @@ -12,7 +11,7 @@ class ZMQWebSocketHandler(WebSocketHandler): def open(self, *args, **kwargs): logging.debug('WebSocket connection initiated') - event_handler_connector.register_callback(self.zmq_callback) + ehc.register_callback(self.zmq_callback) def zmq_callback(self, msg_parts): anchor, data = msg_parts @@ -27,7 +26,7 @@ class ZMQWebSocketHandler(WebSocketHandler): raise NotImplementedError def send_message(self, message: str, anchor: str = None): - event_handler_connector.send_message(message, anchor) + ehc.send_message(message, anchor) def on_close(self): pass