diff --git a/tfw/components/frontend/message_queue_handler/message_queue_handler.py b/tfw/components/frontend/message_queue_handler/message_queue_handler.py index 17fedc1..b661877 100644 --- a/tfw/components/frontend/message_queue_handler/message_queue_handler.py +++ b/tfw/components/frontend/message_queue_handler/message_queue_handler.py @@ -1,19 +1,30 @@ -from time import sleep -from queue import Queue +import logging +from time import sleep, time +from queue import Queue, Empty from threading import Thread +from contextlib import suppress + +LOG = logging.getLogger(__name__) class MessageQueueHandler: keys = ['message.queue'] type_id = 'ControlEventHandler' avg_word_len = 5 + drain_poll_freq = 0.2 def __init__(self, wpm): self.connector = None self.wpm = wpm self._queue = Queue() + self._drain_queue = Queue() self._thread = Thread(target=self._dispatch_messages) + self._commands = { + 'message.queue': self.handle_queue, + 'message.queue.drain': self.handle_drain + } + def _dispatch_messages(self): for message in iter(self._queue.get, None): message['typing'] = not self._queue.empty() @@ -25,10 +36,35 @@ class MessageQueueHandler: chars_per_min = self.avg_word_len * words_per_min / 60 return len(message['message']) / chars_per_min - def _sleep(self, seconds): # pylint: disable=no-self-use - sleep(seconds) + def _sleep(self, seconds): + poll_freq = self.drain_poll_freq + if seconds < poll_freq: + poll_freq = seconds + + sleep_until = time() + seconds + while time() < sleep_until: + sleep(poll_freq) + with suppress(Empty): + self._drain_queue.get(block=False) + self._drain() + break + + def _drain(self): + with suppress(Empty): + while True: + message = self._queue.get(block=False) + if message is None: + break + message['typing'] = False + self.connector.send_message(message) def handle_event(self, message, _): + try: + self._commands[message['key']](message) + except KeyError: + LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) + + def handle_queue(self, message): for unpacked in self._generate_messages_from_queue(message): self._queue.put(unpacked) @@ -40,10 +76,15 @@ class MessageQueueHandler: **message } + def handle_drain(self, _): + self._drain_queue.put(True) + def start(self): self._thread.start() def cleanup(self): + # clearing the queue forces the loop in + # _dispatch_messages to block on _queue.get self._queue.queue.clear() self._queue.put(None) self._thread.join() diff --git a/tfw/components/frontend/message_queue_handler/test_message_queue.py b/tfw/components/frontend/message_queue_handler/test_message_queue.py index 8910981..2598872 100644 --- a/tfw/components/frontend/message_queue_handler/test_message_queue.py +++ b/tfw/components/frontend/message_queue_handler/test_message_queue.py @@ -2,7 +2,7 @@ from time import time from os import urandom from random import randint -from queue import Queue +from queue import Queue, Empty import pytest @@ -106,3 +106,47 @@ def test_timing(handler): # at least 'seconds' sleep after last message assert (send_times[2] + sleep_seconds[2]) < sleep_end_times[3] assert not messages[3]['typing'] + + +def test_drain(handler): + q1 = get_message_queue() + q2 = get_message_queue() + messages_count = len(q1['messages']) + len(q2['messages']) + handler.wpm = 0.01 # very slow, everything will just block + handler.connector.raise_event(q1) + handler.connector.raise_event(q2) + + handler.connector.messages.get(timeout=0.1) # no sleep before first message + with pytest.raises(Empty): + handler.connector.messages.get(timeout=0.1) + + handler.connector.raise_event({'key': 'message.queue.drain'}) + + for _ in range(messages_count - 1): + handler.connector.messages.get() + + +def test_queue_works_after_drain(handler): + handler.connector.raise_event({'key': 'message.queue.drain'}) + + q = get_message_queue() + expected_msg_count = len(q['messages']) + handler.connector.raise_event(q) + + messages = [] + for _ in range(expected_msg_count): + messages.append(handler.connector.messages.get()) + + assert len(messages) == expected_msg_count + assert handler._thread.is_alive() # pylint: disable=protected-access + + +def test_queue_can_be_stopped(handler): + handler.wpm = 0.01 # very slow, everything will just block + + q = get_message_queue() + handler.connector.raise_event(q) + handler.cleanup() + handler.connector.raise_event({'key': 'message.queue.drain'}) + + assert not handler._thread.is_alive() # pylint: disable=protected-access