From 30223fe99ff410a0eaa2d2824cc5eeddbe1d4a96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Tue, 30 Jul 2019 15:17:29 +0200 Subject: [PATCH] =?UTF-8?q?Rename=20ServerConnector=20interface=20to=20Con?= =?UTF-8?q?nector=20=C2=AF\=5F(=E3=83=84)=5F/=C2=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tfw/components/frontend/frontend_handler.py | 4 +- tfw/components/fsm/fsm_handler.py | 4 +- tfw/components/ide/ide_handler.py | 4 +- tfw/components/pipe_io/pipe_io_handler.py | 6 +-- .../log_inotify_observer.py | 6 +-- .../process_management/process_handler.py | 4 +- .../process_management/process_log_handler.py | 4 +- tfw/components/snapshots/snapshot_handler.py | 4 +- tfw/components/terminal/terminal_handler.py | 4 +- tfw/internals/event_handling/event_handler.py | 12 ++--- .../event_handler_factory_base.py | 14 +++--- .../event_handling/fsm_aware_event_handler.py | 6 +-- .../event_handling/test_event_handler.py | 48 +++++++++---------- tfw/internals/networking/__init__.py | 4 +- tfw/internals/networking/test_networking.py | 48 +++++++++---------- .../{server_connector.py => zmq_connector.py} | 14 +++--- ...t_handler_connector.py => zmq_listener.py} | 14 +++--- tfw/internals/server/zmq_websocket_router.py | 6 +-- tfw/main/event_handler_factory.py | 2 +- tfw/main/tfw_connector.py | 6 +-- tfw/main/tfw_server.py | 6 +-- 21 files changed, 110 insertions(+), 110 deletions(-) rename tfw/internals/networking/{server_connector.py => zmq_connector.py} (83%) rename tfw/internals/networking/{event_handler_connector.py => zmq_listener.py} (75%) diff --git a/tfw/components/frontend/frontend_handler.py b/tfw/components/frontend/frontend_handler.py index b5e7c8b..5829ad7 100644 --- a/tfw/components/frontend/frontend_handler.py +++ b/tfw/components/frontend/frontend_handler.py @@ -7,12 +7,12 @@ class FrontendHandler: keys = ['message', 'queueMessages', 'dashboard', 'console'] def __init__(self): - self.server_connector = None + self.connector = None self.keys = [*type(self).keys, 'recover'] self._frontend_message_storage = FrontendMessageStorage(type(self).keys) def send_message(self, message): - self.server_connector.send_message(message, scope=Scope.WEBSOCKET) + self.connector.send_message(message, scope=Scope.WEBSOCKET) def handle_event(self, message, _): self._frontend_message_storage.save_message(message) diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index e86dee2..18302d2 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -35,14 +35,14 @@ class FSMHandler: 'update': self.handle_update } - def handle_event(self, message, server_connector): + def handle_event(self, message, connector): try: message = self.command_handlers[message['data']['command']](message) if message: fsm_update_message = self._fsm_updater.fsm_update sign_message(self.auth_key, message) sign_message(self.auth_key, fsm_update_message) - server_connector.send_message(fsm_update_message, Scope.BROADCAST) + connector.send_message(fsm_update_message, Scope.BROADCAST) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/components/ide/ide_handler.py b/tfw/components/ide/ide_handler.py index 7f09247..38f7842 100644 --- a/tfw/components/ide/ide_handler.py +++ b/tfw/components/ide/ide_handler.py @@ -55,7 +55,7 @@ class IdeHandler: :param selected_file: file that is selected by default :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) """ - self.server_connector = None + self.connector = None try: self.filemanager = FileManager( allowed_directories=allowed_directories, @@ -90,7 +90,7 @@ class IdeHandler: }) def send_message(self, message): - self.server_connector.send_message(message, scope=Scope.WEBSOCKET) + self.connector.send_message(message, scope=Scope.WEBSOCKET) def read(self, data): """ diff --git a/tfw/components/pipe_io/pipe_io_handler.py b/tfw/components/pipe_io/pipe_io_handler.py index 115b508..74aabe8 100644 --- a/tfw/components/pipe_io/pipe_io_handler.py +++ b/tfw/components/pipe_io/pipe_io_handler.py @@ -20,7 +20,7 @@ class PipeIOHandlerBase: keys = [''] def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): - self.server_connector = None + self.connector = None self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, @@ -56,7 +56,7 @@ class PipeIOHandler(PipeIOHandlerBase): def handle_pipe_event(self, message_bytes): json = loads(message_bytes) - self.server_connector.send_message(json) + self.connector.send_message(json) class TransformerPipeIOHandler(PipeIOHandlerBase): @@ -93,7 +93,7 @@ class TransformerPipeIOHandler(PipeIOHandlerBase): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) - self.server_connector.send_message(json_message) + self.connector.send_message(json_message) class CommandHandler(PipeIOHandler): diff --git a/tfw/components/process_management/log_inotify_observer.py b/tfw/components/process_management/log_inotify_observer.py index 94400ca..cbbd5dc 100644 --- a/tfw/components/process_management/log_inotify_observer.py +++ b/tfw/components/process_management/log_inotify_observer.py @@ -7,9 +7,9 @@ from .supervisor import ProcessLogManager class LogInotifyObserver(InotifyObserver, ProcessLogManager): - def __init__(self, server_connector, supervisor_uri, process_name, log_tail=0): + def __init__(self, connector, supervisor_uri, process_name, log_tail=0): self._prevent_log_recursion() - self._server_connector = server_connector + self._connector = connector self._process_name = process_name self.log_tail = log_tail self._procinfo = None @@ -35,7 +35,7 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager): self.paths = self._get_logfiles() def on_modified(self, event): - self._server_connector.send_message({ + self._connector.send_message({ 'key': 'processlog', 'data': { 'command': 'new_log', diff --git a/tfw/components/process_management/process_handler.py b/tfw/components/process_management/process_handler.py index becd28d..1c54118 100644 --- a/tfw/components/process_management/process_handler.py +++ b/tfw/components/process_management/process_handler.py @@ -32,7 +32,7 @@ class ProcessHandler(ProcessManager, ProcessLogManager): 'restart': self.restart_process } - def handle_event(self, message, server_connector): + def handle_event(self, message, connector): try: data = message['data'] try: @@ -48,6 +48,6 @@ class ProcessHandler(ProcessManager, ProcessLogManager): data['process_name'], self.log_tail ) - server_connector.send_message(message, scope=Scope.WEBSOCKET) + connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/components/process_management/process_log_handler.py b/tfw/components/process_management/process_log_handler.py index a7ed020..8386120 100644 --- a/tfw/components/process_management/process_log_handler.py +++ b/tfw/components/process_management/process_log_handler.py @@ -17,7 +17,7 @@ class ProcessLogHandler: The API of each command is documented in their respective handler. """ def __init__(self, *, process_name, supervisor_uri, log_tail=0): - self.server_connector = None + self.connector = None self.process_name = process_name self._supervisor_uri = supervisor_uri self._initial_log_tail = log_tail @@ -30,7 +30,7 @@ class ProcessLogHandler: def start(self): self._monitor = LogInotifyObserver( - server_connector=self.server_connector, + connector=self.connector, supervisor_uri=self._supervisor_uri, process_name=self.process_name, log_tail=self._initial_log_tail diff --git a/tfw/components/snapshots/snapshot_handler.py b/tfw/components/snapshots/snapshot_handler.py index c60c720..f992b07 100644 --- a/tfw/components/snapshots/snapshot_handler.py +++ b/tfw/components/snapshots/snapshot_handler.py @@ -45,11 +45,11 @@ class SnapshotHandler: makedirs(git_dir, exist_ok=True) return git_dir - def handle_event(self, message, server_connector): + def handle_event(self, message, connector): try: data = message['data'] message['data'] = self.command_handlers[data['command']](data) - server_connector.send_message(message, scope=Scope.WEBSOCKET) + connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/components/terminal/terminal_handler.py b/tfw/components/terminal/terminal_handler.py index a0a5f0c..c8df229 100644 --- a/tfw/components/terminal/terminal_handler.py +++ b/tfw/components/terminal/terminal_handler.py @@ -22,7 +22,7 @@ class TerminalHandler: :param key: key this EventHandler listens to :param monitor: tfw.components.HistoryMonitor instance to read command history from """ - self.server_connector = None + self.connector = None self._histfile = histfile self._historymonitor = None bash_as_user_cmd = ['sudo', '-u', user, 'bash'] @@ -42,7 +42,7 @@ class TerminalHandler: self.terminado_server.listen() def start(self): - self._historymonitor = BashMonitor(self.server_connector, self._histfile) + self._historymonitor = BashMonitor(self.connector, self._histfile) self._historymonitor.start() @property diff --git a/tfw/internals/event_handling/event_handler.py b/tfw/internals/event_handling/event_handler.py index a75381e..f288042 100644 --- a/tfw/internals/event_handling/event_handler.py +++ b/tfw/internals/event_handling/event_handler.py @@ -1,17 +1,17 @@ class EventHandler: _instances = set() - def __init__(self, server_connector): + def __init__(self, connector): type(self)._instances.add(self) - self.server_connector = server_connector + self.connector = connector def start(self): - self.server_connector.register_callback(self._event_callback) + self.connector.register_callback(self._event_callback) def _event_callback(self, message): - self.handle_event(message, self.server_connector) + self.handle_event(message, self.connector) - def handle_event(self, message, server_connector): + def handle_event(self, message, connector): raise NotImplementedError() @classmethod @@ -20,7 +20,7 @@ class EventHandler: instance.stop() def stop(self): - self.server_connector.close() + self.connector.close() self.cleanup() def cleanup(self): diff --git a/tfw/internals/event_handling/event_handler_factory_base.py b/tfw/internals/event_handling/event_handler_factory_base.py index 4c4adcd..b62d9f1 100644 --- a/tfw/internals/event_handling/event_handler_factory_base.py +++ b/tfw/internals/event_handling/event_handler_factory_base.py @@ -6,15 +6,15 @@ from .event_handler import EventHandler class EventHandlerFactoryBase: def build(self, handler_stub, *, keys=None, event_handler_type=EventHandler): builder = EventHandlerBuilder(handler_stub, keys, event_handler_type) - server_connector = self._build_server_connector() - event_handler = builder.build(server_connector) - handler_stub.server_connector = server_connector + connector = self._build_connector() + event_handler = builder.build(connector) + handler_stub.connector = connector with suppress(AttributeError): handler_stub.start() event_handler.start() return event_handler - def _build_server_connector(self): + def _build_connector(self): raise NotImplementedError() @@ -23,9 +23,9 @@ class EventHandlerBuilder: self._analyzer = HandlerStubAnalyzer(event_handler, supplied_keys) self._event_handler_type = event_handler_type - def build(self, server_connector): - event_handler = self._event_handler_type(server_connector) - server_connector.subscribe(*self._try_get_keys(event_handler)) + def build(self, connector): + event_handler = self._event_handler_type(connector) + connector.subscribe(*self._try_get_keys(event_handler)) event_handler.handle_event = self._analyzer.handle_event with suppress(AttributeError): event_handler.cleanup = self._analyzer.cleanup diff --git a/tfw/internals/event_handling/fsm_aware_event_handler.py b/tfw/internals/event_handling/fsm_aware_event_handler.py index 966d4d4..a128de8 100644 --- a/tfw/internals/event_handling/fsm_aware_event_handler.py +++ b/tfw/internals/event_handling/fsm_aware_event_handler.py @@ -8,12 +8,12 @@ class FSMAwareEventHandler(EventHandler, FSMAware): Abstract base class for EventHandlers which automatically keep track of the state of the TFW FSM. """ - def __init__(self, server_connector): - EventHandler.__init__(self, server_connector) + def __init__(self, connector): + EventHandler.__init__(self, connector) FSMAware.__init__(self) def _event_callback(self, message): self.process_message(message) def handle_fsm_step(self, message): - self.handle_event(message, self.server_connector) + self.handle_event(message, self.connector) diff --git a/tfw/internals/event_handling/test_event_handler.py b/tfw/internals/event_handling/test_event_handler.py index 049bb78..03f4141 100644 --- a/tfw/internals/event_handling/test_event_handler.py +++ b/tfw/internals/event_handling/test_event_handler.py @@ -9,11 +9,11 @@ from .event_handler import EventHandler class MockEventHandlerFactory(EventHandlerFactoryBase): - def _build_server_connector(self): - return MockServerConnector() + def _build_connector(self): + return MockConnector() -class MockServerConnector: +class MockConnector: def __init__(self): self.keys = [] self._on_message = None @@ -40,7 +40,7 @@ class MockServerConnector: class MockEventHandlerStub: def __init__(self): - self.server_connector = None + self.connector = None self.last_message = None self.cleaned_up = False self.started = False @@ -54,12 +54,12 @@ class MockEventHandlerStub: class MockEventHandler(MockEventHandlerStub): # pylint: disable=unused-argument - def handle_event(self, message, server_connector): + def handle_event(self, message, connector): self.last_message = message class MockCallable(MockEventHandlerStub): - def __call__(self, message, server_connector): + def __call__(self, message, connector): self.last_message = message @@ -78,17 +78,17 @@ def test_keys(): def test_build_from_object(test_keys, test_msg): mock_eh = MockEventHandlerStub() - def handle_event(message, server_connector): - raise RuntimeError(message, server_connector.keys) + def handle_event(message, connector): + raise RuntimeError(message, connector.keys) mock_eh.handle_event = handle_event assert not mock_eh.started eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys) assert mock_eh.started - assert mock_eh.server_connector is eh.server_connector + assert mock_eh.connector is eh.connector with pytest.raises(RuntimeError) as err: - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) msg, keys = err.args assert msg == test_msg assert keys == test_keys @@ -104,11 +104,11 @@ def test_build_from_object_with_keys(test_keys, test_msg): assert not mock_eh.started eh = MockEventHandlerFactory().build(mock_eh) - assert mock_eh.server_connector.keys == test_keys - assert eh.server_connector is mock_eh.server_connector + assert mock_eh.connector.keys == test_keys + assert eh.connector is mock_eh.connector assert mock_eh.started assert not mock_eh.last_message - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) assert mock_eh.last_message == test_msg assert not mock_eh.cleaned_up EventHandler.stop_all_instances() @@ -118,14 +118,14 @@ def test_build_from_object_with_keys(test_keys, test_msg): def test_build_from_simple_object(test_keys, test_msg): class SimpleMockEventHandler: # pylint: disable=no-self-use - def handle_event(self, message, server_connector): - raise RuntimeError(message, server_connector) + def handle_event(self, message, connector): + raise RuntimeError(message, connector) mock_eh = SimpleMockEventHandler() eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys) with pytest.raises(RuntimeError) as err: - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) msg, keys = err.args assert msg == test_msg assert keys == test_keys @@ -138,10 +138,10 @@ def test_build_from_callable(test_keys, test_msg): eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys) assert mock_eh.started - assert mock_eh.server_connector is eh.server_connector - assert eh.server_connector.keys == test_keys + assert mock_eh.connector is eh.connector + assert eh.connector.keys == test_keys assert not mock_eh.last_message - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) assert mock_eh.last_message == test_msg assert not mock_eh.cleaned_up eh.stop() @@ -149,13 +149,13 @@ def test_build_from_callable(test_keys, test_msg): def test_build_from_function(test_keys, test_msg): - def some_function(message, server_connector): - raise RuntimeError(message, server_connector.keys) + def some_function(message, connector): + raise RuntimeError(message, connector.keys) eh = MockEventHandlerFactory().build(some_function, keys=test_keys) - assert eh.server_connector.keys == test_keys + assert eh.connector.keys == test_keys with pytest.raises(RuntimeError) as err: - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) msg, keys = err.args assert msg == test_msg assert keys == test_keys @@ -166,7 +166,7 @@ def test_build_from_lambda(test_keys, test_msg): assert msg == test_msg fun = lambda msg, sc: assert_messages_equal(msg) eh = MockEventHandlerFactory().build(fun, keys=test_keys) - eh.server_connector.simulate_message(test_msg) + eh.connector.simulate_message(test_msg) def test_build_raises_if_no_key(test_keys): diff --git a/tfw/internals/networking/__init__.py b/tfw/internals/networking/__init__.py index 62766cd..dca7b24 100644 --- a/tfw/internals/networking/__init__.py +++ b/tfw/internals/networking/__init__.py @@ -1,4 +1,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserialize_tfw_msg, message_bytes -from .server_connector import ServerUplinkConnector, ServerDownlinkConnector, ServerConnector -from .event_handler_connector import EventHandlerConnector +from .zmq_connector import ZMQConnector, ZMQDownlinkConnector, ZMQUplinkConnector +from .zmq_listener import ZMQListener from .scope import Scope diff --git a/tfw/internals/networking/test_networking.py b/tfw/internals/networking/test_networking.py index cc23069..9bd35af 100644 --- a/tfw/internals/networking/test_networking.py +++ b/tfw/internals/networking/test_networking.py @@ -7,34 +7,34 @@ from tempfile import TemporaryDirectory import pytest from tornado.ioloop import IOLoop -from tfw.internals.networking import EventHandlerConnector, ServerConnector +from tfw.internals.networking import ZMQListener, ZMQConnector @pytest.fixture -def _connectors(): +def _listener_and_connector(): with TemporaryDirectory() as tmpdir: down_sock = join(tmpdir, 'down') up_sock = join(tmpdir, 'up') server_downlink = f'ipc://{down_sock}' server_uplink = f'ipc://{up_sock}' - ec = EventHandlerConnector(server_downlink, server_uplink) - sc = ServerConnector(server_uplink, server_downlink) - yield ec, sc - sc.close() - ec.close() + listener = ZMQListener(server_downlink, server_uplink) + connector = ZMQConnector(server_uplink, server_downlink) + yield listener, connector + listener.close() + connector.close() @pytest.fixture -def eh_connector(_connectors): - eh_connector, _ = _connectors - yield eh_connector +def zmq_listener(_listener_and_connector): + listener, _ = _listener_and_connector + yield listener @pytest.fixture -def server_connector(_connectors): - _, server_connector = _connectors - yield server_connector +def zmq_connector(_listener_and_connector): + _, connector = _listener_and_connector + yield connector def run_ioloop_once(): @@ -63,42 +63,42 @@ def test_messages(): ] -def test_server_downlink(eh_connector, server_connector, test_messages): +def test_server_downlink(zmq_listener, zmq_connector, test_messages): messages = [] - eh_connector.register_callback(messages.append) + zmq_listener.register_callback(messages.append) for message in test_messages: - server_connector.send_message(message) + zmq_connector.send_message(message) run_ioloop_once() assert messages == test_messages -def test_server_uplink(eh_connector, server_connector, test_messages): +def test_server_uplink(zmq_listener, zmq_connector, test_messages): messages = [] - server_connector.subscribe('') - server_connector.register_callback(messages.append) + zmq_connector.subscribe('') + zmq_connector.register_callback(messages.append) for message in test_messages: - eh_connector.send_message(message) + zmq_listener.send_message(message) run_ioloop_once() assert messages == test_messages -def test_connector_downlink_subscribe(eh_connector, server_connector): +def test_connector_downlink_subscribe(zmq_listener, zmq_connector): key1_messages = [{'key': '1', 'data': i} for i in range(randint(128, 256))] key2_messages = [{'key': '2', 'data': i} for i in range(randint(128, 256))] all_messages = key1_messages + key2_messages messages = [] - server_connector.subscribe('1') - server_connector.register_callback(messages.append) + zmq_connector.subscribe('1') + zmq_connector.register_callback(messages.append) for message in all_messages: - eh_connector.send_message(message) + zmq_listener.send_message(message) run_ioloop_once() diff --git a/tfw/internals/networking/server_connector.py b/tfw/internals/networking/zmq_connector.py similarity index 83% rename from tfw/internals/networking/server_connector.py rename to tfw/internals/networking/zmq_connector.py index 0ab1644..a701c99 100644 --- a/tfw/internals/networking/server_connector.py +++ b/tfw/internals/networking/zmq_connector.py @@ -9,7 +9,7 @@ from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg LOG = logging.getLogger(__name__) -class ServerDownlinkConnector: +class ZMQDownlinkConnector: def __init__(self, connect_addr): self.keys = [] self._on_recv_callback = None @@ -41,7 +41,7 @@ class ServerDownlinkConnector: self._zmq_sub_stream.close() -class ServerUplinkConnector: +class ZMQUplinkConnector: def __init__(self, connect_addr): self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) @@ -55,11 +55,11 @@ class ServerUplinkConnector: self._zmq_push_socket.close() -class ServerConnector(ServerDownlinkConnector, ServerUplinkConnector): +class ZMQConnector(ZMQDownlinkConnector, ZMQUplinkConnector): def __init__(self, downlink_connect_addr, uplink_connect_addr): - ServerDownlinkConnector.__init__(self, downlink_connect_addr) - ServerUplinkConnector.__init__(self, uplink_connect_addr) + ZMQDownlinkConnector.__init__(self, downlink_connect_addr) + ZMQUplinkConnector.__init__(self, uplink_connect_addr) def close(self): - ServerDownlinkConnector.close(self) - ServerUplinkConnector.close(self) + ZMQDownlinkConnector.close(self) + ZMQUplinkConnector.close(self) diff --git a/tfw/internals/networking/event_handler_connector.py b/tfw/internals/networking/zmq_listener.py similarity index 75% rename from tfw/internals/networking/event_handler_connector.py rename to tfw/internals/networking/zmq_listener.py index 74b6270..a46fb5a 100644 --- a/tfw/internals/networking/event_handler_connector.py +++ b/tfw/internals/networking/zmq_listener.py @@ -8,7 +8,7 @@ from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg LOG = logging.getLogger(__name__) -class EventHandlerDownlinkConnector: +class ZMQDownlinkListener: def __init__(self, bind_addr): self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) @@ -24,7 +24,7 @@ class EventHandlerDownlinkConnector: self._zmq_pull_stream.close() -class EventHandlerUplinkConnector: +class ZMQUplinkListener: def __init__(self, bind_addr): self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) @@ -38,11 +38,11 @@ class EventHandlerUplinkConnector: self._zmq_pub_socket.close() -class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector): +class ZMQListener(ZMQDownlinkListener, ZMQUplinkListener): def __init__(self, downlink_bind_addr, uplink_bind_addr): - EventHandlerDownlinkConnector.__init__(self, downlink_bind_addr) - EventHandlerUplinkConnector.__init__(self, uplink_bind_addr) + ZMQDownlinkListener.__init__(self, downlink_bind_addr) + ZMQUplinkListener.__init__(self, uplink_bind_addr) def close(self): - EventHandlerDownlinkConnector.close(self) - EventHandlerUplinkConnector.close(self) + ZMQDownlinkListener.close(self) + ZMQUplinkListener.close(self) diff --git a/tfw/internals/server/zmq_websocket_router.py b/tfw/internals/server/zmq_websocket_router.py index 3f5e7fc..7d8b3f5 100644 --- a/tfw/internals/server/zmq_websocket_router.py +++ b/tfw/internals/server/zmq_websocket_router.py @@ -13,11 +13,11 @@ class ZMQWebSocketRouter(WebSocketHandler): instances = set() def initialize(self, **kwargs): - self.event_handler_connector = kwargs['event_handler_connector'] + self._listener = kwargs['listener'] self.tfw_router = TFWRouter(self.send_to_zmq, self.send_to_websockets) def send_to_zmq(self, message): - self.event_handler_connector.send_message(message) + self._listener.send_message(message) @classmethod def send_to_websockets(cls, message): @@ -32,7 +32,7 @@ class ZMQWebSocketRouter(WebSocketHandler): def open(self, *args, **kwargs): LOG.debug('WebSocket connection initiated!') - self.event_handler_connector.register_callback(self.zmq_callback) + self._listener.register_callback(self.zmq_callback) def zmq_callback(self, message): LOG.debug('Received on ZMQ pull socket: %s', message) diff --git a/tfw/main/event_handler_factory.py b/tfw/main/event_handler_factory.py index c2c4045..b913898 100644 --- a/tfw/main/event_handler_factory.py +++ b/tfw/main/event_handler_factory.py @@ -4,5 +4,5 @@ from .tfw_connector import TFWConnector class EventHandlerFactory(EventHandlerFactoryBase): - def _build_server_connector(self): + def _build_connector(self): return TFWConnector() diff --git a/tfw/main/tfw_connector.py b/tfw/main/tfw_connector.py index 5324ee6..be21dab 100644 --- a/tfw/main/tfw_connector.py +++ b/tfw/main/tfw_connector.py @@ -1,4 +1,4 @@ -from tfw.internals.networking import ServerConnector, ServerUplinkConnector +from tfw.internals.networking import ZMQConnector, ZMQUplinkConnector from tfw.config import TFWENV @@ -12,12 +12,12 @@ class ConnAddrMixin: return f'tcp://localhost:{TFWENV.PUB_PORT}' -class TFWUplinkConnector(ServerUplinkConnector, ConnAddrMixin): +class TFWUplinkConnector(ZMQUplinkConnector, ConnAddrMixin): def __init__(self): super().__init__(self.uplink_conn_addr) -class TFWConnector(ServerConnector, ConnAddrMixin): +class TFWConnector(ZMQConnector, ConnAddrMixin): def __init__(self): super().__init__( self.downlink_conn_addr, diff --git a/tfw/main/tfw_server.py b/tfw/main/tfw_server.py index 4cc1c5e..1706417 100644 --- a/tfw/main/tfw_server.py +++ b/tfw/main/tfw_server.py @@ -2,7 +2,7 @@ import logging from tornado.web import Application -from tfw.internals.networking import EventHandlerConnector +from tfw.internals.networking import ZMQListener from tfw.internals.server import ZMQWebSocketRouter from tfw.config import TFWENV @@ -16,13 +16,13 @@ class TFWServer: SUB socket. """ def __init__(self): - self._event_handler_connector = EventHandlerConnector( + self._listener = ZMQListener( downlink_bind_addr=f'tcp://*:{TFWENV.PULL_PORT}', uplink_bind_addr=f'tcp://*:{TFWENV.PUB_PORT}' ) self.application = Application([( r'/ws', ZMQWebSocketRouter, { - 'event_handler_connector': self._event_handler_connector, + 'listener': self._listener, } )])