Merge branch 'networking-rework' into chausie

This commit is contained in:
Kristóf Tóth 2019-06-04 14:45:13 +02:00
commit bed99c20be
38 changed files with 384 additions and 475 deletions

View File

@ -6,6 +6,7 @@ disable = missing-docstring, too-few-public-methods, invalid-name
[SIMILARITIES] [SIMILARITIES]
min-similarity-lines=6
ignore-comments=yes ignore-comments=yes
ignore-docstrings=yes ignore-docstrings=yes
ignore-imports=yes ignore-imports=yes

View File

@ -23,8 +23,8 @@ ENV TFW_PUBLIC_PORT=8888 \
TFW_LOGIN_APP_PORT=6666 \ TFW_LOGIN_APP_PORT=6666 \
TFW_TERMINADO_PORT=7878 \ TFW_TERMINADO_PORT=7878 \
TFW_SUPERVISOR_HTTP_PORT=9001 \ TFW_SUPERVISOR_HTTP_PORT=9001 \
TFW_PUBLISHER_PORT=7654 \ TFW_PUB_PORT=7654 \
TFW_RECEIVER_PORT=8765 TFW_PULL_PORT=8765
EXPOSE ${TFW_PUBLIC_PORT} EXPOSE ${TFW_PUBLIC_PORT}

View File

@ -14,3 +14,5 @@ from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler, P
from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler
from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler
from .commands_equal import CommandsEqual from .commands_equal import CommandsEqual
from .frontend_event_handler import FrontendEventHandler
from .message_sender import MessageSender

View File

@ -5,7 +5,8 @@ from functools import wraps
from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector from tfw.networking import Scope
from tfw.event_handlers import TFWServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.observer_mixin import ObserverMixin
@ -47,7 +48,7 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler):
def __init__(self, ide_key): def __init__(self, ide_key):
super().__init__() super().__init__()
self.ide_key = ide_key self.ide_key = ide_key
self.uplink = ServerUplinkConnector() self.uplink = TFWServerUplinkConnector()
self._paused = False self._paused = False
self.ignore = 0 self.ignore = 0
@ -65,10 +66,10 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler):
self.ignore = self.ignore - 1 self.ignore = self.ignore - 1
return return
LOG.debug(event) LOG.debug(event)
self.uplink.send({ self.uplink.send_message({
'key': self.ide_key, 'key': self.ide_key,
'data': {'command': 'reload'} 'data': {'command': 'reload'}
}) }, Scope.WEBSOCKET)
def with_monitor_paused(fun): def with_monitor_paused(fun):

View File

@ -3,7 +3,7 @@
from os.path import isdir, exists from os.path import isdir, exists
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.directory_monitor import DirectoryMonitor from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging from tfw.config.logs import logging
@ -11,7 +11,7 @@ from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DirectoryMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin): class DirectoryMonitoringEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
def __init__(self, key, directory): def __init__(self, key, directory):
super().__init__(key) super().__init__(key)
self._directory = directory self._directory = directory

View File

@ -8,7 +8,7 @@ from datetime import datetime
from dateutil import parser as dateparser from dateutil import parser as dateparser
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.components.snapshot_provider import SnapshotProvider from tfw.components.snapshot_provider import SnapshotProvider
from tfw.config import TFWENV from tfw.config import TFWENV
from tfw.config.logs import logging from tfw.config.logs import logging
@ -16,7 +16,7 @@ from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DirectorySnapshottingEventHandler(EventHandlerBase): class DirectorySnapshottingEventHandler(FrontendEventHandlerBase):
def __init__(self, key, directories, exclude_unix_patterns=None): def __init__(self, key, directories, exclude_unix_patterns=None):
super().__init__(key) super().__init__(key)
self.snapshot_providers = {} self.snapshot_providers = {}

View File

