From 7ec400f33c078c5896f35048d196253ebc44e578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Fri, 17 Nov 2017 15:47:22 +0100 Subject: [PATCH] Add initial version of WS<->FSM<->ZMQ handler --- buttons.py | 24 +++++++++++++ handlers/zmq_websocket_handler.py | 60 +++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 buttons.py create mode 100644 handlers/zmq_websocket_handler.py diff --git a/buttons.py b/buttons.py new file mode 100644 index 0000000..9eabb39 --- /dev/null +++ b/buttons.py @@ -0,0 +1,24 @@ +from transitions import Machine + + +class Buttons: + states = ['ayy', 'bee', 'cee'] + transitions = [ + {'trigger': 'anchor_a', 'source': 'ayy', 'dest': 'bee'}, + {'trigger': 'anchor_b', 'source': 'bee', 'dest': 'cee'}, + {'trigger': 'anchor_c', 'source': 'cee', 'dest': 'ayy'}, + ] + + def __init__(self, handler=None): + self.handler = handler + self.machine = Machine(model=self, + states=Buttons.states, + transitions=Buttons.transitions, + initial='ayy', + send_event=True, + ignore_invalid_triggers=True, + after_state_change='forward_message') + + def forward_message(self, event_data): + message = event_data.kwargs.get('message') + self.handler.send_message(message) diff --git a/handlers/zmq_websocket_handler.py b/handlers/zmq_websocket_handler.py new file mode 100644 index 0000000..d236ea1 --- /dev/null +++ b/handlers/zmq_websocket_handler.py @@ -0,0 +1,60 @@ +import logging + +import json +from tornado.websocket import WebSocketHandler +import zmq +from zmq.eventloop.zmqstream import ZMQStream +from zmq.eventloop import ioloop + +from config import PUBLISHER_PORT, RECEIVER_PORT +from buttons import Buttons + +ioloop.install() + + +class ZMQWebSocketHandler(WebSocketHandler): + def __init__(self, application, request, zmq_context=None, **kwargs): + super().__init__(application, request, **kwargs) + self.zmq_context = zmq_context or zmq.Context.instance() + self.zmq_pull_socket = self.zmq_context.socket(zmq.PULL) + self.zmq_pull_stream = ZMQStream(self.zmq_pull_socket) + self.zmq_pub_socket = self.zmq_context.socket(zmq.PUB) + self.fsm = Buttons(self) + + def open(self, *args, **kwargs): + pub_socket_address = 'tcp://*:{}'.format(PUBLISHER_PORT) + self.zmq_pub_socket.bind(pub_socket_address) + logging.debug('Pub socket bound to {}'.format(pub_socket_address)) + pull_socket_address = 'tcp://*:{}'.format(RECEIVER_PORT) + self.zmq_pull_socket.bind(pull_socket_address) + logging.debug('Pull socket bound to {}'.format(pull_socket_address)) + + def zmq_callback(msg_parts): + anchor, data = msg_parts + logging.debug('Received on pull socket: {}'.format(data.decode())) + self.write_message(data.decode()) + + self.zmq_pull_stream.on_recv(zmq_callback) + + def on_message(self, message): + logging.debug('Received on WebSocket: {}'.format(message)) + self.fsm.trigger(self._parse_anchor(message), message=message) + + def send_message(self, message, anchor: str = None): + if not anchor: + anchor = self._parse_anchor(message) + encoded_message = [part.encode('utf-8') for part in (anchor, message)] + self.zmq_pub_socket.send_multipart(encoded_message) + + def on_close(self): + self.zmq_pull_socket.close() + self.zmq_pub_socket.close() + + # much secure, very cors, wow + def check_origin(self, origin): + return True + + @staticmethod + def _parse_anchor(message): + message_json = json.loads(message) + return message_json['anchor']