Merge branch 'fsm_as_eventhandler'

This commit is contained in:
Kristóf Tóth 2018-07-11 10:23:28 +02:00
commit 09bcb7de6b
24 changed files with 574 additions and 224 deletions

View File

@ -10,4 +10,4 @@ pipeline:
- docker push eu.gcr.io/avatao-challengestore/tutorial-framework:${DRONE_TAG}
when:
event: 'tag'
branch: refs/tags/bombay-20*
branch: refs/tags/mainecoon-20*

View File

@ -37,6 +37,7 @@ ENV PYTHONPATH="/usr/local/lib" \
TFW_LIB_DIR="/usr/local/lib/" \
TFW_TERMINADO_DIR="/tmp/terminado_server" \
TFW_FRONTEND_DIR="/srv/frontend" \
TFW_SERVER_DIR="/srv/.tfw" \
TFW_HISTFILE="/home/${AVATAO_USER}/.bash_history" \
PROMPT_COMMAND="history -a"
@ -45,13 +46,15 @@ RUN echo "export HISTFILE=${TFW_HISTFILE}" >> /tmp/bashrc &&\
cat /tmp/bashrc >> /home/${AVATAO_USER}/.bashrc
COPY supervisor/supervisord.conf ${TFW_SUPERVISORD_CONF}
COPY supervisor/components/ ${TFW_SUPERVISORD_COMPONENTS}
COPY nginx/nginx.conf ${TFW_NGINX_CONF}
COPY nginx/default.conf ${TFW_NGINX_DEFAULT}
COPY nginx/components/ ${TFW_NGINX_COMPONENTS}
COPY lib LICENSE ${TFW_LIB_DIR}
COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/
RUN for dir in "${TFW_LIB_DIR}"/{tfw,tao,envvars} "/etc/nginx" "/etc/supervisor"; do \
chown -R root:root "$dir" && chmod -R 700 "$dir"; \
chown -R root:root "$dir" && chmod -R 700 "$dir"; \
done
ONBUILD ARG BUILD_CONTEXT="solvable"

185
README.md
View File

