Merge pull request #31 from avatao-content/networking_refactor

Networking refactor
This commit is contained in:
Bokros Bálint 2018-04-16 13:04:56 +02:00 committed by GitHub
commit 18124bf8c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 162 additions and 123 deletions

View File

@ -1,3 +1,2 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -3,31 +3,24 @@
from functools import wraps
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler
from tfw.networking.event_handlers import ServerUplinkConnector
from tfw.components.decorators import RateLimiter
from tfw.decorators import RateLimiter
from tfw.mixins import ObserverMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class DirectoryMonitor:
class DirectoryMonitor(ObserverMixin):
def __init__(self, directory):
self.observer = Observer()
ObserverMixin.__init__(self)
self.eventhandler = WebideReloadWatchdogEventHandler()
self.observer.schedule(self.eventhandler, directory, recursive=True)
self.pause, self.resume = self.eventhandler.pause, self.eventhandler.resume
def watch(self):
self.observer.start()
def stop(self):
self.observer.stop()
self.observer.join()
@property
def ignore(self):
return self.eventhandler.ignore
@ -70,8 +63,8 @@ class WebideReloadWatchdogEventHandler(FileSystemWatchdogEventHandler):
self.ignore = self.ignore - 1
return
LOG.debug(event)
key = 'webide'
self.uplink.send(key, {'data': {'command': 'reload'}})
self.uplink.send({'key': 'webide',
'data': {'command': 'reload'}})
def with_monitor_paused(fun):

View File