@ -0,0 +1,63 @@
from abc import ABC, abstractmethod
from contextlib import suppress
from tfw.event_handlers import FrontendEventHandlerBase
from .message_sender import MessageSender
class FrontendEventHandler(FrontendEventHandlerBase):
def __init__(self):
frontend_keys = ('message', 'queueMessages', 'dashboard', 'console')
self._frontend_message_storage = FrontendMessageStorage(frontend_keys)
super().__init__((*frontend_keys, 'recover'))
def handle_event(self, message):
self._frontend_message_storage.save_message(message)
if message['key'] == 'recover':
self.recover_frontend()
return message
def recover_frontend(self):
for message in self._frontend_message_storage.messages:
self.send_message(message)
class MessageStorage(ABC):
def __init__(self):
self._messages = []
def save_message(self, message):
with suppress(KeyError, AttributeError):
if self._filter_message(message):
self._messages.extend(self._transform_message(message))
@abstractmethod
def _filter_message(self, message):
raise NotImplementedError
def _transform_message(self, message): # pylint: disable=no-self-use
yield message
def clear(self):
self._messages.clear()
@property
def messages(self):
yield from self._messages
class FrontendMessageStorage(MessageStorage):
def __init__(self, keys):
self._keys = keys
super().__init__()
def _filter_message(self, message):
key = message['key']
return key in self._keys
def _transform_message(self, message):
if message['key'] == 'queueMessages':
yield from MessageSender.generate_messages_from_queue(message)
else:
yield message

View File

@ -1,14 +1,15 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.crypto import KeyManager, sign_message, verify_message from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.config.logs import logging from tfw.config.logs import logging
from tfw.networking import Scope
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class FSMManagingEventHandler(EventHandlerBase): class FSMManagingEventHandler(FrontendEventHandlerBase):
""" """
EventHandler responsible for managing the state machine of EventHandler responsible for managing the state machine of
the framework (TFW FSM). the framework (TFW FSM).
@ -42,7 +43,7 @@ class FSMManagingEventHandler(EventHandlerBase):
fsm_update_message = self._fsm_updater.fsm_update fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, message) sign_message(self.auth_key, message)
sign_message(self.auth_key, fsm_update_message) sign_message(self.auth_key, fsm_update_message)
self.server_connector.broadcast(fsm_update_message) self.server_connector.send_message(fsm_update_message, Scope.BROADCAST)
return message return message
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -6,7 +6,7 @@ from glob import glob
from fnmatch import fnmatchcase from fnmatch import fnmatchcase
from typing import Iterable from typing import Iterable
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.directory_monitor import DirectoryMonitor from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging from tfw.config.logs import logging
@ -102,7 +102,7 @@ class FileManager: # pylint: disable=too-many-instance-attributes
return relpath(self._filepath(filename), start=self._workdir) return relpath(self._filepath(filename), start=self._workdir)
class IdeEventHandler(EventHandlerBase, MonitorManagerMixin): class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
# pylint: disable=too-many-arguments,anomalous-backslash-in-string # pylint: disable=too-many-arguments,anomalous-backslash-in-string
""" """
Event handler implementing the backend of our browser based IDE. Event handler implementing the backend of our browser based IDE.

View File

@ -6,7 +6,8 @@ from os.path import dirname
from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector from tfw.networking import Scope
from tfw.event_handlers import TFWServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.observer_mixin import ObserverMixin
from tfw.mixins.supervisor_mixin import SupervisorLogMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin
@ -38,7 +39,7 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso
self.procinfo['stdout_logfile'], self.procinfo['stdout_logfile'],
self.procinfo['stderr_logfile'] self.procinfo['stderr_logfile']
]) ])
self.uplink = ServerUplinkConnector() self.uplink = TFWServerUplinkConnector()
self.log_tail = log_tail self.log_tail = log_tail
@property @property
@ -47,11 +48,11 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso
@RateLimiter(rate_per_second=5) @RateLimiter(rate_per_second=5)
def on_modified(self, event): def on_modified(self, event):
self.uplink.send({ self.uplink.send_message({
'key': 'processlog', 'key': 'processlog',
'data': { 'data': {
'command': 'new_log', 'command': 'new_log',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail), 'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail) 'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
} }
}) }, Scope.BROADCAST)

View File

@ -1,7 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.log_monitor import LogMonitor from tfw.components.log_monitor import LogMonitor
from tfw.config.logs import logging from tfw.config.logs import logging
@ -9,7 +9,7 @@ from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class LogMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin): class LogMonitoringEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
""" """
Monitors the output of a supervisor process (stdout, stderr) and Monitors the output of a supervisor process (stdout, stderr) and
sends the results to the frontend. sends the results to the frontend.

