baseimage-tutorial-framework/tfw/components/frontend/message_queue_handler/test_message_queue.py

109 lines
3.3 KiB
Python

# pylint: disable=redefined-outer-name
from time import time
from os import urandom
from random import randint
from queue import Queue
import pytest
from .message_queue_handler import MessageQueueHandler
class MockConnector:
def __init__(self):
self.callback = None
self.messages = Queue()
self.send_times = Queue()
def raise_event(self, message):
self.callback(message, self)
def send_message(self, message):
self.messages.put(message)
self.send_times.put(time())
@pytest.fixture
def handler():
class NoSleepMessageQueueHandler(MessageQueueHandler):
sleep_start_times = Queue()
sleep_seconds = Queue()
sleep_end_times = Queue()
def _sleep(self, seconds):
self.sleep_start_times.put(time())
self.sleep_seconds.put(seconds)
super()._sleep(seconds)
self.sleep_end_times.put(time())
handler = NoSleepMessageQueueHandler(100000)
handler.connector = MockConnector()
handler.connector.callback = handler.handle_event
handler.start()
yield handler
handler.cleanup()
def get_message_queue(*, size=None):
size = randint(5, 10) if not size else size
return {
'key': 'message.queue',
'messages': [
{'originator': urandom(4).hex(), 'message': urandom(randint(10, 20)).hex()}
for _ in range(size)
]
}
def test_order(handler):
queue = get_message_queue()
handler.connector.raise_event(queue)
expected_messages = queue['messages']
actual_messages = []
for _ in expected_messages:
actual_messages.append(handler.connector.messages.get())
assert len(actual_messages) == len(expected_messages)
for i in range(len(expected_messages)): # pylint: disable=consider-using-enumerate
message = actual_messages[i]
assert message['key'] == 'message.send'
assert message['originator'] == expected_messages[i]['originator']
assert message['typing'] == (i < len(expected_messages)-1)
def test_timing(handler):
q1 = get_message_queue(size=2)
q2 = get_message_queue(size=2)
handler.connector.raise_event(q1)
handler.connector.raise_event(q2)
messages = []
send_times = []
sleep_start_times = []
sleep_seconds = []
sleep_end_times = []
for _ in range(len(q1['messages']) + len(q2['messages'])):
messages.append(handler.connector.messages.get())
send_times.append(handler.connector.send_times.get())
sleep_start_times.append(handler.sleep_start_times.get())
sleep_seconds.append(handler.sleep_seconds.get())
sleep_end_times.append(handler.sleep_end_times.get())
# no sleep before first message
assert sleep_start_times[0] > send_times[0]
assert messages[0]['typing']
# at least 'seconds' sleep before sending next messages
assert (send_times[0] + sleep_seconds[0]) < sleep_end_times[0]
assert (send_times[0] + sleep_seconds[0]) < send_times[1]
assert messages[1]['typing']
assert (send_times[1] + sleep_seconds[1]) < sleep_end_times[1]
assert (send_times[1] + sleep_seconds[1]) < send_times[2]
assert messages[2]['typing']
# at least 'seconds' sleep after last message
assert (send_times[2] + sleep_seconds[2]) < sleep_end_times[3]
assert not messages[3]['typing']