@ -5,17 +5,18 @@ from os.path import isdir, exists
from tfw import TriggerlessEventHandler
from tfw.config.logs import logging
from tfw.mixins import MonitorManagerMixin
from .directory_monitor import DirectoryMonitor
LOG = logging.getLogger(__name__)
class DirectoryMonitoringEventHandler(TriggerlessEventHandler):
class DirectoryMonitoringEventHandler(TriggerlessEventHandler, MonitorManagerMixin):
def __init__(self, key, directory):
super().__init__(key)
self._directory = directory
self._monitor = None
self.reload_monitor()
MonitorManagerMixin.__init__(self, DirectoryMonitor, self._directory)
self.commands = {'pause': self.pause,
'resume': self.resume,
'ignore': self.ignore,
@ -31,20 +32,7 @@ class DirectoryMonitoringEventHandler(TriggerlessEventHandler):
raise EnvironmentError('No such directory!')
self._directory = directory
@property
def monitor(self):
return self._monitor
def reload_monitor(self):
if self._monitor:
try:
self._monitor.stop()
except KeyError:
LOG.debug('Working directory was removed ignoring...')
self._monitor = DirectoryMonitor(self._directory)
self._monitor.watch() # This runs on a separate thread
def handle_event(self, key, message):
def handle_event(self, message):
try:
message['data'] = self.commands[message['data']['command']](message['data'])
return message

View File

@ -6,11 +6,10 @@ from re import findall
from re import compile as compileregex
from abc import ABC, abstractmethod
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from tfw.components.mixins import CallbackMixin
from tfw.components.decorators import RateLimiter
from tfw.mixins import CallbackMixin, ObserverMixin
from tfw.decorators import RateLimiter
class CallbackEventHandler(PatternMatchingEventHandler, ABC):
@ -24,13 +23,13 @@ class CallbackEventHandler(PatternMatchingEventHandler, ABC):
callback()
class HistoryMonitor(CallbackMixin, ABC):
class HistoryMonitor(CallbackMixin, ObserverMixin, ABC):
def __init__(self, histfile):
CallbackMixin.__init__(self)
ObserverMixin.__init__(self)
self.histfile = histfile
self._history = []
self._last_length = len(self._history)
self.observer = Observer()
self.observer.schedule(CallbackEventHandler([self.histfile],
self._fetch_history,
self._invoke_callbacks),
@ -60,13 +59,6 @@ class HistoryMonitor(CallbackMixin, ABC):
if self._last_length < len(self._history):
self._execute_callbacks(self.history)
def watch(self):
self.observer.start()
def stop(self):
self.observer.stop()
self.observer.join()
class BashMonitor(HistoryMonitor):
@property

View File

@ -4,7 +4,7 @@
from xmlrpc.client import Fault as SupervisorFault
from tfw import TriggerlessEventHandler
from tfw.components.mixins import SupervisorMixin
from tfw.mixins import SupervisorMixin
from tfw.config.logs import logging
from .directory_monitor import with_monitor_paused
@ -30,7 +30,7 @@ class ProcessManagingEventHandler(TriggerlessEventHandler):
self.processmanager = ProcessManager()
@with_monitor_paused
def handle_event(self, key, message):
def handle_event(self, message):
try:
data = message['data']
self.processmanager(data['command'], data['process_name'])

View File

@ -27,10 +27,11 @@ class TerminadoEventHandler(TriggerlessEventHandler):
def historymonitor(self):
return self._historymonitor
def handle_event(self, key, message):
def handle_event(self, message):
LOG.debug('TerminadoEventHandler received event: %s', message)
try:
message['data'] = self.commands[message['data']['command']](message['data'])
data = message['data']
message['data'] = self.commands[data['command']](data)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -7,6 +7,7 @@ from fnmatch import fnmatchcase
from collections import Iterable
from tfw import TriggerlessEventHandler
from tfw.mixins import MonitorManagerMixin
from tfw.config.logs import logging
from .directory_monitor import DirectoryMonitor
@ -58,7 +59,7 @@ class FileManager: # pylint: disable=too-many-instance-attributes
@filename.setter
def filename(self, filename):
if not filename in self.files:
if filename not in self.files:
raise EnvironmentError('No such file in workdir!')
self._filename = filename
@ -90,12 +91,13 @@ class FileManager: # pylint: disable=too-many-instance-attributes
return relpath(self._filepath(filename), start=self._workdir)
class WebideEventHandler(TriggerlessEventHandler):
class WebideEventHandler(TriggerlessEventHandler, MonitorManagerMixin):
# pylint: disable=too-many-arguments
def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None):
super().__init__(key)
self.filemanager = FileManager(allowed_directories=allowed_directories, working_directory=directory,
selected_file=selected_file, exclude=exclude)
MonitorManagerMixin.__init__(self, DirectoryMonitor, self.filemanager.workdir)
self.commands = {'read': self.read,
'write': self.write,
@ -103,22 +105,6 @@ class WebideEventHandler(TriggerlessEventHandler):
'selectdir': self.select_dir,
'exclude': self.exclude}
self._monitor = None
self.reload_monitor()
@property
def monitor(self):
return self._monitor
def reload_monitor(self):
if self._monitor:
try:
self._monitor.stop()
except KeyError:
LOG.debug('Working directory was removed ignoring...')
self._monitor = DirectoryMonitor(self.filemanager.workdir)
self._monitor.watch() # This runs on a separate thread
def read(self, data):
try:
data['content'] = self.filemanager.file_contents
@ -171,7 +157,7 @@ class WebideEventHandler(TriggerlessEventHandler):
data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir
def handle_event(self, key, message):
def handle_event(self, message):
try:
data = message['data']
message['data'] = self.commands[data['command']](data)

View File

@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from tfw.networking import deserialize_all
from tfw.networking import deserialize_tfw_msg
from tfw.networking.event_handlers import ServerConnector
@ -17,25 +17,26 @@ class EventHandlerBase(ABC):
self.server_connector.register_callback(self.event_handler_callback)
def event_handler_callback(self, msg_parts):
key, message = deserialize_all(*msg_parts)
response = self.dispatch_handling(key, message)
message = deserialize_tfw_msg(*msg_parts)
response = self.dispatch_handling(message)
response['key'] = message['key']
if response is None:
return
self.server_connector.send(key, response)
self.server_connector.send(response)
@abstractmethod
def dispatch_handling(self, key, message):
def dispatch_handling(self, message):
raise NotImplementedError
def _dispatch_handling(self, key, message):
def _dispatch_handling(self, message):
# pylint: disable=no-else-return
if key != 'reset':
return self.handle_event(key, message)
if message['key'] != 'reset':
return self.handle_event(message)
else:
return self.handle_reset(message)
@abstractmethod
def handle_event(self, key, message):
def handle_event(self, message):
raise NotImplementedError
def handle_reset(self, message):
@ -65,8 +66,8 @@ class EventHandlerBase(ABC):
class TriggerlessEventHandler(EventHandlerBase, ABC):
# pylint: disable=abstract-method
def dispatch_handling(self, key, message):
return self._dispatch_handling(key, message)
def dispatch_handling(self, message):
return self._dispatch_handling(message)
class TriggeredEventHandler(EventHandlerBase, ABC):
@ -75,7 +76,7 @@ class TriggeredEventHandler(EventHandlerBase, ABC):
super().__init__(key)
self.trigger = trigger
def dispatch_handling(self, key, message):
def dispatch_handling(self, message):
if message.get('trigger') == self.trigger:
return self._dispatch_handling(key, message)
return self._dispatch_handling(message)
return None

View File

@ -5,7 +5,7 @@ from typing import List
from transitions import Machine
from tfw.components.mixins import CallbackMixin
from tfw.mixins import CallbackMixin
class FSMBase(CallbackMixin):

View File

@ -3,3 +3,5 @@
from .supervisor_mixin import SupervisorMixin
from .callback_mixin import CallbackMixin
from .observer_mixin import ObserverMixin
from .monitor_manager_mixin import MonitorManagerMixin

View File

@ -0,0 +1,27 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class MonitorManagerMixin:
def __init__(self, monitor_type, directory):
self._monitor_type = monitor_type
self._monitor = None
self._monitored_directory = directory
self.reload_monitor()
@property
def monitor(self):
return self._monitor
def reload_monitor(self):
if self._monitor:
try:
self._monitor.stop()
except KeyError:
LOG.debug('Working directory was removed ignoring...')
self._monitor = self._monitor_type(self._monitored_directory)
self._monitor.watch() # This runs on a separate thread

View File

@ -0,0 +1,16 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from watchdog.observers import Observer
class ObserverMixin:
def __init__(self):
self.observer = Observer()
def watch(self):
self.observer.start()
def stop(self):
self.observer.stop()
self.observer.join()

View File

@ -1,9 +1,9 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .serialization import decode_if_needed, encode_if_needed, serialize_all, deserialize_all
from .serialization import serialize_tfw_msg, deserialize_tfw_msg, validate_message
from .zmq_connector_base import ZMQConnectorBase
from .controller_connector import ControllerConnector
# from .controller_connector import ControllerConnector # TODO: readd once controller stuff is resolved
from .message_sender import MessageSender
from .event_handlers.server_connector import ServerUplinkConnector as TFWServerConnector
from .server.tfw_server import TFWServer

View File

@ -6,7 +6,7 @@ from functools import partial
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking import serialize_all
from tfw.networking import serialize_tfw_msg
from tfw.networking import ZMQConnectorBase
from tfw.config import TFWENV
@ -29,13 +29,14 @@ class ServerUplinkConnector(ZMQConnectorBase):
self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH)
self._zmq_push_socket.connect('tcp://localhost:{}'.format(TFWENV.RECEIVER_PORT))
def send_to_eventhandler(self, key, message):
message['data']['key'] = key
self.send('mirror', message)
def send_to_eventhandler(self, message):
nested_message = {'key': message['key'], 'data': message.pop('data')}
message['key'] = 'mirror'
message['data'] = nested_message
self.send(message)
def send(self, key, message):
message['key'] = key
self._zmq_push_socket.send_multipart(serialize_all(key, message))
def send(self, message):
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector):