View File

@ -1,7 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector from tfw.event_handlers import TFWServerUplinkConnector
class MessageSender: class MessageSender:
@ -9,7 +9,7 @@ class MessageSender:
Provides mechanisms to send messages to our frontend messaging component. Provides mechanisms to send messages to our frontend messaging component.
""" """
def __init__(self): def __init__(self):
self.server_connector = ServerUplinkConnector() self.server_connector = TFWServerUplinkConnector()
self.key = 'message' self.key = 'message'
self.queue_key = 'queueMessages' self.queue_key = 'queueMessages'
@ -19,14 +19,14 @@ class MessageSender:
:param originator: name of sender to be displayed on the frontend :param originator: name of sender to be displayed on the frontend
:param message: message to send :param message: message to send
""" """
data = { message = {
'key': self.key,
'data': {
'originator': originator, 'originator': originator,
'message': message 'message': message
} }
self.server_connector.send({ }
'key': self.key, self.server_connector.send_message(message)
'data': data
})
def queue_messages(self, originator, messages): def queue_messages(self, originator, messages):
""" """
@ -34,16 +34,16 @@ class MessageSender:
:param originator: name of sender to be displayed on the frontend :param originator: name of sender to be displayed on the frontend
:param messages: list of messages to queue :param messages: list of messages to queue
""" """
data = { message = {
'key': self.queue_key,
'data': {
'messages': [ 'messages': [
{'message': message, 'originator': originator} {'message': message, 'originator': originator}
for message in messages for message in messages
] ]
} }
self.server_connector.send({ }
'key': self.queue_key, self.server_connector.send_message(message)
'data': data
})
@staticmethod @staticmethod
def generate_messages_from_queue(queue_message): def generate_messages_from_queue(queue_message):

View File

@ -9,7 +9,7 @@ from secrets import token_urlsafe
from threading import Thread from threading import Thread
from contextlib import suppress from contextlib import suppress
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import EventHandlerBase
from tfw.config.logs import logging from tfw.config.logs import logging
from .pipe_io_server import PipeIOServer, terminate_process_on_failure from .pipe_io_server import PipeIOServer, terminate_process_on_failure
@ -56,7 +56,7 @@ class PipeIOEventHandler(PipeIOEventHandlerBase):
def handle_pipe_event(self, message_bytes): def handle_pipe_event(self, message_bytes):
json = loads(message_bytes) json = loads(message_bytes)
self.server_connector.send(json) self.send_message(json)
class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
@ -93,7 +93,7 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
transformed_bytes = self._transform_in(message_bytes) transformed_bytes = self._transform_in(message_bytes)
if transformed_bytes: if transformed_bytes:
json_message = loads(transformed_bytes) json_message = loads(transformed_bytes)
self.server_connector.send(json_message) self.send_message(json_message)
class CommandEventHandler(PipeIOEventHandler): class CommandEventHandler(PipeIOEventHandler):

View File

@ -3,7 +3,7 @@
from xmlrpc.client import Fault as SupervisorFault from xmlrpc.client import Fault as SupervisorFault
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin
from tfw.components.directory_monitor import with_monitor_paused from tfw.components.directory_monitor import with_monitor_paused
from tfw.config.logs import logging from tfw.config.logs import logging
@ -23,7 +23,7 @@ class ProcessManager(SupervisorMixin, SupervisorLogMixin):
return self.commands[command](process_name) return self.commands[command](process_name)
class ProcessManagingEventHandler(EventHandlerBase): class ProcessManagingEventHandler(FrontendEventHandlerBase):
""" """
Event handler that can manage processes managed by supervisor. Event handler that can manage processes managed by supervisor.

View File

