diff --git a/.drone.yml b/.drone.yml index 0b24068..9dbc1f4 100644 --- a/.drone.yml +++ b/.drone.yml @@ -10,4 +10,4 @@ pipeline: - docker push eu.gcr.io/avatao-challengestore/tutorial-framework:${DRONE_TAG} when: event: 'tag' - branch: refs/tags/bombay-20* + branch: refs/tags/mainecoon-20* diff --git a/Dockerfile b/Dockerfile index 95976be..85f00ee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,6 +37,7 @@ ENV PYTHONPATH="/usr/local/lib" \ TFW_LIB_DIR="/usr/local/lib/" \ TFW_TERMINADO_DIR="/tmp/terminado_server" \ TFW_FRONTEND_DIR="/srv/frontend" \ + TFW_SERVER_DIR="/srv/.tfw" \ TFW_HISTFILE="/home/${AVATAO_USER}/.bash_history" \ PROMPT_COMMAND="history -a" @@ -45,13 +46,15 @@ RUN echo "export HISTFILE=${TFW_HISTFILE}" >> /tmp/bashrc &&\ cat /tmp/bashrc >> /home/${AVATAO_USER}/.bashrc COPY supervisor/supervisord.conf ${TFW_SUPERVISORD_CONF} +COPY supervisor/components/ ${TFW_SUPERVISORD_COMPONENTS} COPY nginx/nginx.conf ${TFW_NGINX_CONF} COPY nginx/default.conf ${TFW_NGINX_DEFAULT} COPY nginx/components/ ${TFW_NGINX_COMPONENTS} COPY lib LICENSE ${TFW_LIB_DIR} +COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/ RUN for dir in "${TFW_LIB_DIR}"/{tfw,tao,envvars} "/etc/nginx" "/etc/supervisor"; do \ - chown -R root:root "$dir" && chmod -R 700 "$dir"; \ + chown -R root:root "$dir" && chmod -R 700 "$dir"; \ done ONBUILD ARG BUILD_CONTEXT="solvable" diff --git a/README.md b/README.md index 5eff814..71e7294 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,20 @@ Frontend components use websockets to connect to the TFW server, to which you ca ![TFW architecture](docs/tfw_architecture.png) +### Networking details + +Event handlers connect to the TFW server using ZMQ. +They receive messages on their `SUB`(scribe) sockets, which are connected to the `PUB`(lish) socket of the server. +Event handlers reply on their `PUSH` socket, then their messages are received on the `PULL` socket of the server. + +The TFW server is basically just a fancy proxy. +It's behaviour is quite simple: it proxies every message received from the fontend to the event handlers and vice versa. + +The server is also capable of "mirroring" messages back to their source. +This is useful for communication between event handlers or frontend components (event handler to event handler or frontend component to frontend component communication). + +Components can also broadcast messages (broadcasted messages are received both by event handlers and the frontend as well). + ### Event handlers Imagine event handlers as callbacks that are invoked when TFW receives a specific type of message. For instance, you could send a message to the framework when the user does something of note. @@ -75,11 +89,180 @@ The TFW message format: - The `data` object can contain anything you might want to send - The `trigger` key is an optional field that triggers an FSM action with that name from the current state (whatever that might be) +To mirror messages back to their sources you can use a special messaging format, in which the message to be mirrored is enveloped inside the `data` field of the outer message: + +```text + "key": "mirror", + "data": + { + ... + The message you want to mirror (with it's own "key" and "data" fields) + ... + } +``` + +Broadcasting messages is possible in a similar manner by using `"key": "broadcast"` in the outer message. ## Where to go next Most of the components you need have docstrings included (hang on tight, this is work in progress) – refer to them for usage info. -In the `docs` folder you can find our Sphinx-based API documentation, which you can build using the `hack/tfw.sh` script in the [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework) repository. +In the `docs` folder you can find our Sphinx-based documentation, which you can build using the `hack/tfw.sh` script in the [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework) repository. To get started you should take a look at [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework), which serves as an example project as well. + +## API + +APIs exposed by our pre-witten event handlers are documented here. + +### IdeEventHandler + +You can read the content of the currently selected file like so: +``` +{ + "key": "ide", + "data": + { + "command": "read" + } +} +``` + +Use the following message to overwrite the content of the currently selected file: +``` +{ + "key": "ide", + "data": + { + "command": "write", + "content": ...string... + } +} +``` + +To select a file use the following message: +``` +{ + "key": "ide", + "data": + { + "command": "select", + "filename": ...string... + } +} +``` + +You can switch to a new working directory using this message (note that the directory must be in `allowed_directories`): +``` +{ + "key": "ide", + "data": + { + "command": "selectdir", + "directory": ...string... + } +} +``` + +Overwriting the current list of excluded file patterns is possible with this message: +``` +{ + "key": "ide", + "data": + { + "command": "exclude", + "exclude": ...array of strings... + } +} +``` + +### TerminalEventHandler + +Writing to the terminal: +``` +{ + "key": "shell", + "data": + { + "command": "write", + "value": ...string... + } +} +``` + +You can read terminal command history like so: +``` +{ + "key": "shell", + "data": + { + "command": "read", + "count": ...number... + } +} +``` + +### ProcessManagingEventHandler + +Starting, stopping and restarting supervisor processes can be done using similar messages (where `command` is `start`, `stop` or `restart`): +``` +{ + "key": "processmanager", + "data": + { + "command": ...string..., + "process_name": ...string... + } +} +``` + +### LogMonitoringEventHandler + +To change which supervisor process is monitored use this message: +``` +{ + "key": "logmonitor", + "data" : + { + "command": "process_name", + "value": ...string... + } +} +``` + +To set the tail length of logs (the monitor will send back the last `value` characters of the log): +``` +{ + "key": "logmonitor", + "data" : + { + "command": "log_tail", + "value": ...number... + } +} +``` + +### FSMManagingEventHandler + +To attempt executing a trigger on the FSM use: +``` +{ + "key": "fsm", + "data" : + { + "command": "trigger", + "value": ...string... + } +} +``` + +To force the broadcasting of an FSM update you can use this message: +``` +{ + "key": "fsm", + "data" : + { + "command": "update" + } +} +``` diff --git a/VERSION b/VERSION index 4958238..c1385d5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -bombay +mainecoon diff --git a/docs/source/conf.py b/docs/source/conf.py index 5c596d0..c41e102 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -26,7 +26,7 @@ author = 'Kristóf Tóth' # The short X.Y version version = '' # The full version, including alpha/beta/rc tags -release = 'bombay' +release = 'mainecoon' # -- General configuration --------------------------------------------------- diff --git a/docs/tfw_architecture.png b/docs/tfw_architecture.png index fa53dd6..854448f 100644 Binary files a/docs/tfw_architecture.png and b/docs/tfw_architecture.png differ diff --git a/lib/tfw/__init__.py b/lib/tfw/__init__.py index d0e877b..53e80d7 100644 --- a/lib/tfw/__init__.py +++ b/lib/tfw/__init__.py @@ -1,6 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .event_handler_base import EventHandlerBase, TriggeredEventHandler +from .event_handler_base import EventHandlerBase, TriggeredEventHandler, BroadcastingEventHandler from .fsm_base import FSMBase from .linear_fsm import LinearFSM +from .yaml_fsm import YamlFSM diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 12735f9..ad160f1 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -8,3 +8,4 @@ from .ide_event_handler import IdeEventHandler from .history_monitor import HistoryMonitor, BashMonitor, GDBMonitor from .terminal_commands import TerminalCommands from .log_monitoring_event_handler import LogMonitoringEventHandler +from .fsm_managing_event_handler import FSMManagingEventHandler diff --git a/lib/tfw/components/fsm_managing_event_handler.py b/lib/tfw/components/fsm_managing_event_handler.py new file mode 100644 index 0000000..78efa66 --- /dev/null +++ b/lib/tfw/components/fsm_managing_event_handler.py @@ -0,0 +1,56 @@ +# Copyright (C) 2018 Avatao.com Innovative Learning Kft. +# All Rights Reserved. See LICENSE file for details. + +from tfw import BroadcastingEventHandler +from tfw.config.logs import logging + +LOG = logging.getLogger(__name__) + + +class FSMManagingEventHandler(BroadcastingEventHandler): + def __init__(self, key, fsm_type): + super().__init__(key) + self.fsm = fsm_type() + self._fsm_updater = FSMUpdater(self.fsm) + + self.command_handlers = { + 'trigger': self.handle_trigger, + 'update': self.handle_update + } + + def handle_event(self, message): + try: + data = message['data'] + message['data'] = self.command_handlers[data['command']](data) + return message + except KeyError: + LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) + + def handle_trigger(self, data): + self.fsm.step(data['value']) + return self.with_fsm_update(data) + + def with_fsm_update(self, data): + return { + **data, + **self._fsm_updater.get_fsm_state_and_transitions() + } + + def handle_update(self, data): + return self.with_fsm_update(data) + + +class FSMUpdater: + def __init__(self, fsm): + self.fsm = fsm + + def get_fsm_state_and_transitions(self): + state = self.fsm.state + valid_transitions = [ + {'trigger': trigger} + for trigger in self.fsm.get_triggers(self.fsm.state) + ] + return { + 'current_state': state, + 'valid_transitions': valid_transitions + } diff --git a/lib/tfw/event_handler_base.py b/lib/tfw/event_handler_base.py index cca44b1..8771f35 100644 --- a/lib/tfw/event_handler_base.py +++ b/lib/tfw/event_handler_base.py @@ -2,9 +2,13 @@ # All Rights Reserved. See LICENSE file for details. from abc import ABC, abstractmethod +from json import dumps +from hashlib import md5 -from tfw.networking import deserialize_tfw_msg from tfw.networking.event_handlers import ServerConnector +from tfw.config.logs import logging + +LOG = logging.getLogger(__name__) class EventHandlerBase(ABC): @@ -20,22 +24,23 @@ class EventHandlerBase(ABC): self.subscribe(self.key, 'reset') self.server_connector.register_callback(self.event_handler_callback) - def event_handler_callback(self, msg_parts): + def event_handler_callback(self, message): """ Callback that is invoked when receiving a message. Dispatches messages to handler methods and sends a response back in case the handler returned something. This is subscribed in __init__(). """ - message = deserialize_tfw_msg(*msg_parts) response = self.dispatch_handling(message) if response: - response['key'] = message['key'] self.server_connector.send(response) def dispatch_handling(self, message): """ Used to dispatch messages to their specific handlers. + + :param message: the message received + :returns: the message to send back """ if message['key'] != 'reset': return self.handle_event(message) @@ -47,6 +52,7 @@ class EventHandlerBase(ABC): Abstract method that implements the handling of messages. :param message: the message received + :returns: the message to send back """ raise NotImplementedError @@ -56,6 +62,7 @@ class EventHandlerBase(ABC): Usually 'reset' events receive some sort of special treatment. :param message: the message received + :returns: the message to send back """ return None @@ -102,3 +109,38 @@ class TriggeredEventHandler(EventHandlerBase, ABC): if message.get('trigger') == self.trigger: return super().dispatch_handling(message) return None + + +class BroadcastingEventHandler(EventHandlerBase, ABC): + # pylint: disable=abstract-method + """ + Abstract base class for EventHandlers which broadcast responses + and intelligently ignore their own broadcasted messages they receive. + """ + def __init__(self, key): + super().__init__(key) + self.own_message_hashes = [] + + def event_handler_callback(self, message): + message_hash = self.hash_message(message) + + if message_hash in self.own_message_hashes: + self.own_message_hashes.remove(message_hash) + return + + response = self.dispatch_handling(message) + if response: + self.own_message_hashes.append(self.hash_message(response)) + self.server_connector.send(self.make_broadcast_message(response)) + + @staticmethod + def hash_message(message): + message_bytes = dumps(message, sort_keys=True).encode() + return md5(message_bytes).hexdigest() + + @staticmethod + def make_broadcast_message(message): + return { + 'key': 'broadcast', + 'data': message + } diff --git a/lib/tfw/fsm_base.py b/lib/tfw/fsm_base.py index 7296627..28b5ef6 100644 --- a/lib/tfw/fsm_base.py +++ b/lib/tfw/fsm_base.py @@ -1,27 +1,32 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from typing import List +from collections import defaultdict -from transitions import Machine +from transitions import Machine, MachineError from tfw.mixins import CallbackMixin +from tfw.config.logs import logging + +LOG = logging.getLogger(__name__) -class FSMBase(CallbackMixin): +class FSMBase(Machine, CallbackMixin): """ A general FSM base class you can inherit from to track user progress. See linear_fsm.py for an example use-case. - TFW the transitions library for state machines, please refer to their + TFW uses the transitions library for state machines, please refer to their documentation for more information on creating your own machines: https://github.com/pytransitions/transitions """ states, transitions = [], [] - def __init__(self, initial: str = None, accepted_states: List[str] = None): + def __init__(self, initial=None, accepted_states=None): self.accepted_states = accepted_states or [self.states[-1]] - self.machine = Machine( - model=self, + self.trigger_predicates = defaultdict(list) + + Machine.__init__( + self, states=self.states, transitions=self.transitions, initial=initial or self.states[0], @@ -34,4 +39,27 @@ class FSMBase(CallbackMixin): self._execute_callbacks(event_data.kwargs) def is_solved(self): - return self.state in self.accepted_states # pylint: disable=no-member + return self.state in self.accepted_states # pylint: disable=no-member + + def subscribe_predicate(self, trigger, *predicates): + self.trigger_predicates[trigger].extend(predicates) + + def unsubscribe_predicate(self, trigger, *predicates): + self.trigger_predicates[trigger] = [ + predicate + for predicate in self.trigger_predicates[trigger] + not in predicates + ] + + def step(self, trigger): + predicate_results = ( + predicate() + for predicate in self.trigger_predicates[trigger] + ) + + # TODO: think about what could we do when this prevents triggering + if all(predicate_results): + try: + self.trigger(trigger) + except (AttributeError, MachineError): + LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger) diff --git a/lib/tfw/mixins/callback_mixin.py b/lib/tfw/mixins/callback_mixin.py index 3c94e71..54515f3 100644 --- a/lib/tfw/mixins/callback_mixin.py +++ b/lib/tfw/mixins/callback_mixin.py @@ -21,6 +21,14 @@ class CallbackMixin: fun = partial(callback, *args, **kwargs) self._callbacks.append(fun) + def subscribe_callbacks(self, *callbacks): + """ + Subscribe a list of callbacks to incoke once an event is triggered. + :param callbacks: callbacks to be subscribed + """ + for callback in callbacks: + self.subscribe_callback(callback) + def unsubscribe_callback(self, callback): self._callbacks.remove(callback) diff --git a/lib/tfw/networking/__init__.py b/lib/tfw/networking/__init__.py index 6dd15f9..c5879b7 100644 --- a/lib/tfw/networking/__init__.py +++ b/lib/tfw/networking/__init__.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .serialization import serialize_tfw_msg, deserialize_tfw_msg, validate_message +from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg from .zmq_connector_base import ZMQConnectorBase # from .controller_connector import ControllerConnector # TODO: readd once controller stuff is resolved from .message_sender import MessageSender diff --git a/lib/tfw/networking/event_handlers/server_connector.py b/lib/tfw/networking/event_handlers/server_connector.py index 6685ead..678bb3b 100644 --- a/lib/tfw/networking/event_handlers/server_connector.py +++ b/lib/tfw/networking/event_handlers/server_connector.py @@ -6,9 +6,12 @@ from functools import partial import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.networking import serialize_tfw_msg +from tfw.networking import serialize_tfw_msg, with_deserialize_tfw_msg from tfw.networking import ZMQConnectorBase from tfw.config import TFWENV +from tfw.config.logs import logging + +LOG = logging.getLogger(__name__) class ServerDownlinkConnector(ZMQConnectorBase): @@ -20,7 +23,10 @@ class ServerDownlinkConnector(ZMQConnectorBase): self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) - self.register_callback = self._zmq_sub_stream.on_recv + + def register_callback(self, callback): + callback = with_deserialize_tfw_msg(callback) + self._zmq_sub_stream.on_recv(callback) class ServerUplinkConnector(ZMQConnectorBase): diff --git a/lib/tfw/networking/serialization.py b/lib/tfw/networking/serialization.py index 6a6a3e7..c28eec0 100644 --- a/lib/tfw/networking/serialization.py +++ b/lib/tfw/networking/serialization.py @@ -22,10 +22,7 @@ The purpose of this module is abstracting away this low level behaviour. """ import json - - -def validate_message(message): - return 'key' in message +from functools import wraps def serialize_tfw_msg(message): @@ -35,6 +32,14 @@ def serialize_tfw_msg(message): return _serialize_all(message['key'], message) +def with_deserialize_tfw_msg(fun): + @wraps(fun) + def wrapper(message_parts): + message = deserialize_tfw_msg(*message_parts) + return fun(message) + return wrapper + + def deserialize_tfw_msg(*args): """ Return message from TFW multipart data diff --git a/lib/tfw/networking/server/__init__.py b/lib/tfw/networking/server/__init__.py index aea3e0c..e707fab 100644 --- a/lib/tfw/networking/server/__init__.py +++ b/lib/tfw/networking/server/__init__.py @@ -3,5 +3,4 @@ from .event_handler_connector import EventHandlerConnector, EventHandlerUplinkConnector, EventHandlerDownlinkConnector from .tfw_server import TFWServer -from .zmq_websocket_handler import ZMQWebSocketProxy # from .controller_responder import ControllerResponder # TODO: readd once controller stuff is resolved diff --git a/lib/tfw/networking/server/event_handler_connector.py b/lib/tfw/networking/server/event_handler_connector.py index 542bac1..c4c6338 100644 --- a/lib/tfw/networking/server/event_handler_connector.py +++ b/lib/tfw/networking/server/event_handler_connector.py @@ -4,7 +4,7 @@ import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.networking import ZMQConnectorBase, serialize_tfw_msg +from tfw.networking import ZMQConnectorBase, serialize_tfw_msg, with_deserialize_tfw_msg from tfw.config import TFWENV from tfw.config.logs import logging @@ -32,6 +32,7 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): def register_callback(self, callback): + callback = with_deserialize_tfw_msg(callback) self._zmq_pull_stream.on_recv(callback) def send_message(self, message: dict): diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/networking/server/tfw_server.py index 78cc8ee..bdc5179 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/networking/server/tfw_server.py @@ -1,15 +1,12 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from collections import defaultdict - from tornado.web import Application -from tfw.networking import MessageSender from tfw.networking.event_handlers import ServerUplinkConnector from tfw.networking.server import EventHandlerConnector from tfw.config.logs import logging -from .zmq_websocket_handler import ZMQWebSocketProxy +from .zmq_websocket_proxy import ZMQWebSocketProxy LOG = logging.getLogger(__name__) @@ -18,117 +15,29 @@ class TFWServer: """ This class handles the proxying of messages between the frontend and event handers. It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ - SUB socket. It also manages an FSM you can define as a constructor argument. + SUB socket. """ - def __init__(self, fsm_type): - """ - :param fsm_type: the type of FSM you want TFW to use - """ - self._fsm = fsm_type() - self._fsm_updater = FSMUpdater(self._fsm) - self._fsm_manager = FSMManager(self._fsm) - self._fsm.subscribe_callback(self._fsm_updater.update) + def __init__(self): self._event_handler_connector = EventHandlerConnector() + self._uplink_connector = ServerUplinkConnector() self.application = Application([( r'/ws', ZMQWebSocketProxy,{ - 'make_eventhandler_message': self.make_eventhandler_message, - 'proxy_filter': self.proxy_filter, - 'handle_trigger': self.handle_trigger, - 'event_handler_connector': self._event_handler_connector + 'event_handler_connector': self._event_handler_connector, + 'message_handlers': [self.handle_trigger] })] ) - # self.controller_responder = ControllerResponder(self.fsm) - # TODO: add this once controller stuff is resolved - - @property - def fsm(self): - return self._fsm - - @property - def fsm_manager(self): - return self._fsm_manager - - def make_eventhandler_message(self, message): - self.trigger_fsm(message) - message['FSMUpdate'] = self._fsm_updater.get_fsm_state_and_transitions() - return message def handle_trigger(self, message): - LOG.debug('Executing handler for trigger "%s"', message.get('trigger', '')) - self.trigger_fsm(message) - - def trigger_fsm(self, message): - trigger = message.get('trigger', '') - try: - self._fsm_manager.trigger(trigger, message) - except AttributeError: - LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger) - - def proxy_filter(self, message): - # pylint: disable=unused-argument,no-self-use - return True + if 'trigger' in message: + LOG.debug('Executing handler for trigger "%s"', message.get('trigger', '')) + self._uplink_connector.send_to_eventhandler({ + 'key': 'fsm', + 'data': { + 'command': 'trigger', + 'value': message.get('trigger', '') + } + }) def listen(self, port): self.application.listen(port) - - -class FSMManager: - def __init__(self, fsm): - self._fsm = fsm - self.trigger_predicates = defaultdict(list) - self.messenge_sender = MessageSender() - - @property - def fsm(self): - return self._fsm - - def trigger(self, trigger, message): - predicate_results = [] - for predicate in self.trigger_predicates[trigger]: - success, message = predicate(message) - predicate_results.append(success) - self.messenge_sender.send('FSM', message) - - if all(predicate_results): - try: - self.fsm.trigger(trigger, message=message) - except AttributeError: - LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger) - - def subscribe_predicate(self, trigger, *predicates): - self.trigger_predicates[trigger].extend(predicates) - - def unsubscribe_predicate(self, trigger, *predicates): - self.trigger_predicates[trigger] = [ - predicate - for predicate in self.trigger_predicates[trigger] - not in predicates - ] - - -class FSMUpdater: - def __init__(self, fsm): - self.fsm = fsm - self.uplink = ServerUplinkConnector() - - def update(self, kwargs_dict): - # pylint: disable=unused-argument - self.uplink.send(self.generate_fsm_update()) - - def generate_fsm_update(self): - return { - 'key': 'FSMUpdate', - 'data': self.get_fsm_state_and_transitions() - } - - def get_fsm_state_and_transitions(self): - state = self.fsm.state - valid_transitions = [ - {'trigger': trigger} - for trigger in self.fsm.machine.get_triggers(self.fsm.state) - ] - return { - 'current_state': state, - 'valid_transitions': valid_transitions - } diff --git a/lib/tfw/networking/server/zmq_websocket_handler.py b/lib/tfw/networking/server/zmq_websocket_handler.py deleted file mode 100644 index c842205..0000000 --- a/lib/tfw/networking/server/zmq_websocket_handler.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (C) 2018 Avatao.com Innovative Learning Kft. -# All Rights Reserved. See LICENSE file for details. - -import json -from abc import ABC, abstractmethod - -from tornado.websocket import WebSocketHandler - -from tfw.networking import deserialize_tfw_msg, validate_message -from tfw.config.logs import logging - -LOG = logging.getLogger(__name__) - - -class ZMQWebSocketHandler(WebSocketHandler, ABC): - instances = set() - - def initialize(self, **kwargs): # pylint: disable=arguments-differ - self._event_handler_connector = kwargs['event_handler_connector'] - - def prepare(self): - ZMQWebSocketHandler.instances.add(self) - - def on_close(self): - ZMQWebSocketHandler.instances.remove(self) - - def open(self, *args, **kwargs): - LOG.debug('WebSocket connection initiated') - self._event_handler_connector.register_callback(self.zmq_callback) - - def zmq_callback(self, msg_parts): - keyhandlers = {'mirror': self.mirror} - - message = deserialize_tfw_msg(*msg_parts) - LOG.debug('Received on pull socket: %s', message) - if not validate_message(message): - return - - self.handle_trigger(message) - if message['key'] not in keyhandlers: - for instance in ZMQWebSocketHandler.instances: - instance.write_message(message) - else: - try: - keyhandlers[message['key']](message) - except KeyError: - LOG.error('Invalid mirror message format! Ignoring.') - - def mirror(self, message): - message = message['data'] - self._event_handler_connector.send_message(message) - - def on_message(self, message): - LOG.debug('Received on WebSocket: %s', message) - if validate_message(message): - self.send_message(self.make_eventhandler_message(message)) - - @abstractmethod - def make_eventhandler_message(self, message): - raise NotImplementedError - - def send_message(self, message: dict): - self._event_handler_connector.send_message(message) - - @abstractmethod - def handle_trigger(self, message): - raise NotImplementedError - - # much secure, very cors, wow - def check_origin(self, origin): - return True - - -class ZMQWebSocketProxy(ZMQWebSocketHandler): - # pylint: disable=abstract-method - def initialize(self, **kwargs): # pylint: disable=arguments-differ - super(ZMQWebSocketProxy, self).initialize(**kwargs) - self._make_eventhandler_message = kwargs['make_eventhandler_message'] - self._proxy_filter = kwargs['proxy_filter'] - self._handle_trigger = kwargs['handle_trigger'] - - def on_message(self, message): - message = json.loads(message) - if self._proxy_filter(message): - super().on_message(message) - - def make_eventhandler_message(self, message): - return self._make_eventhandler_message(message) - - def handle_trigger(self, message): - self._handle_trigger(message) diff --git a/lib/tfw/networking/server/zmq_websocket_proxy.py b/lib/tfw/networking/server/zmq_websocket_proxy.py new file mode 100644 index 0000000..85d6c60 --- /dev/null +++ b/lib/tfw/networking/server/zmq_websocket_proxy.py @@ -0,0 +1,121 @@ +# Copyright (C) 2018 Avatao.com Innovative Learning Kft. +# All Rights Reserved. See LICENSE file for details. + +import json + +from tornado.websocket import WebSocketHandler + +from tfw.mixins import CallbackMixin +from tfw.config.logs import logging + +LOG = logging.getLogger(__name__) + + +class ZMQWebSocketProxy(WebSocketHandler): + instances = set() + + def initialize(self, **kwargs): # pylint: disable=arguments-differ + self._event_handler_connector = kwargs['event_handler_connector'] + self._message_handlers = kwargs.get('message_handlers', []) + self._proxy_filters = kwargs.get('proxy_filters', []) + + self.proxy_eventhandler_to_websocket = TFWProxy( + self.send_eventhandler_message, + self.send_websocket_message + ) + self.proxy_websocket_to_eventhandler = TFWProxy( + self.send_websocket_message, + self.send_eventhandler_message + ) + + proxies = (self.proxy_eventhandler_to_websocket, self.proxy_websocket_to_eventhandler) + for proxy in proxies: + proxy.proxy_filters.subscribe_callbacks(*self._proxy_filters) + proxy.proxy_callbacks.subscribe_callbacks(*self._message_handlers) + + def prepare(self): + ZMQWebSocketProxy.instances.add(self) + + def on_close(self): + ZMQWebSocketProxy.instances.remove(self) + + def open(self, *args, **kwargs): + LOG.debug('WebSocket connection initiated') + self._event_handler_connector.register_callback(self.eventhander_callback) + + def eventhander_callback(self, message): + """ + Invoked on ZMQ messages from event handlers. + """ + LOG.debug('Received on pull socket: %s', message) + self.proxy_eventhandler_to_websocket(message) + + def on_message(self, message): + """ + Invoked on WS messages from frontend. + """ + message = json.loads(message) + LOG.debug('Received on WebSocket: %s', message) + self.proxy_websocket_to_eventhandler(message) + + def send_eventhandler_message(self, message): + self._event_handler_connector.send_message(message) + + @staticmethod + def send_websocket_message(message): + for instance in ZMQWebSocketProxy.instances: + instance.write_message(message) + + # much secure, very cors, wow + def check_origin(self, origin): + return True + + +class TFWProxy: + def __init__(self, to_source, to_destination): + self.to_source = to_source + self.to_destination = to_destination + + self.proxy_filters = CallbackMixin() + self.proxy_callbacks = CallbackMixin() + + self.proxy_filters.subscribe_callback(self.validate_message) + + self.keyhandlers = { + 'mirror': self.mirror, + 'broadcast': self.broadcast + } + + @staticmethod + def validate_message(message): + if 'key' not in message: + raise ValueError('Invalid TFW message format!') + + def __call__(self, message): + try: + self.proxy_filters._execute_callbacks(message) + except ValueError: + LOG.exception('Invalid TFW message received!') + return + + self.proxy_callbacks._execute_callbacks(message) + + if message['key'] not in self.keyhandlers: + self.to_destination(message) + else: + handler = self.keyhandlers[message['key']] + try: + handler(message) + except KeyError: + LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__) + + def mirror(self, message): + message = message['data'] + LOG.debug('Mirroring message: %s', message) + self.to_source(message) + + def broadcast(self, message): + message = message['data'] + LOG.debug('Broadcasting message: %s', message) + self.to_source(message) + self.to_destination(message) diff --git a/lib/tfw/yaml_fsm.py b/lib/tfw/yaml_fsm.py new file mode 100644 index 0000000..b887a9c --- /dev/null +++ b/lib/tfw/yaml_fsm.py @@ -0,0 +1,64 @@ +from subprocess import Popen, run +from functools import partial +from contextlib import suppress + +import yaml +from transitions import State + +from tfw import FSMBase + + +class YamlFSM(FSMBase): + def __init__(self, config_file): + self.config = self.parse_config(config_file) + self.setup_states() + super().__init__() # FSMBase.__init__() requires states + self.setup_transitions() + + @staticmethod + def parse_config(config_file): + with open(config_file, 'r') as ifile: + return yaml.safe_load(ifile) + + def setup_states(self): + self.for_config_states_and_transitions_do(self.wrap_callbacks_with_subprocess_call) + self.states = [State(**state) for state in self.config['states']] + + def setup_transitions(self): + self.for_config_states_and_transitions_do(self.subscribe_and_remove_predicates) + for transition in self.config['transitions']: + self.add_transition(**transition) + + def for_config_states_and_transitions_do(self, what): + for array in ('states', 'transitions'): + for json_obj in self.config[array]: + what(json_obj) + + @staticmethod + def wrap_callbacks_with_subprocess_call(json_obj): + topatch = ('on_enter', 'on_exit', 'prepare', 'before', 'after') + for key in json_obj: + if key in topatch: + json_obj[key] = partial(run_command_async, json_obj[key]) + + def subscribe_and_remove_predicates(self, json_obj): + if 'predicates' in json_obj: + for predicate in json_obj['predicates']: + self.subscribe_predicate( + json_obj['trigger'], + partial( + command_statuscode_is_zero, + predicate + ) + ) + + with suppress(KeyError): + json_obj.pop('predicates') + + +def run_command_async(command, event): + Popen(command, shell=True) + + +def command_statuscode_is_zero(command): + return run(command, shell=True).returncode == 0 diff --git a/requirements.txt b/requirements.txt index cdc46e3..a324ff0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ pyzmq==17.0.0 transitions==0.6.4 terminado==0.8.1 watchdog==0.8.3 +PyYAML==3.12 diff --git a/supervisor/components/tfw_server.conf b/supervisor/components/tfw_server.conf new file mode 100644 index 0000000..f583be9 --- /dev/null +++ b/supervisor/components/tfw_server.conf @@ -0,0 +1,4 @@ +[program:tfwserver] +user=root +directory=%(ENV_TFW_SERVER_DIR)s +command=python3 tfw_server.py diff --git a/supervisor/tfw_server.py b/supervisor/tfw_server.py new file mode 100644 index 0000000..78766ac --- /dev/null +++ b/supervisor/tfw_server.py @@ -0,0 +1,9 @@ +from tornado.ioloop import IOLoop + +from tfw.networking import TFWServer +from tfw.config import TFWENV + + +if __name__ == '__main__': + TFWServer().listen(TFWENV.WEB_PORT) + IOLoop.instance().start()