From c8e98af5161d4a6277d022b04b4fdcd9d864d866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 4 Jun 2019 13:58:03 +0200 Subject: [PATCH] Improve module dependencies by moving port envvars out of tfw.networking --- lib/tfw/components/__init__.py | 1 + lib/tfw/components/directory_monitor.py | 5 +++-- lib/tfw/components/frontend_event_handler.py | 3 ++- lib/tfw/components/log_monitor.py | 5 +++-- .../message_sender.py | 4 ++-- lib/tfw/event_handlers/__init__.py | 1 + lib/tfw/event_handlers/event_handler_base.py | 5 +++-- .../event_handlers/tfw_server_connector.py | 19 +++++++++++++++++ lib/tfw/networking/__init__.py | 1 - lib/tfw/networking/event_handler_connector.py | 21 ++++++++----------- lib/tfw/networking/server_connector.py | 19 ++++++++--------- lib/tfw/server/tfw_server.py | 14 ++++++++----- ...ocket_proxy.py => zmq_websocket_router.py} | 12 +++++------ supervisor/tfw_server.py | 3 +-- 14 files changed, 68 insertions(+), 45 deletions(-) rename lib/tfw/{networking => components}/message_sender.py (93%) create mode 100644 lib/tfw/event_handlers/tfw_server_connector.py rename lib/tfw/server/{zmq_websocket_proxy.py => zmq_websocket_router.py} (88%) diff --git a/lib/tfw/components/__init__.py b/lib/tfw/components/__init__.py index 7d2ebc0..b955ad8 100644 --- a/lib/tfw/components/__init__.py +++ b/lib/tfw/components/__init__.py @@ -15,3 +15,4 @@ from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHa from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .commands_equal import CommandsEqual from .frontend_event_handler import FrontendEventHandler +from .message_sender import MessageSender diff --git a/lib/tfw/components/directory_monitor.py b/lib/tfw/components/directory_monitor.py index f4fb699..7144f0e 100644 --- a/lib/tfw/components/directory_monitor.py +++ b/lib/tfw/components/directory_monitor.py @@ -5,7 +5,8 @@ from functools import wraps from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler -from tfw.networking import ServerUplinkConnector, Scope +from tfw.networking import Scope +from tfw.event_handlers import TFWServerUplinkConnector from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin @@ -47,7 +48,7 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler): def __init__(self, ide_key): super().__init__() self.ide_key = ide_key - self.uplink = ServerUplinkConnector() + self.uplink = TFWServerUplinkConnector() self._paused = False self.ignore = 0 diff --git a/lib/tfw/components/frontend_event_handler.py b/lib/tfw/components/frontend_event_handler.py index 73028ba..1a76abe 100644 --- a/lib/tfw/components/frontend_event_handler.py +++ b/lib/tfw/components/frontend_event_handler.py @@ -1,9 +1,10 @@ from abc import ABC, abstractmethod from contextlib import suppress -from tfw.networking.message_sender import MessageSender from tfw.event_handlers import FrontendEventHandlerBase +from .message_sender import MessageSender + class FrontendEventHandler(FrontendEventHandlerBase): def __init__(self): diff --git a/lib/tfw/components/log_monitor.py b/lib/tfw/components/log_monitor.py index b8a0b49..14591e8 100644 --- a/lib/tfw/components/log_monitor.py +++ b/lib/tfw/components/log_monitor.py @@ -6,7 +6,8 @@ from os.path import dirname from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler -from tfw.networking import ServerUplinkConnector, Scope +from tfw.networking import Scope +from tfw.event_handlers import TFWServerUplinkConnector from tfw.decorators.rate_limiter import RateLimiter from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin @@ -38,7 +39,7 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso self.procinfo['stdout_logfile'], self.procinfo['stderr_logfile'] ]) - self.uplink = ServerUplinkConnector() + self.uplink = TFWServerUplinkConnector() self.log_tail = log_tail @property diff --git a/lib/tfw/networking/message_sender.py b/lib/tfw/components/message_sender.py similarity index 93% rename from lib/tfw/networking/message_sender.py rename to lib/tfw/components/message_sender.py index 10dd462..4f35baa 100644 --- a/lib/tfw/networking/message_sender.py +++ b/lib/tfw/components/message_sender.py @@ -1,7 +1,7 @@ # Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. -from .server_connector import ServerUplinkConnector +from tfw.event_handlers import TFWServerUplinkConnector class MessageSender: @@ -9,7 +9,7 @@ class MessageSender: Provides mechanisms to send messages to our frontend messaging component. """ def __init__(self): - self.server_connector = ServerUplinkConnector() + self.server_connector = TFWServerUplinkConnector() self.key = 'message' self.queue_key = 'queueMessages' diff --git a/lib/tfw/event_handlers/__init__.py b/lib/tfw/event_handlers/__init__.py index 858b781..0327aef 100644 --- a/lib/tfw/event_handlers/__init__.py +++ b/lib/tfw/event_handlers/__init__.py @@ -5,3 +5,4 @@ from .event_handler_base import EventHandlerBase from .frontend_event_handler_base import FrontendEventHandlerBase from .boradcasting_event_handler import BroadcastingEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler +from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector diff --git a/lib/tfw/event_handlers/event_handler_base.py b/lib/tfw/event_handlers/event_handler_base.py index 95300fc..0cc0a9e 100644 --- a/lib/tfw/event_handlers/event_handler_base.py +++ b/lib/tfw/event_handlers/event_handler_base.py @@ -5,9 +5,10 @@ from abc import ABC, abstractmethod from inspect import currentframe from typing import Iterable -from tfw.networking import ServerConnector from tfw.config.logs import logging +from .tfw_server_connector import TFWServerConnector + LOG = logging.getLogger(__name__) @@ -19,7 +20,7 @@ class EventHandlerBase(ABC): Derived classes must implement the handle_event() method """ def __init__(self, key): - self.server_connector = ServerConnector() + self.server_connector = TFWServerConnector() self.keys = [] if isinstance(key, str): self.keys.append(key) diff --git a/lib/tfw/event_handlers/tfw_server_connector.py b/lib/tfw/event_handlers/tfw_server_connector.py new file mode 100644 index 0000000..161a6cb --- /dev/null +++ b/lib/tfw/event_handlers/tfw_server_connector.py @@ -0,0 +1,19 @@ +from functools import partial + +from tfw.networking import ServerUplinkConnector, ServerConnector +from tfw.config import TFWENV + + +UPLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PULL_PORT}' +DOWNLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PUB_PORT}' + + +TFWServerUplinkConnector = partial( + ServerUplinkConnector, + connect_addr=UPLINK_CONN_ADDR +) +TFWServerConnector = partial( + ServerConnector, + downlink_connect_addr=DOWNLINK_CONN_ADDR, + uplink_connect_addr=UPLINK_CONN_ADDR +) diff --git a/lib/tfw/networking/__init__.py b/lib/tfw/networking/__init__.py index 7772eb6..49e896d 100644 --- a/lib/tfw/networking/__init__.py +++ b/lib/tfw/networking/__init__.py @@ -4,5 +4,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector from .event_handler_connector import EventHandlerConnector -from .message_sender import MessageSender from .scope import Scope diff --git a/lib/tfw/networking/event_handler_connector.py b/lib/tfw/networking/event_handler_connector.py index bf2c9ae..378db11 100644 --- a/lib/tfw/networking/event_handler_connector.py +++ b/lib/tfw/networking/event_handler_connector.py @@ -4,7 +4,6 @@ import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.config import TFWENV from tfw.config.logs import logging from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg @@ -13,13 +12,12 @@ LOG = logging.getLogger(__name__) class EventHandlerDownlinkConnector(): - def __init__(self): + def __init__(self, bind_addr): self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) - address = f'tcp://*:{TFWENV.PULL_PORT}' - self._zmq_pull_socket.bind(address) - LOG.debug('Pull socket bound to %s', address) + self._zmq_pull_socket.bind(bind_addr) + LOG.debug('Pull socket bound to %s', bind_addr) def register_callback(self, callback): callback = with_deserialize_tfw_msg(callback) @@ -30,12 +28,11 @@ class EventHandlerDownlinkConnector(): class EventHandlerUplinkConnector(): - def __init__(self): + def __init__(self, bind_addr): self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) - address = f'tcp://*:{TFWENV.PUB_PORT}' - self._zmq_pub_socket.bind(address) - LOG.debug('Pub socket bound to %s', address) + self._zmq_pub_socket.bind(bind_addr) + LOG.debug('Pub socket bound to %s', bind_addr) def send_message(self, message: dict): self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) @@ -45,9 +42,9 @@ class EventHandlerUplinkConnector(): class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): - def __init__(self): - EventHandlerDownlinkConnector.__init__(self) - EventHandlerUplinkConnector.__init__(self) + def __init__(self, downlink_bind_addr, uplink_bind_addr): + EventHandlerDownlinkConnector.__init__(self, downlink_bind_addr) + EventHandlerUplinkConnector.__init__(self, uplink_bind_addr) def close(self): EventHandlerDownlinkConnector.close(self) diff --git a/lib/tfw/networking/server_connector.py b/lib/tfw/networking/server_connector.py index 600cb9a..6bee301 100644 --- a/lib/tfw/networking/server_connector.py +++ b/lib/tfw/networking/server_connector.py @@ -6,7 +6,6 @@ from functools import partial import zmq from zmq.eventloop.zmqstream import ZMQStream -from tfw.config import TFWENV from tfw.config.logs import logging from .scope import Scope @@ -16,9 +15,9 @@ LOG = logging.getLogger(__name__) class ServerDownlinkConnector(): - def __init__(self): + def __init__(self, connect_addr): self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) - self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUB_PORT}') + self._zmq_sub_socket.connect(connect_addr) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) @@ -34,9 +33,9 @@ class ServerDownlinkConnector(): class ServerUplinkConnector(): - def __init__(self): + def __init__(self, connect_addr): self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) - self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.PULL_PORT}') + self._zmq_push_socket.connect(connect_addr) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) def send_message(self, message, scope=Scope.ZMQ): @@ -47,11 +46,11 @@ class ServerUplinkConnector(): self._zmq_push_socket.close() -class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): - def __init__(self): - ServerUplinkConnector.__init__(self) - ServerDownlinkConnector.__init__(self) +class ServerConnector(ServerDownlinkConnector, ServerUplinkConnector): + def __init__(self, downlink_connect_addr, uplink_connect_addr): + ServerDownlinkConnector.__init__(self, downlink_connect_addr) + ServerUplinkConnector.__init__(self, uplink_connect_addr) def close(self): - ServerUplinkConnector.close(self) ServerDownlinkConnector.close(self) + ServerUplinkConnector.close(self) diff --git a/lib/tfw/server/tfw_server.py b/lib/tfw/server/tfw_server.py index a4cdd3f..ad2d37e 100644 --- a/lib/tfw/server/tfw_server.py +++ b/lib/tfw/server/tfw_server.py @@ -4,9 +4,10 @@ from tornado.web import Application from tfw.networking import EventHandlerConnector +from tfw.config import TFWENV from tfw.config.logs import logging -from .zmq_websocket_proxy import ZMQWebSocketProxy +from .zmq_websocket_router import ZMQWebSocketRouter LOG = logging.getLogger(__name__) @@ -18,12 +19,15 @@ class TFWServer: SUB socket. """ def __init__(self): - self._event_handler_connector = EventHandlerConnector() + self._event_handler_connector = EventHandlerConnector( + downlink_bind_addr=f'tcp://*:{TFWENV.PULL_PORT}', + uplink_bind_addr=f'tcp://*:{TFWENV.PUB_PORT}' + ) self.application = Application([( - r'/ws', ZMQWebSocketProxy, { + r'/ws', ZMQWebSocketRouter, { 'event_handler_connector': self._event_handler_connector, } )]) - def listen(self, port): - self.application.listen(port) + def listen(self): + self.application.listen(TFWENV.WEB_PORT) diff --git a/lib/tfw/server/zmq_websocket_proxy.py b/lib/tfw/server/zmq_websocket_router.py similarity index 88% rename from lib/tfw/server/zmq_websocket_proxy.py rename to lib/tfw/server/zmq_websocket_router.py index b820f85..d6021b6 100644 --- a/lib/tfw/server/zmq_websocket_proxy.py +++ b/lib/tfw/server/zmq_websocket_router.py @@ -11,7 +11,7 @@ from tfw.config.logs import logging LOG = logging.getLogger(__name__) -class ZMQWebSocketProxy(WebSocketHandler): +class ZMQWebSocketRouter(WebSocketHandler): # pylint: disable=abstract-method instances = set() @@ -22,16 +22,16 @@ class ZMQWebSocketProxy(WebSocketHandler): def send_to_zmq(self, message): self.event_handler_connector.send_message(message) - @staticmethod - def send_to_websockets(message): - for instance in ZMQWebSocketProxy.instances: + @classmethod + def send_to_websockets(cls, message): + for instance in cls.instances: instance.write_message(message) def prepare(self): - ZMQWebSocketProxy.instances.add(self) + type(self).instances.add(self) def on_close(self): - ZMQWebSocketProxy.instances.remove(self) + type(self).instances.remove(self) def open(self, *args, **kwargs): LOG.debug('WebSocket connection initiated!') diff --git a/supervisor/tfw_server.py b/supervisor/tfw_server.py index 4cb8bd6..a3848b6 100644 --- a/supervisor/tfw_server.py +++ b/supervisor/tfw_server.py @@ -1,9 +1,8 @@ from tornado.ioloop import IOLoop from tfw.server import TFWServer -from tfw.config import TFWENV if __name__ == '__main__': - TFWServer().listen(TFWENV.WEB_PORT) + TFWServer().listen() IOLoop.instance().start()