@ -1,7 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from tfw.components.terminado_mini_server import TerminadoMiniServer from tfw.components.terminado_mini_server import TerminadoMiniServer
from tfw.config import TFWENV from tfw.config import TFWENV
from tfw.config.logs import logging from tfw.config.logs import logging
@ -10,7 +10,7 @@ from tao.config import TAOENV
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TerminalEventHandler(EventHandlerBase): class TerminalEventHandler(FrontendEventHandlerBase):
""" """
Event handler responsible for managing terminal sessions for frontend xterm Event handler responsible for managing terminal sessions for frontend xterm
sessions to connect to. You need to instanciate this in order for frontend sessions to connect to. You need to instanciate this in order for frontend
@ -50,7 +50,6 @@ class TerminalEventHandler(EventHandlerBase):
return self._historymonitor return self._historymonitor
def handle_event(self, message): def handle_event(self, message):
LOG.debug('TerminadoEventHandler received event: %s', message)
try: try:
data = message['data'] data = message['data']
message['data'] = self.commands[data['command']](data) message['data'] = self.commands[data['command']](data)

View File

@ -14,7 +14,7 @@ from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.hmac import HMAC as _HMAC from cryptography.hazmat.primitives.hmac import HMAC as _HMAC
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from tfw.networking.serialization import message_bytes from tfw.networking import message_bytes
from tfw.decorators.lazy_property import lazy_property from tfw.decorators.lazy_property import lazy_property
from tfw.config import TFWENV from tfw.config import TFWENV

View File

@ -2,5 +2,7 @@
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from .event_handler_base import EventHandlerBase from .event_handler_base import EventHandlerBase
from .frontend_event_handler_base import FrontendEventHandlerBase
from .boradcasting_event_handler import BroadcastingEventHandler from .boradcasting_event_handler import BroadcastingEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler
from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector

View File

@ -3,9 +3,11 @@
from abc import ABC from abc import ABC
from tfw.event_handler_base.event_handler_base import EventHandlerBase from tfw.networking import Scope
from tfw.crypto import message_checksum from tfw.crypto import message_checksum
from .event_handler_base import EventHandlerBase
class BroadcastingEventHandler(EventHandlerBase, ABC): class BroadcastingEventHandler(EventHandlerBase, ABC):
# pylint: disable=abstract-method # pylint: disable=abstract-method
@ -27,4 +29,4 @@ class BroadcastingEventHandler(EventHandlerBase, ABC):
response = self.dispatch_handling(message) response = self.dispatch_handling(message)
if response: if response:
self.own_message_hashes.append(message_checksum(response)) self.own_message_hashes.append(message_checksum(response))
self.server_connector.broadcast(response) self.server_connector.send_message(response, Scope.BROADCAST)

View File

@ -5,9 +5,10 @@ from abc import ABC, abstractmethod
from inspect import currentframe from inspect import currentframe
from typing import Iterable from typing import Iterable
from tfw.networking.event_handlers.server_connector import ServerConnector
from tfw.config.logs import logging from tfw.config.logs import logging
from .tfw_server_connector import TFWServerConnector
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -19,7 +20,7 @@ class EventHandlerBase(ABC):
Derived classes must implement the handle_event() method Derived classes must implement the handle_event() method
""" """
def __init__(self, key): def __init__(self, key):
self.server_connector = ServerConnector() self.server_connector = TFWServerConnector()
self.keys = [] self.keys = []
if isinstance(key, str): if isinstance(key, str):
self.keys.append(key) self.keys.append(key)
@ -48,7 +49,10 @@ class EventHandlerBase(ABC):
response = self.dispatch_handling(message) response = self.dispatch_handling(message)
if response: if response:
self.server_connector.send(response) self.send_message(response)
def send_message(self, message):
self.server_connector.send_message(message)
def check_key(self, message): def check_key(self, message):
""" """
@ -115,7 +119,6 @@ class EventHandlerBase(ABC):
Perform cleanup actions such as releasing database Perform cleanup actions such as releasing database
connections and stuff like that. connections and stuff like that.
""" """
pass
@classmethod @classmethod
def get_local_instances(cls): def get_local_instances(cls):

View File

@ -0,0 +1,8 @@
from tfw.networking import Scope
from .event_handler_base import EventHandlerBase
class FrontendEventHandlerBase(EventHandlerBase): # pylint: disable=abstract-method
def send_message(self, message):
self.server_connector.send_message(message, Scope.WEBSOCKET)

