diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..72a218a --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +filterwarnings = ignore::DeprecationWarning:zmq diff --git a/tfw/internals/networking/test_networking.py b/tfw/internals/networking/test_networking.py new file mode 100644 index 0000000..cc23069 --- /dev/null +++ b/tfw/internals/networking/test_networking.py @@ -0,0 +1,106 @@ +# pylint: disable=redefined-outer-name +from os.path import join +from secrets import token_urlsafe +from random import randint +from tempfile import TemporaryDirectory + +import pytest +from tornado.ioloop import IOLoop + +from tfw.internals.networking import EventHandlerConnector, ServerConnector + + +@pytest.fixture +def _connectors(): + with TemporaryDirectory() as tmpdir: + down_sock = join(tmpdir, 'down') + up_sock = join(tmpdir, 'up') + server_downlink = f'ipc://{down_sock}' + server_uplink = f'ipc://{up_sock}' + + ec = EventHandlerConnector(server_downlink, server_uplink) + sc = ServerConnector(server_uplink, server_downlink) + yield ec, sc + sc.close() + ec.close() + + +@pytest.fixture +def eh_connector(_connectors): + eh_connector, _ = _connectors + yield eh_connector + + +@pytest.fixture +def server_connector(_connectors): + _, server_connector = _connectors + yield server_connector + + +def run_ioloop_once(): + # hack: we have to wait for the messages to get through + # the network stack of the OS while the IOLoop is waiting + # for them via select/epoll/kqueue + IOLoop.current().call_later(0.1, IOLoop.current().stop) + IOLoop.current().start() + + +@pytest.fixture +def test_messages(): + random_str = lambda: token_urlsafe(randint(4, 8)) + yield [ + { + 'key': random_str(), + random_str(): randint(8192, 16384), + random_str(): random_str(), + random_str(): { + random_str(): random_str(), + random_str(): {random_str(): random_str()} + }, + random_str(): [random_str(), random_str()] + } + for _ in range(randint(8, 16)) + ] + + +def test_server_downlink(eh_connector, server_connector, test_messages): + messages = [] + eh_connector.register_callback(messages.append) + + for message in test_messages: + server_connector.send_message(message) + + run_ioloop_once() + + assert messages == test_messages + + +def test_server_uplink(eh_connector, server_connector, test_messages): + messages = [] + server_connector.subscribe('') + server_connector.register_callback(messages.append) + + for message in test_messages: + eh_connector.send_message(message) + + run_ioloop_once() + + assert messages == test_messages + + +def test_connector_downlink_subscribe(eh_connector, server_connector): + key1_messages = [{'key': '1', 'data': i} for i in range(randint(128, 256))] + key2_messages = [{'key': '2', 'data': i} for i in range(randint(128, 256))] + all_messages = key1_messages + key2_messages + + messages = [] + server_connector.subscribe('1') + server_connector.register_callback(messages.append) + + for message in all_messages: + eh_connector.send_message(message) + + run_ioloop_once() + + assert messages == key1_messages + assert all((msg not in messages for msg in key2_messages))