Improve module dependencies by moving port envvars out of tfw.networking

This commit is contained in:
Kristóf Tóth 2019-06-04 13:58:03 +02:00
parent f151ecfbac
commit c8e98af516
14 changed files with 68 additions and 45 deletions

View File

@ -15,3 +15,4 @@ from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHa
from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler
from .commands_equal import CommandsEqual from .commands_equal import CommandsEqual
from .frontend_event_handler import FrontendEventHandler from .frontend_event_handler import FrontendEventHandler
from .message_sender import MessageSender

View File

@ -5,7 +5,8 @@ from functools import wraps
from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler
from tfw.networking import ServerUplinkConnector, Scope from tfw.networking import Scope
from tfw.event_handlers import TFWServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.observer_mixin import ObserverMixin
@ -47,7 +48,7 @@ class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler):
def __init__(self, ide_key): def __init__(self, ide_key):
super().__init__() super().__init__()
self.ide_key = ide_key self.ide_key = ide_key
self.uplink = ServerUplinkConnector() self.uplink = TFWServerUplinkConnector()
self._paused = False self._paused = False
self.ignore = 0 self.ignore = 0

View File

@ -1,9 +1,10 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from contextlib import suppress from contextlib import suppress
from tfw.networking.message_sender import MessageSender
from tfw.event_handlers import FrontendEventHandlerBase from tfw.event_handlers import FrontendEventHandlerBase
from .message_sender import MessageSender
class FrontendEventHandler(FrontendEventHandlerBase): class FrontendEventHandler(FrontendEventHandlerBase):
def __init__(self): def __init__(self):

View File

@ -6,7 +6,8 @@ from os.path import dirname
from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler
from tfw.networking import ServerUplinkConnector, Scope from tfw.networking import Scope
from tfw.event_handlers import TFWServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin from tfw.mixins.observer_mixin import ObserverMixin
from tfw.mixins.supervisor_mixin import SupervisorLogMixin from tfw.mixins.supervisor_mixin import SupervisorLogMixin
@ -38,7 +39,7 @@ class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, Superviso
self.procinfo['stdout_logfile'], self.procinfo['stdout_logfile'],
self.procinfo['stderr_logfile'] self.procinfo['stderr_logfile']
]) ])
self.uplink = ServerUplinkConnector() self.uplink = TFWServerUplinkConnector()
self.log_tail = log_tail self.log_tail = log_tail
@property @property

View File

@ -1,7 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details. # All Rights Reserved. See LICENSE file for details.
from .server_connector import ServerUplinkConnector from tfw.event_handlers import TFWServerUplinkConnector
class MessageSender: class MessageSender:
@ -9,7 +9,7 @@ class MessageSender:
Provides mechanisms to send messages to our frontend messaging component. Provides mechanisms to send messages to our frontend messaging component.
""" """
def __init__(self): def __init__(self):
self.server_connector = ServerUplinkConnector() self.server_connector = TFWServerUplinkConnector()
self.key = 'message' self.key = 'message'
self.queue_key = 'queueMessages' self.queue_key = 'queueMessages'

View File

@ -5,3 +5,4 @@ from .event_handler_base import EventHandlerBase
from .frontend_event_handler_base import FrontendEventHandlerBase from .frontend_event_handler_base import FrontendEventHandlerBase
from .boradcasting_event_handler import BroadcastingEventHandler from .boradcasting_event_handler import BroadcastingEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler
from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector

View File

@ -5,9 +5,10 @@ from abc import ABC, abstractmethod
from inspect import currentframe from inspect import currentframe
from typing import Iterable from typing import Iterable
from tfw.networking import ServerConnector
from tfw.config.logs import logging from tfw.config.logs import logging
from .tfw_server_connector import TFWServerConnector
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -19,7 +20,7 @@ class EventHandlerBase(ABC):
Derived classes must implement the handle_event() method Derived classes must implement the handle_event() method
""" """
def __init__(self, key): def __init__(self, key):
self.server_connector = ServerConnector() self.server_connector = TFWServerConnector()
self.keys = [] self.keys = []
if isinstance(key, str): if isinstance(key, str):
self.keys.append(key) self.keys.append(key)

View File

@ -0,0 +1,19 @@
from functools import partial
from tfw.networking import ServerUplinkConnector, ServerConnector
from tfw.config import TFWENV
UPLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PULL_PORT}'
DOWNLINK_CONN_ADDR = f'tcp://localhost:{TFWENV.PUB_PORT}'
TFWServerUplinkConnector = partial(
ServerUplinkConnector,
connect_addr=UPLINK_CONN_ADDR
)
TFWServerConnector = partial(
ServerConnector,
downlink_connect_addr=DOWNLINK_CONN_ADDR,
uplink_connect_addr=UPLINK_CONN_ADDR
)

View File

@ -4,5 +4,4 @@
from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes
from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector
from .event_handler_connector import EventHandlerConnector from .event_handler_connector import EventHandlerConnector
from .message_sender import MessageSender
from .scope import Scope from .scope import Scope

View File