View File

@ -17,5 +17,5 @@ class MessageSender:
'timestamp': datetime.now().isoformat(),
'message': message
}
response = {'data': data}
self.server_connector.send(self.key, response)
self.server_connector.send({'key': self.key,
'data': data})

View File

@ -1,37 +1,69 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
"""
TFW JSON message format
message:
{
"key": string, # addressing
"data": {...}, # payload
"trigger": string # FSM trigger
}
ZeroMQ's sub-pub sockets use enveloped messages
(http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes)
and TFW also uses them internally. This means that on ZMQ sockets
we always send the messages key separately and then the actual
message (which contains the key as well) like so:
socket.send_multipart([message['key'], message])
The purpose of this module is abstracting away this low level behaviour.
"""
import json
def serialize_all(*args):
def validate_message(message):
return 'key' in message
def serialize_tfw_msg(message):
return _serialize_all(message['key'], message)
def deserialize_tfw_msg(*args):
return _deserialize_all(*args)[1]
def _serialize_all(*args):
return tuple(_serialize_single(arg) for arg in args)
def deserialize_all(*args):
def _deserialize_all(*args):
return tuple(_deserialize_single(arg) for arg in args)
def _serialize_single(data):
if not isinstance(data, str):
data = json.dumps(data)
return encode_if_needed(data)
return _encode_if_needed(data)
def _deserialize_single(data):
try:
return json.loads(data)
except ValueError:
return decode_if_needed(data)
return _decode_if_needed(data)
def encode_if_needed(value):
def _encode_if_needed(value):
if isinstance(value, str):
value = value.encode('utf-8')
return value
def decode_if_needed(value):
def _decode_if_needed(value):
if isinstance(value, (bytes, bytearray)):
value = value.decode('utf-8')
return value

View File

