From d507b3d066d060f55a7fe3433356c5fe53708859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Mon, 27 Nov 2017 18:16:35 +0100 Subject: [PATCH] Move socket management to a separate package --- src/app/buttons.py | 7 +++-- src/app/component_connector.py | 35 +++++++++++++++++++++ src/app/handlers/zmq_websocket_handler.py | 37 +++++------------------ 3 files changed, 46 insertions(+), 33 deletions(-) create mode 100644 src/app/component_connector.py diff --git a/src/app/buttons.py b/src/app/buttons.py index 9eabb39..f298531 100644 --- a/src/app/buttons.py +++ b/src/app/buttons.py @@ -1,5 +1,7 @@ from transitions import Machine +import component_connector + class Buttons: states = ['ayy', 'bee', 'cee'] @@ -9,8 +11,7 @@ class Buttons: {'trigger': 'anchor_c', 'source': 'cee', 'dest': 'ayy'}, ] - def __init__(self, handler=None): - self.handler = handler + def __init__(self): self.machine = Machine(model=self, states=Buttons.states, transitions=Buttons.transitions, @@ -21,4 +22,4 @@ class Buttons: def forward_message(self, event_data): message = event_data.kwargs.get('message') - self.handler.send_message(message) + component_connector.send_message(message) diff --git a/src/app/component_connector.py b/src/app/component_connector.py new file mode 100644 index 0000000..073be32 --- /dev/null +++ b/src/app/component_connector.py @@ -0,0 +1,35 @@ +import logging + +import json +import zmq +from zmq.eventloop import ioloop +from zmq.eventloop.zmqstream import ZMQStream + +from config import PUBLISHER_PORT, RECEIVER_PORT +from util import parse_anchor_from_message + +ioloop.install() + +_zmq_context = zmq.Context.instance() +_zmq_pull_socket = _zmq_context.socket(zmq.PULL) +_zmq_pull_stream = ZMQStream(_zmq_pull_socket) +_zmq_pub_socket = _zmq_context.socket(zmq.PUB) + +pub_socket_address = 'tcp://*:{}'.format(PUBLISHER_PORT) +_zmq_pub_socket.bind(pub_socket_address) +logging.debug('Pub socket bound to {}'.format(pub_socket_address)) + +pull_socket_address = 'tcp://*:{}'.format(RECEIVER_PORT) +_zmq_pull_socket.bind(pull_socket_address) +logging.debug('Pull socket bound to {}'.format(pull_socket_address)) + + +def register_callback(callback): + _zmq_pull_stream.on_recv(callback) + + +def send_message(message: str, anchor: str = None): + if not anchor: + anchor = parse_anchor_from_message(message) + encoded_message = [part.encode('utf-8') for part in (anchor, message)] + _zmq_pub_socket.send_multipart(encoded_message) diff --git a/src/app/handlers/zmq_websocket_handler.py b/src/app/handlers/zmq_websocket_handler.py index d236ea1..18fc09d 100644 --- a/src/app/handlers/zmq_websocket_handler.py +++ b/src/app/handlers/zmq_websocket_handler.py @@ -1,60 +1,37 @@ import logging -import json from tornado.websocket import WebSocketHandler -import zmq -from zmq.eventloop.zmqstream import ZMQStream -from zmq.eventloop import ioloop -from config import PUBLISHER_PORT, RECEIVER_PORT from buttons import Buttons +from util import parse_anchor_from_message -ioloop.install() +import component_connector class ZMQWebSocketHandler(WebSocketHandler): - def __init__(self, application, request, zmq_context=None, **kwargs): + def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) - self.zmq_context = zmq_context or zmq.Context.instance() - self.zmq_pull_socket = self.zmq_context.socket(zmq.PULL) - self.zmq_pull_stream = ZMQStream(self.zmq_pull_socket) - self.zmq_pub_socket = self.zmq_context.socket(zmq.PUB) self.fsm = Buttons(self) def open(self, *args, **kwargs): - pub_socket_address = 'tcp://*:{}'.format(PUBLISHER_PORT) - self.zmq_pub_socket.bind(pub_socket_address) - logging.debug('Pub socket bound to {}'.format(pub_socket_address)) - pull_socket_address = 'tcp://*:{}'.format(RECEIVER_PORT) - self.zmq_pull_socket.bind(pull_socket_address) - logging.debug('Pull socket bound to {}'.format(pull_socket_address)) - def zmq_callback(msg_parts): anchor, data = msg_parts logging.debug('Received on pull socket: {}'.format(data.decode())) self.write_message(data.decode()) - self.zmq_pull_stream.on_recv(zmq_callback) + component_connector.register_callback(zmq_callback) def on_message(self, message): logging.debug('Received on WebSocket: {}'.format(message)) self.fsm.trigger(self._parse_anchor(message), message=message) - def send_message(self, message, anchor: str = None): - if not anchor: - anchor = self._parse_anchor(message) - encoded_message = [part.encode('utf-8') for part in (anchor, message)] - self.zmq_pub_socket.send_multipart(encoded_message) + def send_message(self, message: str, anchor: str = None): + component_connector.send_message(message, anchor) def on_close(self): - self.zmq_pull_socket.close() - self.zmq_pub_socket.close() + pass # much secure, very cors, wow def check_origin(self, origin): return True - @staticmethod - def _parse_anchor(message): - message_json = json.loads(message) - return message_json['anchor']