@ -4,7 +4,6 @@
import zmq import zmq
from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.zmqstream import ZMQStream
from tfw.config import TFWENV
from tfw.config.logs import logging from tfw.config.logs import logging
from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg
@ -13,13 +12,12 @@ LOG = logging.getLogger(__name__)
class EventHandlerDownlinkConnector(): class EventHandlerDownlinkConnector():
def __init__(self): def __init__(self, bind_addr):
self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL)
self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket)
address = f'tcp://*:{TFWENV.PULL_PORT}' self._zmq_pull_socket.bind(bind_addr)
self._zmq_pull_socket.bind(address) LOG.debug('Pull socket bound to %s', bind_addr)
LOG.debug('Pull socket bound to %s', address)
def register_callback(self, callback): def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback) callback = with_deserialize_tfw_msg(callback)
@ -30,12 +28,11 @@ class EventHandlerDownlinkConnector():
class EventHandlerUplinkConnector(): class EventHandlerUplinkConnector():
def __init__(self): def __init__(self, bind_addr):
self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0)
address = f'tcp://*:{TFWENV.PUB_PORT}' self._zmq_pub_socket.bind(bind_addr)
self._zmq_pub_socket.bind(address) LOG.debug('Pub socket bound to %s', bind_addr)
LOG.debug('Pub socket bound to %s', address)
def send_message(self, message: dict): def send_message(self, message: dict):
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message)) self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))
@ -45,9 +42,9 @@ class EventHandlerUplinkConnector():
class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector):
def __init__(self): def __init__(self, downlink_bind_addr, uplink_bind_addr):
EventHandlerDownlinkConnector.__init__(self) EventHandlerDownlinkConnector.__init__(self, downlink_bind_addr)
EventHandlerUplinkConnector.__init__(self) EventHandlerUplinkConnector.__init__(self, uplink_bind_addr)
def close(self): def close(self):
EventHandlerDownlinkConnector.close(self) EventHandlerDownlinkConnector.close(self)

View File

@ -6,7 +6,6 @@ from functools import partial
import zmq import zmq
from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.zmqstream import ZMQStream
from tfw.config import TFWENV
from tfw.config.logs import logging from tfw.config.logs import logging
from .scope import Scope from .scope import Scope
@ -16,9 +15,9 @@ LOG = logging.getLogger(__name__)
class ServerDownlinkConnector(): class ServerDownlinkConnector():
def __init__(self): def __init__(self, connect_addr):
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUB_PORT}') self._zmq_sub_socket.connect(connect_addr)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
@ -34,9 +33,9 @@ class ServerDownlinkConnector():
class ServerUplinkConnector(): class ServerUplinkConnector():
def __init__(self): def __init__(self, connect_addr):
self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH)
self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.PULL_PORT}') self._zmq_push_socket.connect(connect_addr)
self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0)
def send_message(self, message, scope=Scope.ZMQ): def send_message(self, message, scope=Scope.ZMQ):
@ -47,11 +46,11 @@ class ServerUplinkConnector():
self._zmq_push_socket.close() self._zmq_push_socket.close()
class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector): class ServerConnector(ServerDownlinkConnector, ServerUplinkConnector):
def __init__(self): def __init__(self, downlink_connect_addr, uplink_connect_addr):
ServerUplinkConnector.__init__(self) ServerDownlinkConnector.__init__(self, downlink_connect_addr)
ServerDownlinkConnector.__init__(self) ServerUplinkConnector.__init__(self, uplink_connect_addr)
def close(self): def close(self):
ServerUplinkConnector.close(self)
ServerDownlinkConnector.close(self) ServerDownlinkConnector.close(self)
ServerUplinkConnector.close(self)

View File

@ -4,9 +4,10 @@
from tornado.web import Application from tornado.web import Application
from tfw.networking import EventHandlerConnector from tfw.networking import EventHandlerConnector
from tfw.config import TFWENV
from tfw.config.logs import logging from tfw.config.logs import logging
from .zmq_websocket_proxy import ZMQWebSocketProxy from .zmq_websocket_router import ZMQWebSocketRouter
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -18,12 +19,15 @@ class TFWServer:
SUB socket. SUB socket.
""" """
def __init__(self): def __init__(self):
self._event_handler_connector = EventHandlerConnector() self._event_handler_connector = EventHandlerConnector(
downlink_bind_addr=f'tcp://*:{TFWENV.PULL_PORT}',
uplink_bind_addr=f'tcp://*:{TFWENV.PUB_PORT}'
)
self.application = Application([( self.application = Application([(
r'/ws', ZMQWebSocketProxy, { r'/ws', ZMQWebSocketRouter, {
'event_handler_connector': self._event_handler_connector, 'event_handler_connector': self._event_handler_connector,
} }
)]) )])
def listen(self, port): def listen(self):
self.application.listen(port) self.application.listen(TFWENV.WEB_PORT)

View File

@ -11,7 +11,7 @@ from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ZMQWebSocketProxy(WebSocketHandler): class ZMQWebSocketRouter(WebSocketHandler):
# pylint: disable=abstract-method # pylint: disable=abstract-method
instances = set() instances = set()
@ -22,16 +22,16 @@ class ZMQWebSocketProxy(WebSocketHandler):
def send_to_zmq(self, message): def send_to_zmq(self, message):
self.event_handler_connector.send_message(message) self.event_handler_connector.send_message(message)
@staticmethod @classmethod
def send_to_websockets(message): def send_to_websockets(cls, message):
for instance in ZMQWebSocketProxy.instances: for instance in cls.instances:
instance.write_message(message) instance.write_message(message)
def prepare(self): def prepare(self):
ZMQWebSocketProxy.instances.add(self) type(self).instances.add(self)
def on_close(self): def on_close(self):
ZMQWebSocketProxy.instances.remove(self) type(self).instances.remove(self)
def open(self, *args, **kwargs): def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated!') LOG.debug('WebSocket connection initiated!')

View File

@ -1,9 +1,8 @@
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tfw.server import TFWServer from tfw.server import TFWServer
from tfw.config import TFWENV
if __name__ == '__main__': if __name__ == '__main__':
TFWServer().listen(TFWENV.WEB_PORT) TFWServer().listen()
IOLoop.instance().start() IOLoop.instance().start()