View File

@ -2,7 +2,6 @@
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from tfw.crypto import KeyManager, verify_message from tfw.crypto import KeyManager, verify_message
from tfw.config.logs import logging from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -43,4 +42,3 @@ class FSMAware:
:param kwargs: fsm_update 'data' field :param kwargs: fsm_update 'data' field
""" """
pass

View File

@ -3,8 +3,8 @@
from abc import ABC from abc import ABC
from tfw.event_handler_base.event_handler_base import EventHandlerBase from .event_handler_base import EventHandlerBase
from tfw.networking.fsm_aware import FSMAware from .fsm_aware import FSMAware
class FSMAwareEventHandler(EventHandlerBase, FSMAware, ABC): class FSMAwareEventHandler(EventHandlerBase, FSMAware, ABC):

View File

@ -0,0 +1,19 @@
from functools import partial
from tfw.networking import ServerUplinkConnector, ServerConnector
from tfw.config import TFWENV
UPLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PULL_PORT}'
DOWNLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PUB_PORT}'
TFWServerUplinkConnector = partial(
ServerUplinkConnector,
connect_addr=UPLINK_CONN_ADDR
)
TFWServerConnector = partial(
ServerConnector,
downlink_connect_addr=DOWNLINK_CONN_ADDR,
uplink_connect_addr=UPLINK_CONN_ADDR
)

View File

@ -1,6 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from .message_sender import MessageSender from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes
from .event_handlers.server_connector import ServerUplinkConnector as TFWServerConnector from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector
from .server.tfw_server import TFWServer from .event_handler_connector import EventHandlerConnector
from .scope import Scope

View File

@ -0,0 +1,51 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.config.logs import logging
from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg
LOG = logging.getLogger(__name__)
class EventHandlerDownlinkConnector():
def __init__(self, bind_addr):
self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL)
self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket)
self._zmq_pull_socket.bind(bind_addr)
LOG.debug('Pull socket bound to %s', bind_addr)
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_pull_stream.on_recv(callback)
def close(self):
self._zmq_pull_stream.close()
class EventHandlerUplinkConnector():
def __init__(self, bind_addr):
self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0)
self._zmq_pub_socket.bind(bind_addr)
LOG.debug('Pub socket bound to %s', bind_addr)
def send_message(self, message: dict):
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))
def close(self):
self._zmq_pub_socket.close()
class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector):
def __init__(self, downlink_bind_addr, uplink_bind_addr):
EventHandlerDownlinkConnector.__init__(self, downlink_bind_addr)
EventHandlerUplinkConnector.__init__(self, uplink_bind_addr)
def close(self):
EventHandlerDownlinkConnector.close(self)
EventHandlerUplinkConnector.close(self)

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,89 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import partial
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking.zmq_connector_base import ZMQConnectorBase
from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ServerDownlinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(ServerDownlinkConnector, self).__init__(zmq_context)
self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB)
self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}')
self._zmq_sub_socket.setsockopt(zmq.RCVHWM,0)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE)
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE)
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_sub_stream.on_recv(callback)
def close(self):
self._zmq_sub_stream.close()
class ServerUplinkConnector(ZMQConnectorBase):
"""
Class capable of sending messages to the TFW server and event handlers.
"""
def __init__(self, zmq_context=None):
super(ServerUplinkConnector, self).__init__(zmq_context)
self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH)
self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}')
self._zmq_push_socket.setsockopt(zmq.SNDHWM,0)
def send_to_eventhandler(self, message):
"""
Send a message to an event handler through the TFW server.
This envelopes the desired message in the 'data' field of the message to
TFWServer, which will mirror it to event handlers.
:param message: JSON message you want to send
"""
self.send({
'key': 'mirror',
'data': message
})
def send(self, message):
"""
Send a message to the frontend through the TFW server.
:param message: JSON message you want to send
"""
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
def broadcast(self, message):
"""
Broadast a message through the TFW server.
This envelopes the desired message in the 'data' field of the message to
TFWServer, which will broadast it.
:param message: JSON message you want to send
"""
self.send({
'key': 'broadcast',
'data': message
})
def close(self):
self._zmq_push_socket.close()
class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector):
def close(self):
ServerUplinkConnector.close(self)
ServerDownlinkConnector.close(self)