@ -20,6 +20,20 @@ Frontend components use websockets to connect to the TFW server, to which you ca
![TFW architecture](docs/tfw_architecture.png)
### Networking details
Event handlers connect to the TFW server using ZMQ.
They receive messages on their `SUB`(scribe) sockets, which are connected to the `PUB`(lish) socket of the server.
Event handlers reply on their `PUSH` socket, then their messages are received on the `PULL` socket of the server.
The TFW server is basically just a fancy proxy.
It's behaviour is quite simple: it proxies every message received from the fontend to the event handlers and vice versa.
The server is also capable of "mirroring" messages back to their source.
This is useful for communication between event handlers or frontend components (event handler to event handler or frontend component to frontend component communication).
Components can also broadcast messages (broadcasted messages are received both by event handlers and the frontend as well).
### Event handlers
Imagine event handlers as callbacks that are invoked when TFW receives a specific type of message. For instance, you could send a message to the framework when the user does something of note.
@ -75,11 +89,180 @@ The TFW message format:
- The `data` object can contain anything you might want to send
- The `trigger` key is an optional field that triggers an FSM action with that name from the current state (whatever that might be)
To mirror messages back to their sources you can use a special messaging format, in which the message to be mirrored is enveloped inside the `data` field of the outer message:
```text
"key": "mirror",
"data":
{
...
The message you want to mirror (with it's own "key" and "data" fields)
...
}
```
Broadcasting messages is possible in a similar manner by using `"key": "broadcast"` in the outer message.
## Where to go next
Most of the components you need have docstrings included (hang on tight, this is work in progress) refer to them for usage info.
In the `docs` folder you can find our Sphinx-based API documentation, which you can build using the `hack/tfw.sh` script in the [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework) repository.
In the `docs` folder you can find our Sphinx-based documentation, which you can build using the `hack/tfw.sh` script in the [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework) repository.
To get started you should take a look at [test-tutorial-framework](https://github.com/avatao-content/test-tutorial-framework), which serves as an example project as well.
## API
APIs exposed by our pre-witten event handlers are documented here.
### IdeEventHandler
You can read the content of the currently selected file like so:
```
{
"key": "ide",
"data":
{
"command": "read"
}
}
```
Use the following message to overwrite the content of the currently selected file:
```
{
"key": "ide",
"data":
{
"command": "write",
"content": ...string...
}
}
```
To select a file use the following message:
```
{
"key": "ide",
"data":
{
"command": "select",
"filename": ...string...
}
}
```
You can switch to a new working directory using this message (note that the directory must be in `allowed_directories`):
```
{
"key": "ide",
"data":
{
"command": "selectdir",
"directory": ...string...
}
}
```
Overwriting the current list of excluded file patterns is possible with this message:
```
{
"key": "ide",
"data":
{
"command": "exclude",
"exclude": ...array of strings...
}
}
```
### TerminalEventHandler
Writing to the terminal:
```
{
"key": "shell",
"data":
{
"command": "write",
"value": ...string...
}
}
```
You can read terminal command history like so:
```
{
"key": "shell",
"data":
{
"command": "read",
"count": ...number...
}
}
```
### ProcessManagingEventHandler
Starting, stopping and restarting supervisor processes can be done using similar messages (where `command` is `start`, `stop` or `restart`):
```
{
"key": "processmanager",
"data":
{
"command": ...string...,
"process_name": ...string...
}
}
```
### LogMonitoringEventHandler
To change which supervisor process is monitored use this message:
```
{
"key": "logmonitor",
"data" :
{
"command": "process_name",
"value": ...string...
}
}
```
To set the tail length of logs (the monitor will send back the last `value` characters of the log):
```
{
"key": "logmonitor",
"data" :
{
"command": "log_tail",
"value": ...number...
}
}
```
### FSMManagingEventHandler
To attempt executing a trigger on the FSM use:
```
{
"key": "fsm",
"data" :
{
"command": "trigger",
"value": ...string...
}
}
```
To force the broadcasting of an FSM update you can use this message:
```
{
"key": "fsm",
"data" :
{
"command": "update"
}
}
```

View File

@ -1 +1 @@
bombay
mainecoon

View File

@ -26,7 +26,7 @@ author = 'Kristóf Tóth'
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = 'bombay'
release = 'mainecoon'
# -- General configuration ---------------------------------------------------

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 46 KiB

View File

@ -1,6 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .event_handler_base import EventHandlerBase, TriggeredEventHandler
from .event_handler_base import EventHandlerBase, TriggeredEventHandler, BroadcastingEventHandler
from .fsm_base import FSMBase
from .linear_fsm import LinearFSM
from .yaml_fsm import YamlFSM

View File

@ -8,3 +8,4 @@ from .ide_event_handler import IdeEventHandler
from .history_monitor import HistoryMonitor, BashMonitor, GDBMonitor
from .terminal_commands import TerminalCommands
from .log_monitoring_event_handler import LogMonitoringEventHandler
from .fsm_managing_event_handler import FSMManagingEventHandler

View File

@ -0,0 +1,56 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw import BroadcastingEventHandler
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FSMManagingEventHandler(BroadcastingEventHandler):
def __init__(self, key, fsm_type):
super().__init__(key)
self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm)
self.command_handlers = {
'trigger': self.handle_trigger,
'update': self.handle_update
}
def handle_event(self, message):
try:
data = message['data']
message['data'] = self.command_handlers[data['command']](data)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_trigger(self, data):
self.fsm.step(data['value'])
return self.with_fsm_update(data)
def with_fsm_update(self, data):
return {
**data,
**self._fsm_updater.get_fsm_state_and_transitions()
}
def handle_update(self, data):
return self.with_fsm_update(data)
class FSMUpdater:
def __init__(self, fsm):
self.fsm = fsm
def get_fsm_state_and_transitions(self):
state = self.fsm.state
valid_transitions = [
{'trigger': trigger}
for trigger in self.fsm.get_triggers(self.fsm.state)
]
return {
'current_state': state,
'valid_transitions': valid_transitions
}

View File

