baseimage-tutorial-framework/tfw/internals/networking/test_networking.py
2019-07-29 13:02:46 +02:00

107 lines
2.9 KiB
Python

# pylint: disable=redefined-outer-name
from os.path import join
from secrets import token_urlsafe
from random import randint
from tempfile import TemporaryDirectory
import pytest
from tornado.ioloop import IOLoop
from tfw.internals.networking import EventHandlerConnector, ServerConnector
@pytest.fixture
def _connectors():
    """Yield an ``(EventHandlerConnector, ServerConnector)`` pair.

    Two ``ipc://`` addresses are built from socket paths inside a fresh
    temporary directory. The event handler connector receives them as
    ``(downlink, uplink)`` and the server connector as ``(uplink, downlink)``.

    Teardown closes the server connector first, then the event handler
    connector, and finally removes the temporary directory. The nested
    ``try``/``finally`` blocks guarantee that every connector which was
    successfully constructed is closed, even if constructing the second
    connector or closing the first one raises.
    """
    with TemporaryDirectory() as tmpdir:
        down_sock = join(tmpdir, 'down')
        up_sock = join(tmpdir, 'up')
        server_downlink = f'ipc://{down_sock}'
        server_uplink = f'ipc://{up_sock}'

        ec = EventHandlerConnector(server_downlink, server_uplink)
        try:
            sc = ServerConnector(server_uplink, server_downlink)
            try:
                yield ec, sc
            finally:
                # Closed first, matching the original teardown order.
                sc.close()
        finally:
            # Runs even when ServerConnector() or sc.close() raised.
            ec.close()
@pytest.fixture
def eh_connector(_connectors):
    """Yield the first element (the event handler connector) of ``_connectors``."""
    handler_side, _server_side = _connectors
    yield handler_side
@pytest.fixture
def server_connector(_connectors):
    """Yield the second element (the server connector) of ``_connectors``."""
    _handler_side, server_side = _connectors
    yield server_side
def run_ioloop_once():
    """Run the current tornado IOLoop until a stop scheduled 0.1 s ahead fires.

    This is a hack: messages need time to travel through the network stack
    of the OS while the IOLoop is waiting for them via select/epoll/kqueue,
    so the loop is kept running for a short, fixed delay before stopping.
    """
    loop = IOLoop.current()
    # Schedule the stop before starting; start() blocks until stop() runs.
    loop.call_later(0.1, loop.stop)
    loop.start()
@pytest.fixture
def test_messages():
    """Yield a list of 8 to 16 randomly generated message dicts.

    Every message has a fixed ``'key'`` entry with a random token value, plus
    randomly named entries holding an integer, a string, a two-level nested
    dict and a two-element list of strings.
    """
    def _token():
        # URL-safe random text generated from 4 to 8 random bytes.
        return token_urlsafe(randint(4, 8))

    def _message():
        return {
            'key': _token(),
            _token(): randint(8192, 16384),
            _token(): _token(),
            _token(): {
                _token(): _token(),
                _token(): {_token(): _token()}
            },
            _token(): [_token(), _token()]
        }

    # The message count is drawn before any message is generated.
    count = randint(8, 16)
    yield [_message() for _ in range(count)]
def test_server_downlink(eh_connector, server_connector, test_messages):
    """Messages sent by the server connector reach the event handler callback.

    After all messages are sent and the IOLoop has run once, the callback
    must have collected exactly ``test_messages``, in the same order.
    """
    received = []
    eh_connector.register_callback(received.append)

    for outgoing in test_messages:
        server_connector.send_message(outgoing)

    run_ioloop_once()

    assert received == test_messages
def test_server_uplink(eh_connector, server_connector, test_messages):
    """Messages sent by the event handler connector reach the server callback.

    The server connector subscribes with the empty string, which this test
    expects to let every message through. After the IOLoop has run once the
    callback must have collected exactly ``test_messages``, in the same order.
    """
    received = []
    server_connector.subscribe('')
    server_connector.register_callback(received.append)

    for outgoing in test_messages:
        eh_connector.send_message(outgoing)

    run_ioloop_once()

    assert received == test_messages
def test_connector_downlink_subscribe(eh_connector, server_connector):
    """Only messages matching the subscribed key reach the server callback.

    Two batches of 128 to 256 messages are sent with ``'key'`` set to ``'1'``
    and ``'2'`` respectively. The server connector subscribes to ``'1'`` only,
    so the callback must collect exactly the first batch and nothing from the
    second.
    """
    wanted = [{'key': '1', 'data': i} for i in range(randint(128, 256))]
    unwanted = [{'key': '2', 'data': i} for i in range(randint(128, 256))]

    received = []
    server_connector.subscribe('1')
    server_connector.register_callback(received.append)

    # Send the subscribed batch first, then the one expected to be filtered.
    for outgoing in wanted + unwanted:
        eh_connector.send_message(outgoing)

    run_ioloop_once()

    assert received == wanted
    assert not any(msg in received for msg in unwanted)