View File

@ -0,0 +1,7 @@
from enum import Enum
class Scope(Enum):
ZMQ = 'zmq'
WEBSOCKET = 'websocket'
BROADCAST = 'broadcast'

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,40 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking.zmq_connector_base import ZMQConnectorBase
from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class EventHandlerDownlinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(EventHandlerDownlinkConnector, self).__init__(zmq_context)
self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL)
self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket)
address = f'tcp://*:{TFWENV.RECEIVER_PORT}'
self._zmq_pull_socket.bind(address)
LOG.debug('Pull socket bound to %s', address)
class EventHandlerUplinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(EventHandlerUplinkConnector, self).__init__(zmq_context)
self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB)
address = f'tcp://*:{TFWENV.PUBLISHER_PORT}'
self._zmq_pub_socket.bind(address)
LOG.debug('Pub socket bound to %s', address)
class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector):
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_pull_stream.on_recv(callback)
def send_message(self, message: dict):
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))

View File

@ -1,114 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from abc import ABC, abstractmethod
from contextlib import suppress
from tornado.web import Application
from tfw.networking.server.zmq_websocket_proxy import ZMQWebSocketProxy
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector
from tfw.networking.server.event_handler_connector import EventHandlerConnector
from tfw.networking.message_sender import MessageSender
from tfw.networking.fsm_aware import FSMAware
from tfw.crypto import KeyManager, verify_message, sign_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class TFWServer(FSMAware):
"""
This class handles the proxying of messages between the frontend and event handers.
It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ
SUB socket.
"""
def __init__(self):
super().__init__()
self._event_handler_connector = EventHandlerConnector()
self._uplink_connector = ServerUplinkConnector()
self._auth_key = KeyManager().auth_key
self.application = Application([(
r'/ws', ZMQWebSocketProxy, {
'event_handler_connector': self._event_handler_connector,
'proxy_filters_and_callbacks': {
'message_handlers': [
self.handle_trigger,
self.handle_recover,
self.handle_fsm_update
],
'frontend_message_handlers': [self.save_frontend_messages]
}
}
)])
self._frontend_messages = FrontendMessageStorage()
def handle_trigger(self, message):
if 'trigger' in message:
LOG.debug('Executing handler for trigger "%s"', message.get('trigger', ''))
fsm_eh_command = {
'key': 'fsm',
'data': {
'command': 'trigger',
'value': message['trigger']
}
}
if verify_message(self._auth_key, message):
sign_message(self._auth_key, fsm_eh_command)
self._uplink_connector.send_to_eventhandler(fsm_eh_command)
def handle_recover(self, message):
if message['key'] == 'recover':
self._frontend_messages.replay_messages(self._uplink_connector)
self._frontend_messages.clear()
def handle_fsm_update(self, message):
self.update_fsm_data(message)
def save_frontend_messages(self, message):
self._frontend_messages.save_message(message)
def listen(self, port):
self.application.listen(port)
class MessageStorage(ABC):
def __init__(self):
self.saved_messages = []
def save_message(self, message):
with suppress(KeyError, AttributeError):
if self.filter_message(message):
self.saved_messages.extend(self.transform_message(message))
@abstractmethod
def filter_message(self, message):
raise NotImplementedError
def transform_message(self, message): # pylint: disable=no-self-use
yield message
def clear(self):
self.saved_messages.clear()
class FrontendMessageStorage(MessageStorage):
def filter_message(self, message):
key = message['key']
command = message.get('data', {}).get('command')
return (
key in ('message', 'dashboard', 'queueMessages')
or key == 'ide' and command in ('select', 'read')
)
def transform_message(self, message):
if message['key'] == 'queueMessages':
yield from MessageSender.generate_messages_from_queue(message)
else:
yield message
def replay_messages(self, connector):
for message in self.saved_messages:
connector.send(message)

View File

