From 26c6c5d1e657c476775a5886baeec1d5ca5efebf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 21 May 2019 13:44:02 +0200 Subject: [PATCH 01/10] Refactor EventHandlerConnector family of classes --- .../networking/server/event_handler_connector.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/tfw/networking/server/event_handler_connector.py b/lib/tfw/networking/server/event_handler_connector.py index 5075c56..bc6c9be 100644 --- a/lib/tfw/networking/server/event_handler_connector.py +++ b/lib/tfw/networking/server/event_handler_connector.py @@ -21,6 +21,10 @@ class EventHandlerDownlinkConnector(ZMQConnectorBase): self._zmq_pull_socket.bind(address) LOG.debug('Pull socket bound to %s', address) + def register_callback(self, callback): + callback = with_deserialize_tfw_msg(callback) + self._zmq_pull_stream.on_recv(callback) + class EventHandlerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): @@ -30,11 +34,9 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): self._zmq_pub_socket.bind(address) LOG.debug('Pub socket bound to %s', address) - -class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): - def register_callback(self, callback): - callback = with_deserialize_tfw_msg(callback) - self._zmq_pull_stream.on_recv(callback) - def send_message(self, message: dict): self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) + + +class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): + pass From 6b23b863ed47b7bf52b89fdbaba0aa7b8c5aa0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 21 May 2019 13:55:28 +0200 Subject: [PATCH 02/10] Fix code formatting --- lib/tfw/networking/event_handlers/server_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/tfw/networking/event_handlers/server_connector.py b/lib/tfw/networking/event_handlers/server_connector.py index 4c2a2b9..ab9973b 100644 --- a/lib/tfw/networking/event_handlers/server_connector.py +++ b/lib/tfw/networking/event_handlers/server_connector.py @@ -19,7 +19,7 @@ class ServerDownlinkConnector(ZMQConnectorBase): super(ServerDownlinkConnector, self).__init__(zmq_context) self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') - self._zmq_sub_socket.setsockopt(zmq.RCVHWM,0) + self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) @@ -41,7 +41,7 @@ class ServerUplinkConnector(ZMQConnectorBase): super(ServerUplinkConnector, self).__init__(zmq_context) self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') - self._zmq_push_socket.setsockopt(zmq.SNDHWM,0) + self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) def send_to_eventhandler(self, message): """ From 6431fac9b15cb5ac3619d83d0d165bbf509c2a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 21 May 2019 13:57:56 +0200 Subject: [PATCH 03/10] Set ZMQ HWM in EventHandlerConnector to infinite --- lib/tfw/networking/server/event_handler_connector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/tfw/networking/server/event_handler_connector.py b/lib/tfw/networking/server/event_handler_connector.py index bc6c9be..8b13338 100644 --- a/lib/tfw/networking/server/event_handler_connector.py +++ b/lib/tfw/networking/server/event_handler_connector.py @@ -16,6 +16,7 @@ class EventHandlerDownlinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(EventHandlerDownlinkConnector, self).__init__(zmq_context) self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) + self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) address = f'tcp://*:{TFWENV.RECEIVER_PORT}' self._zmq_pull_socket.bind(address) @@ -30,6 +31,7 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(EventHandlerUplinkConnector, self).__init__(zmq_context) self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) + self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) address = f'tcp://*:{TFWENV.PUBLISHER_PORT}' self._zmq_pub_socket.bind(address) LOG.debug('Pub socket bound to %s', address) From 613919a5b60d4d2876c20c27e69647fbe197db22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 21 May 2019 13:59:42 +0200 Subject: [PATCH 04/10] Implement closing EventHandlerConnector --- lib/tfw/networking/server/event_handler_connector.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/tfw/networking/server/event_handler_connector.py b/lib/tfw/networking/server/event_handler_connector.py index 8b13338..7b56a88 100644 --- a/lib/tfw/networking/server/event_handler_connector.py +++ b/lib/tfw/networking/server/event_handler_connector.py @@ -26,6 +26,9 @@ class EventHandlerDownlinkConnector(ZMQConnectorBase): callback = with_deserialize_tfw_msg(callback) self._zmq_pull_stream.on_recv(callback) + def close(self): + self._zmq_pull_stream.close() + class EventHandlerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): @@ -39,6 +42,11 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): def send_message(self, message: dict): self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) + def close(self): + self._zmq_pub_socket.close() + class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): - pass + def close(self): + EventHandlerDownlinkConnector.close(self) + EventHandlerUplinkConnector.close(self) From 01d90035018a2132e4896ff6438e3f71e4b78003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Sun, 26 May 2019 18:26:33 +0200 Subject: [PATCH 05/10] Rework whole TFW networking model --- lib/tfw/components/__init__.py | 1 + lib/tfw/components/directory_monitor.py | 6 +- .../directory_monitoring_event_handler.py | 4 +- .../directory_snapshotting_event_handler.py | 4 +- lib/tfw/components/frontend_event_handler.py | 62 ++++++++ .../components/fsm_managing_event_handler.py | 7 +- lib/tfw/components/ide_event_handler.py | 4 +- lib/tfw/components/log_monitor.py | 6 +- .../log_monitoring_event_handler.py | 4 +- lib/tfw/components/pipe_io_event_handler.py | 4 +- .../process_managing_event_handler.py | 4 +- lib/tfw/components/terminal_event_handler.py | 4 +- lib/tfw/event_handler_base/__init__.py | 2 +- .../boradcasting_event_handler.py | 3 +- .../event_handler_base/event_handler_base.py | 12 +- .../event_handlers/server_connector.py | 46 ++---- lib/tfw/networking/message_sender.py | 32 ++-- lib/tfw/networking/server/tfw_server.py | 88 +---------- .../networking/server/zmq_websocket_proxy.py | 141 ++++-------------- 19 files changed, 155 insertions(+), 279 deletions(-) create mode 100644 lib/tfw/components/frontend_event_handler.py diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 7c3f76d..7d2ebc0 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -14,3 +14,4 @@ from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler, P from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .commands_equal import CommandsEqual +from .frontend_event_handler import FrontendEventHandler diff --git a/lib/tfw/components/directory_monitor.py b/lib/tfw/components/directory_monitor.py index f65c8ff..6aa2ca0 100644 --- a/lib/tfw/components/directory_monitor.py +++ b/lib/tfw/components/directory_monitor.py @@ -5,7 +5,7 @@ from functools import wraps from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector +from tfw.networking.event_handlers.server_connector import ServerUplinkConnector, Scope from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin @@ -65,10 +65,10 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler): self.ignore = self.ignore - 1 return LOG.debug(event) - self.uplink.send({ + self.uplink.send_message({ 'key': self.ide_key, 'data': {'command': 'reload'} - }) + }, Scope.WEBSOCKET) def with_monitor_paused(fun): diff --git a/lib/tfw/components/directory_monitoring_event_handler.py b/lib/tfw/components/directory_monitoring_event_handler.py index 8d97022..03c4a3a 100644 --- a/lib/tfw/components/directory_monitoring_event_handler.py +++ b/lib/tfw/components/directory_monitoring_event_handler.py @@ -3,7 +3,7 @@ from os.path import isdir, exists -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor from tfw.config.logs import logging @@ -11,7 +11,7 @@ from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class DirectoryMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin): +class DirectoryMonitoringEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): def __init__(self, key, directory): super().__init__(key) self._directory = directory diff --git a/lib/tfw/components/directory_snapshotting_event_handler.py b/lib/tfw/components/directory_snapshotting_event_handler.py index 8b6f384..6a285a8 100644 --- a/lib/tfw/components/directory_snapshotting_event_handler.py +++ b/lib/tfw/components/directory_snapshotting_event_handler.py @@ -8,7 +8,7 @@ from datetime import datetime from dateutil import parser as dateparser -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.components.snapshot_provider import SnapshotProvider from tfw.config import TFWENV from tfw.config.logs import logging @@ -16,7 +16,7 @@ from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class DirectorySnapshottingEventHandler(EventHandlerBase): +class DirectorySnapshottingEventHandler(FrontendEventHandlerBase): def __init__(self, key, directories, exclude_unix_patterns=None): super().__init__(key) self.snapshot_providers = {} diff --git a/lib/tfw/components/frontend_event_handler.py b/lib/tfw/components/frontend_event_handler.py new file mode 100644 index 0000000..898bb7e --- /dev/null +++ b/lib/tfw/components/frontend_event_handler.py @@ -0,0 +1,62 @@ +from abc import ABC, abstractmethod +from contextlib import suppress + +from tfw.networking.message_sender import MessageSender +from tfw.event_handler_base import FrontendEventHandlerBase + + +class FrontendEventHandler(FrontendEventHandlerBase): + def __init__(self): + frontend_keys = ('message', 'queueMessages', 'dashboard', 'console') + self._frontend_message_storage = FrontendMessageStorage(frontend_keys) + super().__init__((*frontend_keys, 'recover')) + + def handle_event(self, message): + self._frontend_message_storage.save_message(message) + if message['key'] == 'recover': + self.recover_frontend() + return message + + def recover_frontend(self): + for message in self._frontend_message_storage.messages: + self.send_message(message) + + +class MessageStorage(ABC): + def __init__(self): + self._messages = [] + + def save_message(self, message): + with suppress(KeyError, AttributeError): + if self._filter_message(message): + self._messages.extend(self._transform_message(message)) + + @abstractmethod + def _filter_message(self, message): + raise NotImplementedError + + def _transform_message(self, message): # pylint: disable=no-self-use + yield message + + def clear(self): + self._messages.clear() + + @property + def messages(self): + yield from self._messages + + +class FrontendMessageStorage(MessageStorage): + def __init__(self, keys): + self._keys = keys + super().__init__() + + def _filter_message(self, message): + key = message['key'] + return key in self._keys + + def _transform_message(self, message): + if message['key'] == 'queueMessages': + yield from MessageSender.generate_messages_from_queue(message) + else: + yield message diff --git a/lib/tfw/components/fsm_managing_event_handler.py b/lib/tfw/components/fsm_managing_event_handler.py index 73ebe10..3069fc1 100644 --- a/lib/tfw/components/fsm_managing_event_handler.py +++ b/lib/tfw/components/fsm_managing_event_handler.py @@ -1,14 +1,15 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.crypto import KeyManager, sign_message, verify_message from tfw.config.logs import logging +from tfw.networking.event_handlers.server_connector import Scope LOG = logging.getLogger(__name__) -class FSMManagingEventHandler(EventHandlerBase): +class FSMManagingEventHandler(FrontendEventHandlerBase): """ EventHandler responsible for managing the state machine of the framework (TFW FSM). @@ -42,7 +43,7 @@ class FSMManagingEventHandler(EventHandlerBase): fsm_update_message = self._fsm_updater.fsm_update sign_message(self.auth_key, message) sign_message(self.auth_key, fsm_update_message) - self.server_connector.broadcast(fsm_update_message) + self.server_connector.send_message(fsm_update_message, Scope.BROADCAST) return message except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/lib/tfw/components/ide_event_handler.py b/lib/tfw/components/ide_event_handler.py index 7e242e5..534de6a 100644 --- a/lib/tfw/components/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler.py @@ -6,7 +6,7 @@ from glob import glob from fnmatch import fnmatchcase from typing import Iterable -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor from tfw.config.logs import logging @@ -102,7 +102,7 @@ class FileManager: # pylint: disable=too-many-instance-attributes return relpath(self._filepath(filename), start=self._workdir) -class IdeEventHandler(EventHandlerBase, MonitorManagerMixin): +class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): # pylint: disable=too-many-arguments,anomalous-backslash-in-string """ Event handler implementing the backend of our browser based IDE. diff --git a/lib/tfw/components/log_monitor.py b/lib/tfw/components/log_monitor.py index cf06a0a..2ee6d96 100644 --- a/lib/tfw/components/log_monitor.py +++ b/lib/tfw/components/log_monitor.py @@ -6,7 +6,7 @@ from os.path import dirname from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector +from tfw.networking.event_handlers.server_connector import ServerUplinkConnector, Scope from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin @@ -47,11 +47,11 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso @RateLimiter(rate_per_second=5) def on_modified(self, event): - self.uplink.send({ + self.uplink.send_message({ 'key': 'processlog', 'data': { 'command': 'new_log', 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) } - }) + }, Scope.BROADCAST) diff --git a/lib/tfw/components/log_monitoring_event_handler.py b/lib/tfw/components/log_monitoring_event_handler.py index 0bc7ab2..d384323 100644 --- a/lib/tfw/components/log_monitoring_event_handler.py +++ b/lib/tfw/components/log_monitoring_event_handler.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.log_monitor import LogMonitor from tfw.config.logs import logging @@ -9,7 +9,7 @@ from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class LogMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin): +class LogMonitoringEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): """ Monitors the output of a supervisor process (stdout, stderr) and sends the results to the frontend. diff --git a/lib/tfw/components/pipe_io_event_handler.py b/lib/tfw/components/pipe_io_event_handler.py index 0a94e57..37b5b89 100644 --- a/lib/tfw/components/pipe_io_event_handler.py +++ b/lib/tfw/components/pipe_io_event_handler.py @@ -56,7 +56,7 @@ class PipeIOEventHandler(PipeIOEventHandlerBase): def handle_pipe_event(self, message_bytes): json = loads(message_bytes) - self.server_connector.send(json) + self.send_message(json) class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): @@ -93,7 +93,7 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) - self.server_connector.send(json_message) + self.send_message(json_message) class CommandEventHandler(PipeIOEventHandler): diff --git a/lib/tfw/components/process_managing_event_handler.py b/lib/tfw/components/process_managing_event_handler.py index 61b6c62..a87bdd6 100644 --- a/lib/tfw/components/process_managing_event_handler.py +++ b/lib/tfw/components/process_managing_event_handler.py @@ -3,7 +3,7 @@ from xmlrpc.client import Fault as SupervisorFault -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin from tfw.components.directory_monitor import with_monitor_paused from tfw.config.logs import logging @@ -23,7 +23,7 @@ class ProcessManager(SupervisorMixin, SupervisorLogMixin): return self.commands[command](process_name) -class ProcessManagingEventHandler(EventHandlerBase): +class ProcessManagingEventHandler(FrontendEventHandlerBase): """ Event handler that can manage processes managed by supervisor. diff --git a/lib/tfw/components/terminal_event_handler.py b/lib/tfw/components/terminal_event_handler.py index e49415c..f45bc8a 100644 --- a/lib/tfw/components/terminal_event_handler.py +++ b/lib/tfw/components/terminal_event_handler.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handler_base import FrontendEventHandlerBase from tfw.components.terminado_mini_server import TerminadoMiniServer from tfw.config import TFWENV from tfw.config.logs import logging @@ -10,7 +10,7 @@ from tao.config import TAOENV LOG = logging.getLogger(__name__) -class TerminalEventHandler(EventHandlerBase): +class TerminalEventHandler(FrontendEventHandlerBase): """ Event handler responsible for managing terminal sessions for frontend xterm sessions to connect to. You need to instanciate this in order for frontend diff --git a/lib/tfw/event_handler_base/__init__.py b/lib/tfw/event_handler_base/__init__.py index fd50525..65ca6e1 100644 --- a/lib/tfw/event_handler_base/__init__.py +++ b/lib/tfw/event_handler_base/__init__.py @@ -1,6 +1,6 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .event_handler_base import EventHandlerBase +from .event_handler_base import EventHandlerBase, FrontendEventHandlerBase from .boradcasting_event_handler import BroadcastingEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler diff --git a/lib/tfw/event_handler_base/boradcasting_event_handler.py b/lib/tfw/event_handler_base/boradcasting_event_handler.py index 59ee493..cfda694 100644 --- a/lib/tfw/event_handler_base/boradcasting_event_handler.py +++ b/lib/tfw/event_handler_base/boradcasting_event_handler.py @@ -4,6 +4,7 @@ from abc import ABC from tfw.event_handler_base.event_handler_base import EventHandlerBase +from tfw.networking.event_handlers.server_connector import Scope from tfw.crypto import message_checksum @@ -27,4 +28,4 @@ class BroadcastingEventHandler(EventHandlerBase, ABC): response = self.dispatch_handling(message) if response: self.own_message_hashes.append(message_checksum(response)) - self.server_connector.broadcast(response) + self.server_connector.send_message(response, Scope.BROADCAST) diff --git a/lib/tfw/event_handler_base/event_handler_base.py b/lib/tfw/event_handler_base/event_handler_base.py index 18541ee..f51c668 100644 --- a/lib/tfw/event_handler_base/event_handler_base.py +++ b/lib/tfw/event_handler_base/event_handler_base.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from inspect import currentframe from typing import Iterable -from tfw.networking.event_handlers.server_connector import ServerConnector +from tfw.networking.event_handlers.server_connector import ServerConnector, Scope from tfw.config.logs import logging LOG = logging.getLogger(__name__) @@ -48,7 +48,10 @@ class EventHandlerBase(ABC): response = self.dispatch_handling(message) if response: - self.server_connector.send(response) + self.send_message(response) + + def send_message(self, message): + self.server_connector.send_message(message) def check_key(self, message): """ @@ -128,3 +131,8 @@ class EventHandlerBase(ABC): instance for instance in locals_values if isinstance(instance, cls) } + + +class FrontendEventHandlerBase(EventHandlerBase): # pylint: disable=abstract-method + def send_message(self, message): + self.server_connector.send_message(message, Scope.WEBSOCKET) diff --git a/lib/tfw/networking/event_handlers/server_connector.py b/lib/tfw/networking/event_handlers/server_connector.py index ab9973b..73b04e0 100644 --- a/lib/tfw/networking/event_handlers/server_connector.py +++ b/lib/tfw/networking/event_handlers/server_connector.py @@ -2,6 +2,7 @@ # All Rights Reserved. See LICENSE file for details. from functools import partial +from enum import Enum import zmq from zmq.eventloop.zmqstream import ZMQStream @@ -33,52 +34,23 @@ class ServerDownlinkConnector(ZMQConnectorBase): self._zmq_sub_stream.close() +class Scope(Enum): + ZMQ = 'zmq' + WEBSOCKET = 'websocket' + BROADCAST = 'broadcast' + + class ServerUplinkConnector(ZMQConnectorBase): - """ - Class capable of sending messages to the TFW server and event handlers. - """ def __init__(self, zmq_context=None): super(ServerUplinkConnector, self).__init__(zmq_context) self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) - def send_to_eventhandler(self, message): - """ - Send a message to an event handler through the TFW server. - - This envelopes the desired message in the 'data' field of the message to - TFWServer, which will mirror it to event handlers. - - :param message: JSON message you want to send - """ - self.send({ - 'key': 'mirror', - 'data': message - }) - - def send(self, message): - """ - Send a message to the frontend through the TFW server. - - :param message: JSON message you want to send - """ + def send_message(self, message, scope=Scope.ZMQ): + message['scope'] = scope.value self._zmq_push_socket.send_multipart(serialize_tfw_msg(message)) - def broadcast(self, message): - """ - Broadast a message through the TFW server. - - This envelopes the desired message in the 'data' field of the message to - TFWServer, which will broadast it. - - :param message: JSON message you want to send - """ - self.send({ - 'key': 'broadcast', - 'data': message - }) - def close(self): self._zmq_push_socket.close() diff --git a/lib/tfw/networking/message_sender.py b/lib/tfw/networking/message_sender.py index 58eb1c9..1232aaa 100644 --- a/lib/tfw/networking/message_sender.py +++ b/lib/tfw/networking/message_sender.py @@ -19,14 +19,14 @@ class MessageSender: :param originator: name of sender to be displayed on the frontend :param message: message to send """ - data = { - 'originator': originator, - 'message': message - } - self.server_connector.send({ + message = { 'key': self.key, - 'data': data - }) + 'data': { + 'originator': originator, + 'message': message + } + } + self.server_connector.send_message(message) def queue_messages(self, originator, messages): """ @@ -34,16 +34,16 @@ class MessageSender: :param originator: name of sender to be displayed on the frontend :param messages: list of messages to queue """ - data = { - 'messages': [ - {'message': message, 'originator': originator} - for message in messages - ] - } - self.server_connector.send({ + message = { 'key': self.queue_key, - 'data': data - }) + 'data': { + 'messages': [ + {'message': message, 'originator': originator} + for message in messages + ] + } + } + self.server_connector.send_message(message) @staticmethod def generate_messages_from_queue(queue_message): diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/networking/server/tfw_server.py index a1a0f5f..ed28e62 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/networking/server/tfw_server.py @@ -1,114 +1,28 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from abc import ABC, abstractmethod -from contextlib import suppress - from tornado.web import Application from tfw.networking.server.zmq_websocket_proxy import ZMQWebSocketProxy -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector from tfw.networking.server.event_handler_connector import EventHandlerConnector -from tfw.networking.message_sender import MessageSender -from tfw.networking.fsm_aware import FSMAware -from tfw.crypto import KeyManager, verify_message, sign_message from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class TFWServer(FSMAware): +class TFWServer: """ This class handles the proxying of messages between the frontend and event handers. It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ SUB socket. """ def __init__(self): - super().__init__() self._event_handler_connector = EventHandlerConnector() - self._uplink_connector = ServerUplinkConnector() - self._auth_key = KeyManager().auth_key - self.application = Application([( r'/ws', ZMQWebSocketProxy, { 'event_handler_connector': self._event_handler_connector, - 'proxy_filters_and_callbacks': { - 'message_handlers': [ - self.handle_trigger, - self.handle_recover, - self.handle_fsm_update - ], - 'frontend_message_handlers': [self.save_frontend_messages] - } } )]) - self._frontend_messages = FrontendMessageStorage() - - def handle_trigger(self, message): - if 'trigger' in message: - LOG.debug('Executing handler for trigger "%s"', message.get('trigger', '')) - fsm_eh_command = { - 'key': 'fsm', - 'data': { - 'command': 'trigger', - 'value': message['trigger'] - } - } - if verify_message(self._auth_key, message): - sign_message(self._auth_key, fsm_eh_command) - self._uplink_connector.send_to_eventhandler(fsm_eh_command) - - def handle_recover(self, message): - if message['key'] == 'recover': - self._frontend_messages.replay_messages(self._uplink_connector) - self._frontend_messages.clear() - - def handle_fsm_update(self, message): - self.update_fsm_data(message) - - def save_frontend_messages(self, message): - self._frontend_messages.save_message(message) - def listen(self, port): self.application.listen(port) - - -class MessageStorage(ABC): - def __init__(self): - self.saved_messages = [] - - def save_message(self, message): - with suppress(KeyError, AttributeError): - if self.filter_message(message): - self.saved_messages.extend(self.transform_message(message)) - - @abstractmethod - def filter_message(self, message): - raise NotImplementedError - - def transform_message(self, message): # pylint: disable=no-self-use - yield message - - def clear(self): - self.saved_messages.clear() - - -class FrontendMessageStorage(MessageStorage): - def filter_message(self, message): - key = message['key'] - command = message.get('data', {}).get('command') - return ( - key in ('message', 'dashboard', 'queueMessages') - or key == 'ide' and command in ('select', 'read') - ) - - def transform_message(self, message): - if message['key'] == 'queueMessages': - yield from MessageSender.generate_messages_from_queue(message) - else: - yield message - - def replay_messages(self, connector): - for message in self.saved_messages: - connector.send(message) diff --git a/lib/tfw/networking/server/zmq_websocket_proxy.py b/lib/tfw/networking/server/zmq_websocket_proxy.py index 15826bd..b813dca 100644 --- a/lib/tfw/networking/server/zmq_websocket_proxy.py +++ b/lib/tfw/networking/server/zmq_websocket_proxy.py @@ -5,7 +5,7 @@ import json from tornado.websocket import WebSocketHandler -from tfw.mixins.callback_mixin import CallbackMixin +from tfw.networking.event_handlers.server_connector import Scope from tfw.config.logs import logging LOG = logging.getLogger(__name__) @@ -14,38 +14,18 @@ LOG = logging.getLogger(__name__) class ZMQWebSocketProxy(WebSocketHandler): # pylint: disable=abstract-method instances = set() - sequence_number = 0 def initialize(self, **kwargs): # pylint: disable=arguments-differ - self._event_handler_connector = kwargs['event_handler_connector'] - self._proxy_filters_and_callbacks = kwargs.get('proxy_filters_and_callbacks', {}) + self.event_handler_connector = kwargs['event_handler_connector'] + self.tfw_router = TFWRouter(self.send_to_zmq, self.send_to_websockets) - self.proxy_eventhandler_to_websocket = TFWProxy( - self.send_eventhandler_message, - self.send_websocket_message - ) - self.proxy_websocket_to_eventhandler = TFWProxy( - self.send_websocket_message, - self.send_eventhandler_message - ) + def send_to_zmq(self, message): + self.event_handler_connector.send_message(message) - self.subscribe_proxy_callbacks() - - def subscribe_proxy_callbacks(self): - eventhandler_message_handlers = self._proxy_filters_and_callbacks.get('eventhandler_message_handlers', []) - frontend_message_handlers = self._proxy_filters_and_callbacks.get('frontend_message_handlers', []) - message_handlers = self._proxy_filters_and_callbacks.get('message_handlers', []) - proxy_filters = self._proxy_filters_and_callbacks.get('proxy_filters', []) - - self.proxy_websocket_to_eventhandler.subscribe_proxy_callbacks_and_filters( - eventhandler_message_handlers + message_handlers, - proxy_filters - ) - - self.proxy_eventhandler_to_websocket.subscribe_proxy_callbacks_and_filters( - frontend_message_handlers + message_handlers, - proxy_filters - ) + @staticmethod + def send_to_websockets(message): + for instance in ZMQWebSocketProxy.instances: + instance.write_message(message) def prepare(self): ZMQWebSocketProxy.instances.add(self) @@ -54,102 +34,39 @@ class ZMQWebSocketProxy(WebSocketHandler): ZMQWebSocketProxy.instances.remove(self) def open(self, *args, **kwargs): - LOG.debug('WebSocket connection initiated') - self._event_handler_connector.register_callback(self.eventhander_callback) + LOG.debug('WebSocket connection initiated!') + self.event_handler_connector.register_callback(self.zmq_callback) - def eventhander_callback(self, message): - """ - Invoked on ZMQ messages from event handlers. - """ - self.sequence_message(message) - LOG.debug('Received on pull socket: %s', message) - self.proxy_eventhandler_to_websocket(message) - - @classmethod - def sequence_message(cls, message): - cls.sequence_number += 1 - message['seq'] = cls.sequence_number + def zmq_callback(self, message): + LOG.debug('Received on ZMQ pull socket: %s', message) + self.tfw_router.route(message) def on_message(self, message): - """ - Invoked on WS messages from frontend. - """ message = json.loads(message) - self.sequence_message(message) LOG.debug('Received on WebSocket: %s', message) - self.proxy_websocket_to_eventhandler(message) - - def send_eventhandler_message(self, message): - self._event_handler_connector.send_message(message) - - @staticmethod - def send_websocket_message(message): - for instance in ZMQWebSocketProxy.instances: - instance.write_message(message) + self.tfw_router.route(message) # much secure, very cors, wow def check_origin(self, origin): return True -class TFWProxy: - # pylint: disable=protected-access - def __init__(self, to_source, to_destination): - self.to_source = to_source - self.to_destination = to_destination +class TFWRouter: + def __init__(self, send_to_zmq, send_to_websockets): + self.send_to_zmq = send_to_zmq + self.send_to_websockets = send_to_websockets - self.proxy_filters = CallbackMixin() - self.proxy_callbacks = CallbackMixin() + def route(self, message): + scope = Scope(message.get('scope', 'zmq')) - self.proxy_filters.subscribe_callback(self.validate_message) - - self.keyhandlers = { - 'mirror': self.mirror, - 'broadcast': self.broadcast + routing_table = { + Scope.ZMQ: self.send_to_zmq, + Scope.WEBSOCKET: self.send_to_websockets, + Scope.BROADCAST: self.broadcast } - - @staticmethod - def validate_message(message): - if 'key' not in message: - raise ValueError('Invalid TFW message format!') - - def __call__(self, message): - if not self.filter_and_execute_callbacks(message): - return - - if message['key'] not in self.keyhandlers: - self.to_destination(message) - else: - handler = self.keyhandlers[message['key']] - try: - handler(message) - except KeyError: - LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__) - - def filter_and_execute_callbacks(self, message): - try: - self.proxy_filters._execute_callbacks(message) - self.proxy_callbacks._execute_callbacks(message) - return True - except ValueError: - LOG.exception('Invalid TFW message received!') - return False - - def mirror(self, message): - message = message['data'] - if not self.filter_and_execute_callbacks(message): - return - LOG.debug('Mirroring message: %s', message) - self.to_source(message) + action = routing_table[scope] + action(message) def broadcast(self, message): - message = message['data'] - if not self.filter_and_execute_callbacks(message): - return - LOG.debug('Broadcasting message: %s', message) - self.to_source(message) - self.to_destination(message) - - def subscribe_proxy_callbacks_and_filters(self, proxy_callbacks, proxy_filters): - self.proxy_callbacks.subscribe_callbacks(*proxy_callbacks) - self.proxy_filters.subscribe_callbacks(*proxy_filters) + self.send_to_zmq(message) + self.send_to_websockets(message) From 2134d743c38e7ef4414b96b843be701fe71d8727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 27 May 2019 14:09:13 +0200 Subject: [PATCH 06/10] Improve TFW lib layout --- lib/tfw/components/directory_monitor.py | 2 +- .../directory_monitoring_event_handler.py | 2 +- .../directory_snapshotting_event_handler.py | 2 +- lib/tfw/components/frontend_event_handler.py | 2 +- lib/tfw/components/fsm_managing_event_handler.py | 4 ++-- lib/tfw/components/ide_event_handler.py | 2 +- lib/tfw/components/log_monitor.py | 2 +- lib/tfw/components/log_monitoring_event_handler.py | 2 +- lib/tfw/components/pipe_io_event_handler.py | 2 +- .../components/process_managing_event_handler.py | 2 +- lib/tfw/components/terminal_event_handler.py | 3 +-- lib/tfw/crypto.py | 2 +- .../__init__.py | 3 ++- .../boradcasting_event_handler.py | 5 +++-- .../event_handler_base.py | 7 +------ .../event_handlers/frontend_event_handler_base.py | 8 ++++++++ lib/tfw/{networking => event_handlers}/fsm_aware.py | 1 - .../fsm_aware_event_handler.py | 4 ++-- lib/tfw/networking/__init__.py | 6 ++++-- .../{server => }/event_handler_connector.py | 5 +++-- lib/tfw/networking/event_handlers/__init__.py | 2 -- lib/tfw/networking/message_sender.py | 2 +- lib/tfw/networking/scope.py | 7 +++++++ lib/tfw/networking/server/__init__.py | 2 -- .../{event_handlers => }/server_connector.py | 13 ++++--------- lib/tfw/server/__init__.py | 1 + lib/tfw/{networking => }/server/tfw_server.py | 5 +++-- .../{networking => }/server/zmq_websocket_proxy.py | 2 +- supervisor/tfw_server.py | 2 +- 29 files changed, 54 insertions(+), 48 deletions(-) rename lib/tfw/{event_handler_base => event_handlers}/__init__.py (67%) rename lib/tfw/{event_handler_base => event_handlers}/boradcasting_event_handler.py (87%) rename lib/tfw/{event_handler_base => event_handlers}/event_handler_base.py (93%) create mode 100644 lib/tfw/event_handlers/frontend_event_handler_base.py rename lib/tfw/{networking => event_handlers}/fsm_aware.py (99%) rename lib/tfw/{event_handler_base => event_handlers}/fsm_aware_event_handler.py (84%) rename lib/tfw/networking/{server => }/event_handler_connector.py (92%) delete mode 100644 lib/tfw/networking/event_handlers/__init__.py create mode 100644 lib/tfw/networking/scope.py delete mode 100644 lib/tfw/networking/server/__init__.py rename lib/tfw/networking/{event_handlers => }/server_connector.py (87%) create mode 100644 lib/tfw/server/__init__.py rename lib/tfw/{networking => }/server/tfw_server.py (83%) rename lib/tfw/{networking => }/server/zmq_websocket_proxy.py (97%) diff --git a/lib/tfw/components/directory_monitor.py b/lib/tfw/components/directory_monitor.py index 6aa2ca0..f4fb699 100644 --- a/lib/tfw/components/directory_monitor.py +++ b/lib/tfw/components/directory_monitor.py @@ -5,7 +5,7 @@ from functools import wraps from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector, Scope +from tfw.networking import ServerUplinkConnector, Scope from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin diff --git a/lib/tfw/components/directory_monitoring_event_handler.py b/lib/tfw/components/directory_monitoring_event_handler.py index 03c4a3a..18bb8ca 100644 --- a/lib/tfw/components/directory_monitoring_event_handler.py +++ b/lib/tfw/components/directory_monitoring_event_handler.py @@ -3,7 +3,7 @@ from os.path import isdir, exists -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor from tfw.config.logs import logging diff --git a/lib/tfw/components/directory_snapshotting_event_handler.py b/lib/tfw/components/directory_snapshotting_event_handler.py index 6a285a8..d60732f 100644 --- a/lib/tfw/components/directory_snapshotting_event_handler.py +++ b/lib/tfw/components/directory_snapshotting_event_handler.py @@ -8,7 +8,7 @@ from datetime import datetime from dateutil import parser as dateparser -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.components.snapshot_provider import SnapshotProvider from tfw.config import TFWENV from tfw.config.logs import logging diff --git a/lib/tfw/components/frontend_event_handler.py b/lib/tfw/components/frontend_event_handler.py index 898bb7e..73028ba 100644 --- a/lib/tfw/components/frontend_event_handler.py +++ b/lib/tfw/components/frontend_event_handler.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from contextlib import suppress from tfw.networking.message_sender import MessageSender -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase class FrontendEventHandler(FrontendEventHandlerBase): diff --git a/lib/tfw/components/fsm_managing_event_handler.py b/lib/tfw/components/fsm_managing_event_handler.py index 3069fc1..ac6e6fc 100644 --- a/lib/tfw/components/fsm_managing_event_handler.py +++ b/lib/tfw/components/fsm_managing_event_handler.py @@ -1,10 +1,10 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.crypto import KeyManager, sign_message, verify_message from tfw.config.logs import logging -from tfw.networking.event_handlers.server_connector import Scope +from tfw.networking import Scope LOG = logging.getLogger(__name__) diff --git a/lib/tfw/components/ide_event_handler.py b/lib/tfw/components/ide_event_handler.py index 534de6a..2632b45 100644 --- a/lib/tfw/components/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler.py @@ -6,7 +6,7 @@ from glob import glob from fnmatch import fnmatchcase from typing import Iterable -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor from tfw.config.logs import logging diff --git a/lib/tfw/components/log_monitor.py b/lib/tfw/components/log_monitor.py index 2ee6d96..b8a0b49 100644 --- a/lib/tfw/components/log_monitor.py +++ b/lib/tfw/components/log_monitor.py @@ -6,7 +6,7 @@ from os.path import dirname from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector, Scope +from tfw.networking import ServerUplinkConnector, Scope from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin diff --git a/lib/tfw/components/log_monitoring_event_handler.py b/lib/tfw/components/log_monitoring_event_handler.py index d384323..23fe5b9 100644 --- a/lib/tfw/components/log_monitoring_event_handler.py +++ b/lib/tfw/components/log_monitoring_event_handler.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.log_monitor import LogMonitor from tfw.config.logs import logging diff --git a/lib/tfw/components/pipe_io_event_handler.py b/lib/tfw/components/pipe_io_event_handler.py index 37b5b89..e35455e 100644 --- a/lib/tfw/components/pipe_io_event_handler.py +++ b/lib/tfw/components/pipe_io_event_handler.py @@ -9,7 +9,7 @@ from secrets import token_urlsafe from threading import Thread from contextlib import suppress -from tfw.event_handler_base import EventHandlerBase +from tfw.event_handlers import EventHandlerBase from tfw.config.logs import logging from .pipe_io_server import PipeIOServer, terminate_process_on_failure diff --git a/lib/tfw/components/process_managing_event_handler.py b/lib/tfw/components/process_managing_event_handler.py index a87bdd6..f6489e6 100644 --- a/lib/tfw/components/process_managing_event_handler.py +++ b/lib/tfw/components/process_managing_event_handler.py @@ -3,7 +3,7 @@ from xmlrpc.client import Fault as SupervisorFault -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin from tfw.components.directory_monitor import with_monitor_paused from tfw.config.logs import logging diff --git a/lib/tfw/components/terminal_event_handler.py b/lib/tfw/components/terminal_event_handler.py index f45bc8a..3427c4f 100644 --- a/lib/tfw/components/terminal_event_handler.py +++ b/lib/tfw/components/terminal_event_handler.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.event_handler_base import FrontendEventHandlerBase +from tfw.event_handlers import FrontendEventHandlerBase from tfw.components.terminado_mini_server import TerminadoMiniServer from tfw.config import TFWENV from tfw.config.logs import logging @@ -50,7 +50,6 @@ class TerminalEventHandler(FrontendEventHandlerBase): return self._historymonitor def handle_event(self, message): - LOG.debug('TerminadoEventHandler received event: %s', message) try: data = message['data'] message['data'] = self.commands[data['command']](data) diff --git a/lib/tfw/crypto.py b/lib/tfw/crypto.py index 0b37893..344abee 100644 --- a/lib/tfw/crypto.py +++ b/lib/tfw/crypto.py @@ -14,7 +14,7 @@ from cryptography.hazmat.primitives.hashes import SHA256 from cryptography.hazmat.primitives.hmac import HMAC as _HMAC from cryptography.exceptions import InvalidSignature -from tfw.networking.serialization import message_bytes +from tfw.networking import message_bytes from tfw.decorators.lazy_property import lazy_property from tfw.config import TFWENV diff --git a/lib/tfw/event_handler_base/__init__.py b/lib/tfw/event_handlers/__init__.py similarity index 67% rename from lib/tfw/event_handler_base/__init__.py rename to lib/tfw/event_handlers/__init__.py index 65ca6e1..858b781 100644 --- a/lib/tfw/event_handler_base/__init__.py +++ b/lib/tfw/event_handlers/__init__.py @@ -1,6 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .event_handler_base import EventHandlerBase, FrontendEventHandlerBase +from .event_handler_base import EventHandlerBase +from .frontend_event_handler_base import FrontendEventHandlerBase from .boradcasting_event_handler import BroadcastingEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler diff --git a/lib/tfw/event_handler_base/boradcasting_event_handler.py b/lib/tfw/event_handlers/boradcasting_event_handler.py similarity index 87% rename from lib/tfw/event_handler_base/boradcasting_event_handler.py rename to lib/tfw/event_handlers/boradcasting_event_handler.py index cfda694..c6f61a9 100644 --- a/lib/tfw/event_handler_base/boradcasting_event_handler.py +++ b/lib/tfw/event_handlers/boradcasting_event_handler.py @@ -3,10 +3,11 @@ from abc import ABC -from tfw.event_handler_base.event_handler_base import EventHandlerBase -from tfw.networking.event_handlers.server_connector import Scope +from tfw.networking import Scope from tfw.crypto import message_checksum +from .event_handler_base import EventHandlerBase + class BroadcastingEventHandler(EventHandlerBase, ABC): # pylint: disable=abstract-method diff --git a/lib/tfw/event_handler_base/event_handler_base.py b/lib/tfw/event_handlers/event_handler_base.py similarity index 93% rename from lib/tfw/event_handler_base/event_handler_base.py rename to lib/tfw/event_handlers/event_handler_base.py index f51c668..bb3155f 100644 --- a/lib/tfw/event_handler_base/event_handler_base.py +++ b/lib/tfw/event_handlers/event_handler_base.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from inspect import currentframe from typing import Iterable -from tfw.networking.event_handlers.server_connector import ServerConnector, Scope +from tfw.networking import ServerConnector from tfw.config.logs import logging LOG = logging.getLogger(__name__) @@ -131,8 +131,3 @@ class EventHandlerBase(ABC): instance for instance in locals_values if isinstance(instance, cls) } - - -class FrontendEventHandlerBase(EventHandlerBase): # pylint: disable=abstract-method - def send_message(self, message): - self.server_connector.send_message(message, Scope.WEBSOCKET) diff --git a/lib/tfw/event_handlers/frontend_event_handler_base.py b/lib/tfw/event_handlers/frontend_event_handler_base.py new file mode 100644 index 0000000..6990632 --- /dev/null +++ b/lib/tfw/event_handlers/frontend_event_handler_base.py @@ -0,0 +1,8 @@ +from tfw.networking import Scope + +from .event_handler_base import EventHandlerBase + + +class FrontendEventHandlerBase(EventHandlerBase): # pylint: disable=abstract-method + def send_message(self, message): + self.server_connector.send_message(message, Scope.WEBSOCKET) diff --git a/lib/tfw/networking/fsm_aware.py b/lib/tfw/event_handlers/fsm_aware.py similarity index 99% rename from lib/tfw/networking/fsm_aware.py rename to lib/tfw/event_handlers/fsm_aware.py index cb2b287..bec3633 100644 --- a/lib/tfw/networking/fsm_aware.py +++ b/lib/tfw/event_handlers/fsm_aware.py @@ -2,7 +2,6 @@ # All Rights Reserved. See LICENSE file for details. from tfw.crypto import KeyManager, verify_message - from tfw.config.logs import logging LOG = logging.getLogger(__name__) diff --git a/lib/tfw/event_handler_base/fsm_aware_event_handler.py b/lib/tfw/event_handlers/fsm_aware_event_handler.py similarity index 84% rename from lib/tfw/event_handler_base/fsm_aware_event_handler.py rename to lib/tfw/event_handlers/fsm_aware_event_handler.py index 01d6360..60313df 100644 --- a/lib/tfw/event_handler_base/fsm_aware_event_handler.py +++ b/lib/tfw/event_handlers/fsm_aware_event_handler.py @@ -3,8 +3,8 @@ from abc import ABC -from tfw.event_handler_base.event_handler_base import EventHandlerBase -from tfw.networking.fsm_aware import FSMAware +from .event_handler_base import EventHandlerBase +from .fsm_aware import FSMAware class FSMAwareEventHandler(EventHandlerBase, FSMAware, ABC): diff --git a/lib/tfw/networking/__init__.py b/lib/tfw/networking/__init__.py index 500dd7a..7772eb6 100644 --- a/lib/tfw/networking/__init__.py +++ b/lib/tfw/networking/__init__.py @@ -1,6 +1,8 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. +from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes +from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector +from .event_handler_connector import EventHandlerConnector from .message_sender import MessageSender -from .event_handlers.server_connector import ServerUplinkConnector as TFWServerConnector -from .server.tfw_server import TFWServer +from .scope import Scope diff --git a/lib/tfw/networking/server/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py similarity index 92% rename from lib/tfw/networking/server/event_handler_connector.py rename to lib/tfw/networking/event_handler_connector.py index 7b56a88..67ca7ba 100644 --- a/lib/tfw/networking/server/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -4,11 +4,12 @@ import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.networking.zmq_connector_base import ZMQConnectorBase -from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg from tfw.config import TFWENV from tfw.config.logs import logging +from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg +from .zmq_connector_base import ZMQConnectorBase + LOG = logging.getLogger(__name__) diff --git a/lib/tfw/networking/event_handlers/__init__.py b/lib/tfw/networking/event_handlers/__init__.py deleted file mode 100644 index db64b25..0000000 --- a/lib/tfw/networking/event_handlers/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2018 Avatao.com Innovative Learning Kft. -# All Rights Reserved. See LICENSE file for details. diff --git a/lib/tfw/networking/message_sender.py b/lib/tfw/networking/message_sender.py index 1232aaa..10dd462 100644 --- a/lib/tfw/networking/message_sender.py +++ b/lib/tfw/networking/message_sender.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from tfw.networking.event_handlers.server_connector import ServerUplinkConnector +from .server_connector import ServerUplinkConnector class MessageSender: diff --git a/lib/tfw/networking/scope.py b/lib/tfw/networking/scope.py new file mode 100644 index 0000000..639461b --- /dev/null +++ b/lib/tfw/networking/scope.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class Scope(Enum): + ZMQ = 'zmq' + WEBSOCKET = 'websocket' + BROADCAST = 'broadcast' diff --git a/lib/tfw/networking/server/__init__.py b/lib/tfw/networking/server/__init__.py deleted file mode 100644 index db64b25..0000000 --- a/lib/tfw/networking/server/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2018 Avatao.com Innovative Learning Kft. -# All Rights Reserved. See LICENSE file for details. diff --git a/lib/tfw/networking/event_handlers/server_connector.py b/lib/tfw/networking/server_connector.py similarity index 87% rename from lib/tfw/networking/event_handlers/server_connector.py rename to lib/tfw/networking/server_connector.py index 73b04e0..c0e2bf1 100644 --- a/lib/tfw/networking/event_handlers/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -2,16 +2,17 @@ # All Rights Reserved. See LICENSE file for details. from functools import partial -from enum import Enum import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.networking.zmq_connector_base import ZMQConnectorBase -from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg from tfw.config import TFWENV from tfw.config.logs import logging +from .scope import Scope +from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg +from .zmq_connector_base import ZMQConnectorBase + LOG = logging.getLogger(__name__) @@ -34,12 +35,6 @@ class ServerDownlinkConnector(ZMQConnectorBase): self._zmq_sub_stream.close() -class Scope(Enum): - ZMQ = 'zmq' - WEBSOCKET = 'websocket' - BROADCAST = 'broadcast' - - class ServerUplinkConnector(ZMQConnectorBase): def __init__(self, zmq_context=None): super(ServerUplinkConnector, self).__init__(zmq_context) diff --git a/lib/tfw/server/__init__.py b/lib/tfw/server/__init__.py new file mode 100644 index 0000000..e2c01a9 --- /dev/null +++ b/lib/tfw/server/__init__.py @@ -0,0 +1 @@ +from .tfw_server import TFWServer diff --git a/lib/tfw/networking/server/tfw_server.py b/lib/tfw/server/tfw_server.py similarity index 83% rename from lib/tfw/networking/server/tfw_server.py rename to lib/tfw/server/tfw_server.py index ed28e62..a4cdd3f 100644 --- a/lib/tfw/networking/server/tfw_server.py +++ b/lib/tfw/server/tfw_server.py @@ -3,10 +3,11 @@ from tornado.web import Application -from tfw.networking.server.zmq_websocket_proxy import ZMQWebSocketProxy -from tfw.networking.server.event_handler_connector import EventHandlerConnector +from tfw.networking import EventHandlerConnector from tfw.config.logs import logging +from .zmq_websocket_proxy import ZMQWebSocketProxy + LOG = logging.getLogger(__name__) diff --git a/lib/tfw/networking/server/zmq_websocket_proxy.py b/lib/tfw/server/zmq_websocket_proxy.py similarity index 97% rename from lib/tfw/networking/server/zmq_websocket_proxy.py rename to lib/tfw/server/zmq_websocket_proxy.py index b813dca..b820f85 100644 --- a/lib/tfw/networking/server/zmq_websocket_proxy.py +++ b/lib/tfw/server/zmq_websocket_proxy.py @@ -5,7 +5,7 @@ import json from tornado.websocket import WebSocketHandler -from tfw.networking.event_handlers.server_connector import Scope +from tfw.networking import Scope from tfw.config.logs import logging LOG = logging.getLogger(__name__) diff --git a/supervisor/tfw_server.py b/supervisor/tfw_server.py index 78766ac..4cb8bd6 100644 --- a/supervisor/tfw_server.py +++ b/supervisor/tfw_server.py @@ -1,6 +1,6 @@ from tornado.ioloop import IOLoop -from tfw.networking import TFWServer +from tfw.server import TFWServer from tfw.config import TFWENV From 82df8a80654d3b9c938a4903bb0a4ba0077d9b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 27 May 2019 14:16:51 +0200 Subject: [PATCH 07/10] Handle pylint suggestions --- .pylintrc | 1 + lib/tfw/event_handlers/event_handler_base.py | 1 - lib/tfw/event_handlers/fsm_aware.py | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.pylintrc b/.pylintrc index fd195bc..1cbae5d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -6,6 +6,7 @@ disable = missing-docstring, too-few-public-methods, invalid-name [SIMILARITIES] +min-similarity-lines=6 ignore-comments=yes ignore-docstrings=yes ignore-imports=yes diff --git a/lib/tfw/event_handlers/event_handler_base.py b/lib/tfw/event_handlers/event_handler_base.py index bb3155f..95300fc 100644 --- a/lib/tfw/event_handlers/event_handler_base.py +++ b/lib/tfw/event_handlers/event_handler_base.py @@ -118,7 +118,6 @@ class EventHandlerBase(ABC): Perform cleanup actions such as releasing database connections and stuff like that. """ - pass @classmethod def get_local_instances(cls): diff --git a/lib/tfw/event_handlers/fsm_aware.py b/lib/tfw/event_handlers/fsm_aware.py index bec3633..13cea43 100644 --- a/lib/tfw/event_handlers/fsm_aware.py +++ b/lib/tfw/event_handlers/fsm_aware.py @@ -42,4 +42,3 @@ class FSMAware: :param kwargs: fsm_update 'data' field """ - pass From e44a99fa6dae9944b410875e75cb98ad83d448d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 27 May 2019 20:02:09 +0200 Subject: [PATCH 08/10] Remove ZMQConnectorBase --- lib/tfw/networking/event_handler_connector.py | 19 ++++++++++--------- lib/tfw/networking/server_connector.py | 19 ++++++++++--------- lib/tfw/networking/zmq_connector_base.py | 9 --------- 3 files changed, 20 insertions(+), 27 deletions(-) delete mode 100644 lib/tfw/networking/zmq_connector_base.py diff --git a/lib/tfw/networking/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py index 67ca7ba..03f2513 100644 --- a/lib/tfw/networking/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -8,15 +8,13 @@ from tfw.config import TFWENV from tfw.config.logs import logging from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg -from .zmq_connector_base import ZMQConnectorBase LOG = logging.getLogger(__name__) -class EventHandlerDownlinkConnector(ZMQConnectorBase): - def __init__(self, zmq_context=None): - super(EventHandlerDownlinkConnector, self).__init__(zmq_context) - self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) +class EventHandlerDownlinkConnector(): + def __init__(self): + self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) address = f'tcp://*:{TFWENV.RECEIVER_PORT}' @@ -31,10 +29,9 @@ class EventHandlerDownlinkConnector(ZMQConnectorBase): self._zmq_pull_stream.close() -class EventHandlerUplinkConnector(ZMQConnectorBase): - def __init__(self, zmq_context=None): - super(EventHandlerUplinkConnector, self).__init__(zmq_context) - self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) +class EventHandlerUplinkConnector(): + def __init__(self): + self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) address = f'tcp://*:{TFWENV.PUBLISHER_PORT}' self._zmq_pub_socket.bind(address) @@ -48,6 +45,10 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): + def __init__(self): + EventHandlerDownlinkConnector.__init__(self) + EventHandlerUplinkConnector.__init__(self) + def close(self): EventHandlerDownlinkConnector.close(self) EventHandlerUplinkConnector.close(self) diff --git a/lib/tfw/networking/server_connector.py b/lib/tfw/networking/server_connector.py index c0e2bf1..da84289 100644 --- a/lib/tfw/networking/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -11,15 +11,13 @@ from tfw.config.logs import logging from .scope import Scope from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg -from .zmq_connector_base import ZMQConnectorBase LOG = logging.getLogger(__name__) -class ServerDownlinkConnector(ZMQConnectorBase): - def __init__(self, zmq_context=None): - super(ServerDownlinkConnector, self).__init__(zmq_context) - self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) +class ServerDownlinkConnector(): + def __init__(self): + self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) @@ -35,10 +33,9 @@ class ServerDownlinkConnector(ZMQConnectorBase): self._zmq_sub_stream.close() -class ServerUplinkConnector(ZMQConnectorBase): - def __init__(self, zmq_context=None): - super(ServerUplinkConnector, self).__init__(zmq_context) - self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) +class ServerUplinkConnector(): + def __init__(self): + self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) @@ -51,6 +48,10 @@ class ServerUplinkConnector(ZMQConnectorBase): class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): + def __init__(self): + ServerUplinkConnector.__init__(self) + ServerDownlinkConnector.__init__(self) + def close(self): ServerUplinkConnector.close(self) ServerDownlinkConnector.close(self) diff --git a/lib/tfw/networking/zmq_connector_base.py b/lib/tfw/networking/zmq_connector_base.py deleted file mode 100644 index 47a850e..0000000 --- a/lib/tfw/networking/zmq_connector_base.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (C) 2018 Avatao.com Innovative Learning Kft. -# All Rights Reserved. See LICENSE file for details. - -import zmq - - -class ZMQConnectorBase: - def __init__(self, zmq_context=None): - self._zmq_context = zmq_context or zmq.Context.instance() From f151ecfbac378187856c0d3c0af6fb835dd30933 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 27 May 2019 20:11:03 +0200 Subject: [PATCH 09/10] Improve ZMQ port envvar names --- Dockerfile | 4 ++-- lib/tfw/networking/event_handler_connector.py | 4 ++-- lib/tfw/networking/server_connector.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 22237e8..94eee57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,8 +23,8 @@ ENV TFW_PUBLIC_PORT=8888 \ TFW_LOGIN_APP_PORT=6666 \ TFW_TERMINADO_PORT=7878 \ TFW_SUPERVISOR_HTTP_PORT=9001 \ - TFW_PUBLISHER_PORT=7654 \ - TFW_RECEIVER_PORT=8765 + TFW_PUB_PORT=7654 \ + TFW_PULL_PORT=8765 EXPOSE ${TFW_PUBLIC_PORT} diff --git a/lib/tfw/networking/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py index 03f2513..bf2c9ae 100644 --- a/lib/tfw/networking/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -17,7 +17,7 @@ class EventHandlerDownlinkConnector(): self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) - address = f'tcp://*:{TFWENV.RECEIVER_PORT}' + address = f'tcp://*:{TFWENV.PULL_PORT}' self._zmq_pull_socket.bind(address) LOG.debug('Pull socket bound to %s', address) @@ -33,7 +33,7 @@ class EventHandlerUplinkConnector(): def __init__(self): self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) - address = f'tcp://*:{TFWENV.PUBLISHER_PORT}' + address = f'tcp://*:{TFWENV.PUB_PORT}' self._zmq_pub_socket.bind(address) LOG.debug('Pub socket bound to %s', address) diff --git a/lib/tfw/networking/server_connector.py b/lib/tfw/networking/server_connector.py index da84289..600cb9a 100644 --- a/lib/tfw/networking/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -18,7 +18,7 @@ LOG = logging.getLogger(__name__) class ServerDownlinkConnector(): def __init__(self): self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) - self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') + self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUB_PORT}') self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) @@ -36,7 +36,7 @@ class ServerDownlinkConnector(): class ServerUplinkConnector(): def __init__(self): self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) - self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') + self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.PULL_PORT}') self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) def send_message(self, message, scope=Scope.ZMQ): From c8e98af5161d4a6277d022b04b4fdcd9d864d866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 4 Jun 2019 13:58:03 +0200 Subject: [PATCH 10/10] Improve module dependencies by moving port envvars out of tfw.networking --- lib/tfw/components/__init__.py | 1 + lib/tfw/components/directory_monitor.py | 5 +++-- lib/tfw/components/frontend_event_handler.py | 3 ++- lib/tfw/components/log_monitor.py | 5 +++-- .../message_sender.py | 4 ++-- lib/tfw/event_handlers/__init__.py | 1 + lib/tfw/event_handlers/event_handler_base.py | 5 +++-- .../event_handlers/tfw_server_connector.py | 19 +++++++++++++++++ lib/tfw/networking/__init__.py | 1 - lib/tfw/networking/event_handler_connector.py | 21 ++++++++----------- lib/tfw/networking/server_connector.py | 19 ++++++++--------- lib/tfw/server/tfw_server.py | 14 ++++++++----- ...ocket_proxy.py => zmq_websocket_router.py} | 12 +++++------ supervisor/tfw_server.py | 3 +-- 14 files changed, 68 insertions(+), 45 deletions(-) rename lib/tfw/{networking => components}/message_sender.py (93%) create mode 100644 lib/tfw/event_handlers/tfw_server_connector.py rename lib/tfw/server/{zmq_websocket_proxy.py => zmq_websocket_router.py} (88%) diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 7d2ebc0..b955ad8 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -15,3 +15,4 @@ from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHa from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .commands_equal import CommandsEqual from .frontend_event_handler import FrontendEventHandler +from .message_sender import MessageSender diff --git a/lib/tfw/components/directory_monitor.py b/lib/tfw/components/directory_monitor.py index f4fb699..7144f0e 100644 --- a/lib/tfw/components/directory_monitor.py +++ b/lib/tfw/components/directory_monitor.py @@ -5,7 +5,8 @@ from functools import wraps from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler -from tfw.networking import ServerUplinkConnector, Scope +from tfw.networking import Scope +from tfw.event_handlers import TFWServerUplinkConnector from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin @@ -47,7 +48,7 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler): def __init__(self, ide_key): super().__init__() self.ide_key = ide_key - self.uplink = ServerUplinkConnector() + self.uplink = TFWServerUplinkConnector() self._paused = False self.ignore = 0 diff --git a/lib/tfw/components/frontend_event_handler.py b/lib/tfw/components/frontend_event_handler.py index 73028ba..1a76abe 100644 --- a/lib/tfw/components/frontend_event_handler.py +++ b/lib/tfw/components/frontend_event_handler.py @@ -1,9 +1,10 @@ from abc import ABC, abstractmethod from contextlib import suppress -from tfw.networking.message_sender import MessageSender from tfw.event_handlers import FrontendEventHandlerBase +from .message_sender import MessageSender + class FrontendEventHandler(FrontendEventHandlerBase): def __init__(self): diff --git a/lib/tfw/components/log_monitor.py b/lib/tfw/components/log_monitor.py index b8a0b49..14591e8 100644 --- a/lib/tfw/components/log_monitor.py +++ b/lib/tfw/components/log_monitor.py @@ -6,7 +6,8 @@ from os.path import dirname from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler -from tfw.networking import ServerUplinkConnector, Scope +from tfw.networking import Scope +from tfw.event_handlers import TFWServerUplinkConnector from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin @@ -38,7 +39,7 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso self.procinfo['stdout_logfile'], self.procinfo['stderr_logfile'] ]) - self.uplink = ServerUplinkConnector() + self.uplink = TFWServerUplinkConnector() self.log_tail = log_tail @property diff --git a/lib/tfw/networking/message_sender.py b/lib/tfw/components/message_sender.py similarity index 93% rename from lib/tfw/networking/message_sender.py rename to lib/tfw/components/message_sender.py index 10dd462..4f35baa 100644 --- a/lib/tfw/networking/message_sender.py +++ b/lib/tfw/components/message_sender.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .server_connector import ServerUplinkConnector +from tfw.event_handlers import TFWServerUplinkConnector class MessageSender: @@ -9,7 +9,7 @@ class MessageSender: Provides mechanisms to send messages to our frontend messaging component. """ def __init__(self): - self.server_connector = ServerUplinkConnector() + self.server_connector = TFWServerUplinkConnector() self.key = 'message' self.queue_key = 'queueMessages' diff --git a/lib/tfw/event_handlers/__init__.py b/lib/tfw/event_handlers/__init__.py index 858b781..0327aef 100644 --- a/lib/tfw/event_handlers/__init__.py +++ b/lib/tfw/event_handlers/__init__.py @@ -5,3 +5,4 @@ from .event_handler_base import EventHandlerBase from .frontend_event_handler_base import FrontendEventHandlerBase from .boradcasting_event_handler import BroadcastingEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler +from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector diff --git a/lib/tfw/event_handlers/event_handler_base.py b/lib/tfw/event_handlers/event_handler_base.py index 95300fc..0cc0a9e 100644 --- a/lib/tfw/event_handlers/event_handler_base.py +++ b/lib/tfw/event_handlers/event_handler_base.py @@ -5,9 +5,10 @@ from abc import ABC, abstractmethod from inspect import currentframe from typing import Iterable -from tfw.networking import ServerConnector from tfw.config.logs import logging +from .tfw_server_connector import TFWServerConnector + LOG = logging.getLogger(__name__) @@ -19,7 +20,7 @@ class EventHandlerBase(ABC): Derived classes must implement the handle_event() method """ def __init__(self, key): - self.server_connector = ServerConnector() + self.server_connector = TFWServerConnector() self.keys = [] if isinstance(key, str): self.keys.append(key) diff --git a/lib/tfw/event_handlers/tfw_server_connector.py b/lib/tfw/event_handlers/tfw_server_connector.py new file mode 100644 index 0000000..161a6cb --- /dev/null +++ b/lib/tfw/event_handlers/tfw_server_connector.py @@ -0,0 +1,19 @@ +from functools import partial + +from tfw.networking import ServerUplinkConnector, ServerConnector +from tfw.config import TFWENV + + +UPLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PULL_PORT}' +DOWNLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PUB_PORT}' + + +TFWServerUplinkConnector = partial( + ServerUplinkConnector, + connect_addr=UPLINK_CONN_ADDR +) +TFWServerConnector = partial( + ServerConnector, + downlink_connect_addr=DOWNLINK_CONN_ADDR, + uplink_connect_addr=UPLINK_CONN_ADDR +) diff --git a/lib/tfw/networking/__init__.py b/lib/tfw/networking/__init__.py index 7772eb6..49e896d 100644 --- a/lib/tfw/networking/__init__.py +++ b/lib/tfw/networking/__init__.py @@ -4,5 +4,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector from .event_handler_connector import EventHandlerConnector -from .message_sender import MessageSender from .scope import Scope diff --git a/lib/tfw/networking/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py index bf2c9ae..378db11 100644 --- a/lib/tfw/networking/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -4,7 +4,6 @@ import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.config import TFWENV from tfw.config.logs import logging from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg @@ -13,13 +12,12 @@ LOG = logging.getLogger(__name__) class EventHandlerDownlinkConnector(): - def __init__(self): + def __init__(self, bind_addr): self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) - address = f'tcp://*:{TFWENV.PULL_PORT}' - self._zmq_pull_socket.bind(address) - LOG.debug('Pull socket bound to %s', address) + self._zmq_pull_socket.bind(bind_addr) + LOG.debug('Pull socket bound to %s', bind_addr) def register_callback(self, callback): callback = with_deserialize_tfw_msg(callback) @@ -30,12 +28,11 @@ class EventHandlerDownlinkConnector(): class EventHandlerUplinkConnector(): - def __init__(self): + def __init__(self, bind_addr): self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) - address = f'tcp://*:{TFWENV.PUB_PORT}' - self._zmq_pub_socket.bind(address) - LOG.debug('Pub socket bound to %s', address) + self._zmq_pub_socket.bind(bind_addr) + LOG.debug('Pub socket bound to %s', bind_addr) def send_message(self, message: dict): self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) @@ -45,9 +42,9 @@ class EventHandlerUplinkConnector(): class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): - def __init__(self): - EventHandlerDownlinkConnector.__init__(self) - EventHandlerUplinkConnector.__init__(self) + def __init__(self, downlink_bind_addr, uplink_bind_addr): + EventHandlerDownlinkConnector.__init__(self, downlink_bind_addr) + EventHandlerUplinkConnector.__init__(self, uplink_bind_addr) def close(self): EventHandlerDownlinkConnector.close(self) diff --git a/lib/tfw/networking/server_connector.py b/lib/tfw/networking/server_connector.py index 600cb9a..6bee301 100644 --- a/lib/tfw/networking/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -6,7 +6,6 @@ from functools import partial import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.config import TFWENV from tfw.config.logs import logging from .scope import Scope @@ -16,9 +15,9 @@ LOG = logging.getLogger(__name__) class ServerDownlinkConnector(): - def __init__(self): + def __init__(self, connect_addr): self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) - self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUB_PORT}') + self._zmq_sub_socket.connect(connect_addr) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) @@ -34,9 +33,9 @@ class ServerDownlinkConnector(): class ServerUplinkConnector(): - def __init__(self): + def __init__(self, connect_addr): self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) - self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.PULL_PORT}') + self._zmq_push_socket.connect(connect_addr) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) def send_message(self, message, scope=Scope.ZMQ): @@ -47,11 +46,11 @@ class ServerUplinkConnector(): self._zmq_push_socket.close() -class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): - def __init__(self): - ServerUplinkConnector.__init__(self) - ServerDownlinkConnector.__init__(self) +class ServerConnector(ServerDownlinkConnector, ServerUplinkConnector): + def __init__(self, downlink_connect_addr, uplink_connect_addr): + ServerDownlinkConnector.__init__(self, downlink_connect_addr) + ServerUplinkConnector.__init__(self, uplink_connect_addr) def close(self): - ServerUplinkConnector.close(self) ServerDownlinkConnector.close(self) + ServerUplinkConnector.close(self) diff --git a/lib/tfw/server/tfw_server.py b/lib/tfw/server/tfw_server.py index a4cdd3f..ad2d37e 100644 --- a/lib/tfw/server/tfw_server.py +++ b/lib/tfw/server/tfw_server.py @@ -4,9 +4,10 @@ from tornado.web import Application from tfw.networking import EventHandlerConnector +from tfw.config import TFWENV from tfw.config.logs import logging -from .zmq_websocket_proxy import ZMQWebSocketProxy +from .zmq_websocket_router import ZMQWebSocketRouter LOG = logging.getLogger(__name__) @@ -18,12 +19,15 @@ class TFWServer: SUB socket. """ def __init__(self): - self._event_handler_connector = EventHandlerConnector() + self._event_handler_connector = EventHandlerConnector( + downlink_bind_addr=f'tcp://*:{TFWENV.PULL_PORT}', + uplink_bind_addr=f'tcp://*:{TFWENV.PUB_PORT}' + ) self.application = Application([( - r'/ws', ZMQWebSocketProxy, { + r'/ws', ZMQWebSocketRouter, { 'event_handler_connector': self._event_handler_connector, } )]) - def listen(self, port): - self.application.listen(port) + def listen(self): + self.application.listen(TFWENV.WEB_PORT) diff --git a/lib/tfw/server/zmq_websocket_proxy.py b/lib/tfw/server/zmq_websocket_router.py similarity index 88% rename from lib/tfw/server/zmq_websocket_proxy.py rename to lib/tfw/server/zmq_websocket_router.py index b820f85..d6021b6 100644 --- a/lib/tfw/server/zmq_websocket_proxy.py +++ b/lib/tfw/server/zmq_websocket_router.py @@ -11,7 +11,7 @@ from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class ZMQWebSocketProxy(WebSocketHandler): +class ZMQWebSocketRouter(WebSocketHandler): # pylint: disable=abstract-method instances = set() @@ -22,16 +22,16 @@ class ZMQWebSocketProxy(WebSocketHandler): def send_to_zmq(self, message): self.event_handler_connector.send_message(message) - @staticmethod - def send_to_websockets(message): - for instance in ZMQWebSocketProxy.instances: + @classmethod + def send_to_websockets(cls, message): + for instance in cls.instances: instance.write_message(message) def prepare(self): - ZMQWebSocketProxy.instances.add(self) + type(self).instances.add(self) def on_close(self): - ZMQWebSocketProxy.instances.remove(self) + type(self).instances.remove(self) def open(self, *args, **kwargs): LOG.debug('WebSocket connection initiated!') diff --git a/supervisor/tfw_server.py b/supervisor/tfw_server.py index 4cb8bd6..a3848b6 100644 --- a/supervisor/tfw_server.py +++ b/supervisor/tfw_server.py @@ -1,9 +1,8 @@ from tornado.ioloop import IOLoop from tfw.server import TFWServer -from tfw.config import TFWENV if __name__ == '__main__': - TFWServer().listen(TFWENV.WEB_PORT) + TFWServer().listen() IOLoop.instance().start()