@ -2,9 +2,13 @@
# All Rights Reserved. See LICENSE file for details.
from abc import ABC, abstractmethod
from json import dumps
from hashlib import md5
from tfw.networking import deserialize_tfw_msg
from tfw.networking.event_handlers import ServerConnector
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class EventHandlerBase(ABC):
@ -20,22 +24,23 @@ class EventHandlerBase(ABC):
self.subscribe(self.key, 'reset')
self.server_connector.register_callback(self.event_handler_callback)
def event_handler_callback(self, msg_parts):
def event_handler_callback(self, message):
"""
Callback that is invoked when receiving a message.
Dispatches messages to handler methods and sends
a response back in case the handler returned something.
This is subscribed in __init__().
"""
message = deserialize_tfw_msg(*msg_parts)
response = self.dispatch_handling(message)
if response:
response['key'] = message['key']
self.server_connector.send(response)
def dispatch_handling(self, message):
"""
Used to dispatch messages to their specific handlers.
:param message: the message received
:returns: the message to send back
"""
if message['key'] != 'reset':
return self.handle_event(message)
@ -47,6 +52,7 @@ class EventHandlerBase(ABC):
Abstract method that implements the handling of messages.
:param message: the message received
:returns: the message to send back
"""
raise NotImplementedError
@ -56,6 +62,7 @@ class EventHandlerBase(ABC):
Usually 'reset' events receive some sort of special treatment.
:param message: the message received
:returns: the message to send back
"""
return None
@ -102,3 +109,38 @@ class TriggeredEventHandler(EventHandlerBase, ABC):
if message.get('trigger') == self.trigger:
return super().dispatch_handling(message)
return None
class BroadcastingEventHandler(EventHandlerBase, ABC):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which broadcast responses
and intelligently ignore their own broadcasted messages they receive.
"""
def __init__(self, key):
super().__init__(key)
self.own_message_hashes = []
def event_handler_callback(self, message):
message_hash = self.hash_message(message)
if message_hash in self.own_message_hashes:
self.own_message_hashes.remove(message_hash)
return
response = self.dispatch_handling(message)
if response:
self.own_message_hashes.append(self.hash_message(response))
self.server_connector.send(self.make_broadcast_message(response))
@staticmethod
def hash_message(message):
message_bytes = dumps(message, sort_keys=True).encode()
return md5(message_bytes).hexdigest()
@staticmethod
def make_broadcast_message(message):
return {
'key': 'broadcast',
'data': message
}

View File

@ -1,27 +1,32 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from typing import List
from collections import defaultdict
from transitions import Machine
from transitions import Machine, MachineError
from tfw.mixins import CallbackMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FSMBase(CallbackMixin):
class FSMBase(Machine, CallbackMixin):
"""
A general FSM base class you can inherit from to track user progress.
See linear_fsm.py for an example use-case.
TFW the transitions library for state machines, please refer to their
TFW uses the transitions library for state machines, please refer to their
documentation for more information on creating your own machines:
https://github.com/pytransitions/transitions
"""
states, transitions = [], []
def __init__(self, initial: str = None, accepted_states: List[str] = None):
def __init__(self, initial=None, accepted_states=None):
self.accepted_states = accepted_states or [self.states[-1]]
self.machine = Machine(
model=self,
self.trigger_predicates = defaultdict(list)
Machine.__init__(
self,
states=self.states,
transitions=self.transitions,
initial=initial or self.states[0],
@ -34,4 +39,27 @@ class FSMBase(CallbackMixin):
self._execute_callbacks(event_data.kwargs)
def is_solved(self):
return self.state in self.accepted_states # pylint: disable=no-member
return self.state in self.accepted_states # pylint: disable=no-member
def subscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger].extend(predicates)
def unsubscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger] = [
predicate
for predicate in self.trigger_predicates[trigger]
not in predicates
]
def step(self, trigger):
predicate_results = (
predicate()
for predicate in self.trigger_predicates[trigger]
)
# TODO: think about what could we do when this prevents triggering
if all(predicate_results):
try:
self.trigger(trigger)
except (AttributeError, MachineError):
LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger)

View File

@ -21,6 +21,14 @@ class CallbackMixin:
fun = partial(callback, *args, **kwargs)
self._callbacks.append(fun)
def subscribe_callbacks(self, *callbacks):
"""
Subscribe a list of callbacks to incoke once an event is triggered.
:param callbacks: callbacks to be subscribed
"""
for callback in callbacks:
self.subscribe_callback(callback)
def unsubscribe_callback(self, callback):
self._callbacks.remove(callback)

View File

@ -1,7 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .serialization import serialize_tfw_msg, deserialize_tfw_msg, validate_message
from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg
from .zmq_connector_base import ZMQConnectorBase
# from .controller_connector import ControllerConnector # TODO: readd once controller stuff is resolved
from .message_sender import MessageSender

View File

@ -6,9 +6,12 @@ from functools import partial
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking import serialize_tfw_msg
from tfw.networking import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.networking import ZMQConnectorBase
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ServerDownlinkConnector(ZMQConnectorBase):
@ -20,7 +23,10 @@ class ServerDownlinkConnector(ZMQConnectorBase):
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE)
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE)
self.register_callback = self._zmq_sub_stream.on_recv
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_sub_stream.on_recv(callback)
class ServerUplinkConnector(ZMQConnectorBase):

View File

@ -22,10 +22,7 @@ The purpose of this module is abstracting away this low level behaviour.
"""
import json
def validate_message(message):
return 'key' in message
from functools import wraps
def serialize_tfw_msg(message):
@ -35,6 +32,14 @@ def serialize_tfw_msg(message):
return _serialize_all(message['key'], message)
def with_deserialize_tfw_msg(fun):
@wraps(fun)
def wrapper(message_parts):
message = deserialize_tfw_msg(*message_parts)
return fun(message)
return wrapper
def deserialize_tfw_msg(*args):
"""
Return message from TFW multipart data

View File

@ -3,5 +3,4 @@
from .event_handler_connector import EventHandlerConnector, EventHandlerUplinkConnector, EventHandlerDownlinkConnector
from .tfw_server import TFWServer
from .zmq_websocket_handler import ZMQWebSocketProxy
# from .controller_responder import ControllerResponder # TODO: readd once controller stuff is resolved

View File

@ -4,7 +4,7 @@
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking import ZMQConnectorBase, serialize_tfw_msg
from tfw.networking import ZMQConnectorBase, serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
@ -32,6 +32,7 @@ class EventHandlerUplinkConnector(ZMQConnectorBase):
class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector):
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_pull_stream.on_recv(callback)
def send_message(self, message: dict):

View File

@ -1,15 +1,12 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from collections import defaultdict
from tornado.web import Application
from tfw.networking import MessageSender
from tfw.networking.event_handlers import ServerUplinkConnector
from tfw.networking.server import EventHandlerConnector
from tfw.config.logs import logging
from .zmq_websocket_handler import ZMQWebSocketProxy
from .zmq_websocket_proxy import ZMQWebSocketProxy
LOG = logging.getLogger(__name__)
@ -18,117 +15,29 @@ class TFWServer:
"""
This class handles the proxying of messages between the frontend and event handers.
It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ
SUB socket. It also manages an FSM you can define as a constructor argument.
SUB socket.
"""
def __init__(self, fsm_type):
"""
:param fsm_type: the type of FSM you want TFW to use
"""
self._fsm = fsm_type()
self._fsm_updater = FSMUpdater(self._fsm)
self._fsm_manager = FSMManager(self._fsm)
self._fsm.subscribe_callback(self._fsm_updater.update)
def __init__(self):
self._event_handler_connector = EventHandlerConnector()
self._uplink_connector = ServerUplinkConnector()
self.application = Application([(
r'/ws', ZMQWebSocketProxy,{
'make_eventhandler_message': self.make_eventhandler_message,
'proxy_filter': self.proxy_filter,
'handle_trigger': self.handle_trigger,
'event_handler_connector': self._event_handler_connector
'event_handler_connector': self._event_handler_connector,
'message_handlers': [self.handle_trigger]
})]
)
# self.controller_responder = ControllerResponder(self.fsm)
# TODO: add this once controller stuff is resolved
@property
def fsm(self):
return self._fsm
@property
def fsm_manager(self):
return self._fsm_manager
def make_eventhandler_message(self, message):
self.trigger_fsm(message)
message['FSMUpdate'] = self._fsm_updater.get_fsm_state_and_transitions()
return message
def handle_trigger(self, message):
LOG.debug('Executing handler for trigger "%s"', message.get('trigger', ''))
self.trigger_fsm(message)
def trigger_fsm(self, message):
trigger = message.get('trigger', '')
try:
self._fsm_manager.trigger(trigger, message)
except AttributeError:
LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger)
def proxy_filter(self, message):
# pylint: disable=unused-argument,no-self-use
return True
if 'trigger' in message:
LOG.debug('Executing handler for trigger "%s"', message.get('trigger', ''))
self._uplink_connector.send_to_eventhandler({
'key': 'fsm',
'data': {
'command': 'trigger',
'value': message.get('trigger', '')
}
})
def listen(self, port):
self.application.listen(port)
class FSMManager:
def __init__(self, fsm):
self._fsm = fsm
self.trigger_predicates = defaultdict(list)
self.messenge_sender = MessageSender()
@property
def fsm(self):
return self._fsm
def trigger(self, trigger, message):
predicate_results = []
for predicate in self.trigger_predicates[trigger]:
success, message = predicate(message)
predicate_results.append(success)
self.messenge_sender.send('FSM', message)
if all(predicate_results):
try:
self.fsm.trigger(trigger, message=message)
except AttributeError:
LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger)
def subscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger].extend(predicates)
def unsubscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger] = [
predicate
for predicate in self.trigger_predicates[trigger]
not in predicates
]
class FSMUpdater:
def __init__(self, fsm):
self.fsm = fsm
self.uplink = ServerUplinkConnector()
def update(self, kwargs_dict):
# pylint: disable=unused-argument
self.uplink.send(self.generate_fsm_update())
def generate_fsm_update(self):
return {
'key': 'FSMUpdate',
'data': self.get_fsm_state_and_transitions()
}
def get_fsm_state_and_transitions(self):
state = self.fsm.state
valid_transitions = [
{'trigger': trigger}
for trigger in self.fsm.machine.get_triggers(self.fsm.state)
]
return {
'current_state': state,
'valid_transitions': valid_transitions
}

View File

@ -1,91 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import json
from abc import ABC, abstractmethod
from tornado.websocket import WebSocketHandler
from tfw.networking import deserialize_tfw_msg, validate_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ZMQWebSocketHandler(WebSocketHandler, ABC):
instances = set()
def initialize(self, **kwargs): # pylint: disable=arguments-differ
self._event_handler_connector = kwargs['event_handler_connector']
def prepare(self):
ZMQWebSocketHandler.instances.add(self)
def on_close(self):
ZMQWebSocketHandler.instances.remove(self)
def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated')
self._event_handler_connector.register_callback(self.zmq_callback)
def zmq_callback(self, msg_parts):
keyhandlers = {'mirror': self.mirror}
message = deserialize_tfw_msg(*msg_parts)
LOG.debug('Received on pull socket: %s', message)
if not validate_message(message):
return
self.handle_trigger(message)
if message['key'] not in keyhandlers:
for instance in ZMQWebSocketHandler.instances:
instance.write_message(message)
else:
try:
keyhandlers[message['key']](message)
except KeyError:
LOG.error('Invalid mirror message format! Ignoring.')
def mirror(self, message):
message = message['data']
self._event_handler_connector.send_message(message)
def on_message(self, message):
LOG.debug('Received on WebSocket: %s', message)
if validate_message(message):
self.send_message(self.make_eventhandler_message(message))
@abstractmethod
def make_eventhandler_message(self, message):
raise NotImplementedError
def send_message(self, message: dict):
self._event_handler_connector.send_message(message)
@abstractmethod
def handle_trigger(self, message):
raise NotImplementedError
# much secure, very cors, wow
def check_origin(self, origin):
return True
class ZMQWebSocketProxy(ZMQWebSocketHandler):
# pylint: disable=abstract-method
def initialize(self, **kwargs): # pylint: disable=arguments-differ
super(ZMQWebSocketProxy, self).initialize(**kwargs)
self._make_eventhandler_message = kwargs['make_eventhandler_message']
self._proxy_filter = kwargs['proxy_filter']
self._handle_trigger = kwargs['handle_trigger']
def on_message(self, message):
message = json.loads(message)
if self._proxy_filter(message):
super().on_message(message)
def make_eventhandler_message(self, message):
return self._make_eventhandler_message(message)
def handle_trigger(self, message):
self._handle_trigger(message)

View File

@ -0,0 +1,121 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import json
from tornado.websocket import WebSocketHandler
from tfw.mixins import CallbackMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ZMQWebSocketProxy(WebSocketHandler):
instances = set()
def initialize(self, **kwargs): # pylint: disable=arguments-differ
self._event_handler_connector = kwargs['event_handler_connector']
self._message_handlers = kwargs.get('message_handlers', [])
self._proxy_filters = kwargs.get('proxy_filters', [])
self.proxy_eventhandler_to_websocket = TFWProxy(
self.send_eventhandler_message,
self.send_websocket_message
)
self.proxy_websocket_to_eventhandler = TFWProxy(
self.send_websocket_message,
self.send_eventhandler_message
)
proxies = (self.proxy_eventhandler_to_websocket, self.proxy_websocket_to_eventhandler)
for proxy in proxies:
proxy.proxy_filters.subscribe_callbacks(*self._proxy_filters)
proxy.proxy_callbacks.subscribe_callbacks(*self._message_handlers)
def prepare(self):
ZMQWebSocketProxy.instances.add(self)
def on_close(self):
ZMQWebSocketProxy.instances.remove(self)
def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated')
self._event_handler_connector.register_callback(self.eventhander_callback)
def eventhander_callback(self, message):
"""
Invoked on ZMQ messages from event handlers.
"""
LOG.debug('Received on pull socket: %s', message)
self.proxy_eventhandler_to_websocket(message)
def on_message(self, message):
"""
Invoked on WS messages from frontend.
"""
message = json.loads(message)
LOG.debug('Received on WebSocket: %s', message)
self.proxy_websocket_to_eventhandler(message)
def send_eventhandler_message(self, message):
self._event_handler_connector.send_message(message)
@staticmethod
def send_websocket_message(message):
for instance in ZMQWebSocketProxy.instances:
instance.write_message(message)
# much secure, very cors, wow
def check_origin(self, origin):
return True
class TFWProxy:
def __init__(self, to_source, to_destination):
self.to_source = to_source
self.to_destination = to_destination
self.proxy_filters = CallbackMixin()
self.proxy_callbacks = CallbackMixin()
self.proxy_filters.subscribe_callback(self.validate_message)
self.keyhandlers = {
'mirror': self.mirror,
'broadcast': self.broadcast
}
@staticmethod
def validate_message(message):
if 'key' not in message:
raise ValueError('Invalid TFW message format!')
def __call__(self, message):
try:
self.proxy_filters._execute_callbacks(message)
except ValueError:
LOG.exception('Invalid TFW message received!')
return
self.proxy_callbacks._execute_callbacks(message)
if message['key'] not in self.keyhandlers:
self.to_destination(message)
else:
handler = self.keyhandlers[message['key']]
try:
handler(message)
except KeyError:
LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__)
def mirror(self, message):
message = message['data']
LOG.debug('Mirroring message: %s', message)
self.to_source(message)
def broadcast(self, message):
message = message['data']
LOG.debug('Broadcasting message: %s', message)
self.to_source(message)
self.to_destination(message)

64
lib/tfw/yaml_fsm.py Normal file
View File

@ -0,0 +1,64 @@
from subprocess import Popen, run
from functools import partial
from contextlib import suppress
import yaml
from transitions import State
from tfw import FSMBase
class YamlFSM(FSMBase):
def __init__(self, config_file):
self.config = self.parse_config(config_file)
self.setup_states()
super().__init__() # FSMBase.__init__() requires states
self.setup_transitions()
@staticmethod
def parse_config(config_file):
with open(config_file, 'r') as ifile:
return yaml.safe_load(ifile)
def setup_states(self):
self.for_config_states_and_transitions_do(self.wrap_callbacks_with_subprocess_call)
self.states = [State(**state) for state in self.config['states']]
def setup_transitions(self):
self.for_config_states_and_transitions_do(self.subscribe_and_remove_predicates)
for transition in self.config['transitions']:
self.add_transition(**transition)
def for_config_states_and_transitions_do(self, what):
for array in ('states', 'transitions'):
for json_obj in self.config[array]:
what(json_obj)
@staticmethod
def wrap_callbacks_with_subprocess_call(json_obj):
topatch = ('on_enter', 'on_exit', 'prepare', 'before', 'after')
for key in json_obj:
if key in topatch:
json_obj[key] = partial(run_command_async, json_obj[key])
def subscribe_and_remove_predicates(self, json_obj):
if 'predicates' in json_obj:
for predicate in json_obj['predicates']:
self.subscribe_predicate(
json_obj['trigger'],
partial(
command_statuscode_is_zero,
predicate
)
)
with suppress(KeyError):
json_obj.pop('predicates')
def run_command_async(command, event):
Popen(command, shell=True)
def command_statuscode_is_zero(command):
return run(command, shell=True).returncode == 0

View File

@ -3,3 +3,4 @@ pyzmq==17.0.0
transitions==0.6.4
terminado==0.8.1
watchdog==0.8.3
PyYAML==3.12

View File

@ -0,0 +1,4 @@
[program:tfwserver]
user=root
directory=%(ENV_TFW_SERVER_DIR)s
command=python3 tfw_server.py

9
supervisor/tfw_server.py Normal file
View File

@ -0,0 +1,9 @@
from tornado.ioloop import IOLoop
from tfw.networking import TFWServer
from tfw.config import TFWENV
if __name__ == '__main__':
TFWServer().listen(TFWENV.WEB_PORT)
IOLoop.instance().start()