@ -1,155 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import json
from tornado.websocket import WebSocketHandler
from tfw.mixins.callback_mixin import CallbackMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ZMQWebSocketProxy(WebSocketHandler):
# pylint: disable=abstract-method
instances = set()
sequence_number = 0
def initialize(self, **kwargs): # pylint: disable=arguments-differ
self._event_handler_connector = kwargs['event_handler_connector']
self._proxy_filters_and_callbacks = kwargs.get('proxy_filters_and_callbacks', {})
self.proxy_eventhandler_to_websocket = TFWProxy(
self.send_eventhandler_message,
self.send_websocket_message
)
self.proxy_websocket_to_eventhandler = TFWProxy(
self.send_websocket_message,
self.send_eventhandler_message
)
self.subscribe_proxy_callbacks()
def subscribe_proxy_callbacks(self):
eventhandler_message_handlers = self._proxy_filters_and_callbacks.get('eventhandler_message_handlers', [])
frontend_message_handlers = self._proxy_filters_and_callbacks.get('frontend_message_handlers', [])
message_handlers = self._proxy_filters_and_callbacks.get('message_handlers', [])
proxy_filters = self._proxy_filters_and_callbacks.get('proxy_filters', [])
self.proxy_websocket_to_eventhandler.subscribe_proxy_callbacks_and_filters(
eventhandler_message_handlers + message_handlers,
proxy_filters
)
self.proxy_eventhandler_to_websocket.subscribe_proxy_callbacks_and_filters(
frontend_message_handlers + message_handlers,
proxy_filters
)
def prepare(self):
ZMQWebSocketProxy.instances.add(self)
def on_close(self):
ZMQWebSocketProxy.instances.remove(self)
def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated')
self._event_handler_connector.register_callback(self.eventhander_callback)
def eventhander_callback(self, message):
"""
Invoked on ZMQ messages from event handlers.
"""
self.sequence_message(message)
LOG.debug('Received on pull socket: %s', message)
self.proxy_eventhandler_to_websocket(message)
@classmethod
def sequence_message(cls, message):
cls.sequence_number += 1
message['seq'] = cls.sequence_number
def on_message(self, message):
"""
Invoked on WS messages from frontend.
"""
message = json.loads(message)
self.sequence_message(message)
LOG.debug('Received on WebSocket: %s', message)
self.proxy_websocket_to_eventhandler(message)
def send_eventhandler_message(self, message):
self._event_handler_connector.send_message(message)
@staticmethod
def send_websocket_message(message):
for instance in ZMQWebSocketProxy.instances:
instance.write_message(message)
# much secure, very cors, wow
def check_origin(self, origin):
return True
class TFWProxy:
# pylint: disable=protected-access
def __init__(self, to_source, to_destination):
self.to_source = to_source
self.to_destination = to_destination
self.proxy_filters = CallbackMixin()
self.proxy_callbacks = CallbackMixin()
self.proxy_filters.subscribe_callback(self.validate_message)
self.keyhandlers = {
'mirror': self.mirror,
'broadcast': self.broadcast
}
@staticmethod
def validate_message(message):
if 'key' not in message:
raise ValueError('Invalid TFW message format!')
def __call__(self, message):
if not self.filter_and_execute_callbacks(message):
return
if message['key'] not in self.keyhandlers:
self.to_destination(message)
else:
handler = self.keyhandlers[message['key']]
try:
handler(message)
except KeyError:
LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__)
def filter_and_execute_callbacks(self, message):
try:
self.proxy_filters._execute_callbacks(message)
self.proxy_callbacks._execute_callbacks(message)
return True
except ValueError:
LOG.exception('Invalid TFW message received!')
return False
def mirror(self, message):
message = message['data']
if not self.filter_and_execute_callbacks(message):
return
LOG.debug('Mirroring message: %s', message)
self.to_source(message)
def broadcast(self, message):
message = message['data']
if not self.filter_and_execute_callbacks(message):
return
LOG.debug('Broadcasting message: %s', message)
self.to_source(message)
self.to_destination(message)
def subscribe_proxy_callbacks_and_filters(self, proxy_callbacks, proxy_filters):
self.proxy_callbacks.subscribe_callbacks(*proxy_callbacks)
self.proxy_filters.subscribe_callbacks(*proxy_filters)

View File