@ -4,4 +4,4 @@
from .event_handler_connector import EventHandlerConnector, EventHandlerUplinkConnector, EventHandlerDownlinkConnector
from .tfw_server import TFWServer
from .zmq_websocket_handler import ZMQWebSocketProxy
from .controller_responder import ControllerResponder
# from .controller_responder import ControllerResponder # TODO: readd once controller stuff is resolved

View File

@ -4,7 +4,7 @@
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking import ZMQConnectorBase, serialize_all
from tfw.networking import ZMQConnectorBase, serialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
@ -34,7 +34,5 @@ class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkCon
def register_callback(self, callback):
self._zmq_pull_stream.on_recv(callback)
def send_message(self, message: dict, key: str = None):
if not key:
key = message.get('key', '')
self._zmq_pub_socket.send_multipart(serialize_all(key, message))
def send_message(self, message: dict):
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))

View File

@ -23,7 +23,7 @@ class TFWServer:
self._event_handler_connector = EventHandlerConnector()
self.application = Application(
[(r'/ws', ZMQWebSocketProxy, {'make_response': self.make_response,
[(r'/ws', ZMQWebSocketProxy, {'make_eventhandler_message': self.make_eventhandler_message,
'proxy_filter': self.proxy_filter,
'handle_trigger': self.handle_trigger,
'event_handler_connector': self._event_handler_connector})]
@ -38,7 +38,7 @@ class TFWServer:
def fsm_manager(self):
return self._fsm_manager
def make_response(self, message):
def make_eventhandler_message(self, message):
self.trigger_fsm(message)
message['FSMUpdate'] = self._fsm_updater.get_fsm_state_and_transitions()
return message
@ -100,12 +100,11 @@ class FSMUpdater:
def update(self, kwargs_dict):
# pylint: disable=unused-argument
self.uplink.send(*self.generate_fsm_update())
self.uplink.send(self.generate_fsm_update())
def generate_fsm_update(self):
key = 'FSMUpdate'
response = {'data': self.get_fsm_state_and_transitions()}
return key, response
return {'key': 'FSMUpdate',
'data': self.get_fsm_state_and_transitions()}
def get_fsm_state_and_transitions(self):
state = self.fsm.state

View File

@ -6,7 +6,7 @@ from abc import ABC, abstractmethod
from tornado.websocket import WebSocketHandler
from tfw.networking import deserialize_all
from tfw.networking import deserialize_tfw_msg, validate_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
@ -31,32 +31,36 @@ class ZMQWebSocketHandler(WebSocketHandler, ABC):
def zmq_callback(self, msg_parts):
keyhandlers = {'mirror': self.mirror}
key, message = deserialize_all(*msg_parts)
message = deserialize_tfw_msg(*msg_parts)
LOG.debug('Received on pull socket: %s', message)
if not validate_message(message):
return
self.handle_trigger(message)
if key not in keyhandlers:
if message['key'] not in keyhandlers:
for instance in ZMQWebSocketHandler.instances:
instance.write_message(message)
else:
try:
keyhandlers[key](message['data'])
keyhandlers[message['key']](message)
except KeyError:
LOG.error('Invalid mirror message format! Ignoring.')
def mirror(self, data):
key = data['key']
self._event_handler_connector.send_message({'data': data}, key)
def mirror(self, message):
message = message['data']
self._event_handler_connector.send_message(message)
def on_message(self, message):
LOG.debug('Received on WebSocket: %s', message)
self.send_message(self.make_response(message))
if validate_message(message):
self.send_message(self.make_eventhandler_message(message))
@abstractmethod
def make_response(self, message):
def make_eventhandler_message(self, message):
raise NotImplementedError
def send_message(self, message: dict, key: str = None):
self._event_handler_connector.send_message(message, key)
def send_message(self, message: dict):
self._event_handler_connector.send_message(message)
@abstractmethod
def handle_trigger(self, message):
@ -71,7 +75,7 @@ class ZMQWebSocketProxy(ZMQWebSocketHandler):
# pylint: disable=abstract-method
def initialize(self, **kwargs): # pylint: disable=arguments-differ
super(ZMQWebSocketProxy, self).initialize(**kwargs)
self._make_response = kwargs['make_response']
self._make_eventhandler_message = kwargs['make_eventhandler_message']
self._proxy_filter = kwargs['proxy_filter']
self._handle_trigger = kwargs['handle_trigger']
@ -80,8 +84,8 @@ class ZMQWebSocketProxy(ZMQWebSocketHandler):
if self._proxy_filter(message):
super().on_message(message)
def make_response(self, message):
return self._make_response(message)
def make_eventhandler_message(self, message):
return self._make_eventhandler_message(message)
def handle_trigger(self, message):
self._handle_trigger(message)