@ -0,0 +1,56 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import partial
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.config.logs import logging
from .scope import Scope
from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg
LOG = logging.getLogger(__name__)
class ServerDownlinkConnector():
def __init__(self, connect_addr):
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.connect(connect_addr)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE)
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE)
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_sub_stream.on_recv(callback)
def close(self):
self._zmq_sub_stream.close()
class ServerUplinkConnector():
def __init__(self, connect_addr):
self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH)
self._zmq_push_socket.connect(connect_addr)
self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0)
def send_message(self, message, scope=Scope.ZMQ):
message['scope'] = scope.value
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
def close(self):
self._zmq_push_socket.close()
class ServerConnector(ServerDownlinkConnector, ServerUplinkConnector):
def __init__(self, downlink_connect_addr, uplink_connect_addr):
ServerDownlinkConnector.__init__(self, downlink_connect_addr)
ServerUplinkConnector.__init__(self, uplink_connect_addr)
def close(self):
ServerDownlinkConnector.close(self)
ServerUplinkConnector.close(self)

View File

@ -1,9 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import zmq
class ZMQConnectorBase:
def __init__(self, zmq_context=None):
self._zmq_context = zmq_context or zmq.Context.instance()

View File

@ -0,0 +1 @@
from .tfw_server import TFWServer

View File

@ -0,0 +1,33 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tornado.web import Application
from tfw.networking import EventHandlerConnector
from tfw.config import TFWENV
from tfw.config.logs import logging
from .zmq_websocket_router import ZMQWebSocketRouter
LOG = logging.getLogger(__name__)
class TFWServer:
"""
This class handles the proxying of messages between the frontend and event handers.
It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ
SUB socket.
"""
def __init__(self):
self._event_handler_connector = EventHandlerConnector(
downlink_bind_addr=f'tcp://*:{TFWENV.PULL_PORT}',
uplink_bind_addr=f'tcp://*:{TFWENV.PUB_PORT}'
)
self.application = Application([(
r'/ws', ZMQWebSocketRouter, {
'event_handler_connector': self._event_handler_connector,
}
)])
def listen(self):
self.application.listen(TFWENV.WEB_PORT)

View File

@ -0,0 +1,72 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import json
from tornado.websocket import WebSocketHandler
from tfw.networking import Scope
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ZMQWebSocketRouter(WebSocketHandler):
# pylint: disable=abstract-method
instances = set()
def initialize(self, **kwargs): # pylint: disable=arguments-differ
self.event_handler_connector = kwargs['event_handler_connector']
self.tfw_router = TFWRouter(self.send_to_zmq, self.send_to_websockets)
def send_to_zmq(self, message):
self.event_handler_connector.send_message(message)
@classmethod
def send_to_websockets(cls, message):
for instance in cls.instances:
instance.write_message(message)
def prepare(self):
type(self).instances.add(self)
def on_close(self):
type(self).instances.remove(self)
def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated!')
self.event_handler_connector.register_callback(self.zmq_callback)
def zmq_callback(self, message):
LOG.debug('Received on ZMQ pull socket: %s', message)
self.tfw_router.route(message)
def on_message(self, message):
message = json.loads(message)
LOG.debug('Received on WebSocket: %s', message)
self.tfw_router.route(message)
# much secure, very cors, wow
def check_origin(self, origin):
return True
class TFWRouter:
def __init__(self, send_to_zmq, send_to_websockets):
self.send_to_zmq = send_to_zmq
self.send_to_websockets = send_to_websockets
def route(self, message):
scope = Scope(message.get('scope', 'zmq'))
routing_table = {
Scope.ZMQ: self.send_to_zmq,
Scope.WEBSOCKET: self.send_to_websockets,
Scope.BROADCAST: self.broadcast
}
action = routing_table[scope]
action(message)
def broadcast(self, message):
self.send_to_zmq(message)
self.send_to_websockets(message)

View File

@ -1,9 +1,8 @@
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tfw.networking import TFWServer from tfw.server import TFWServer
from tfw.config import TFWENV
if __name__ == '__main__': if __name__ == '__main__':
TFWServer().listen(TFWENV.WEB_PORT) TFWServer().listen()
IOLoop.instance().start() IOLoop.instance().start()