Implement message handler command to drain message queue

This commit is contained in:
Kristóf Tóth 2019-10-14 13:59:13 +02:00
parent 88bc42c7f2
commit 6cdfb0f6ec
2 changed files with 90 additions and 5 deletions

View File

@ -1,19 +1,30 @@
from time import sleep import logging
from queue import Queue from time import sleep, time
from queue import Queue, Empty
from threading import Thread from threading import Thread
from contextlib import suppress
LOG = logging.getLogger(__name__)
class MessageQueueHandler: class MessageQueueHandler:
keys = ['message.queue'] keys = ['message.queue']
type_id = 'ControlEventHandler' type_id = 'ControlEventHandler'
avg_word_len = 5 avg_word_len = 5
drain_poll_freq = 0.2
def __init__(self, wpm): def __init__(self, wpm):
self.connector = None self.connector = None
self.wpm = wpm self.wpm = wpm
self._queue = Queue() self._queue = Queue()
self._drain_queue = Queue()
self._thread = Thread(target=self._dispatch_messages) self._thread = Thread(target=self._dispatch_messages)
self._commands = {
'message.queue': self.handle_queue,
'message.queue.drain': self.handle_drain
}
def _dispatch_messages(self): def _dispatch_messages(self):
for message in iter(self._queue.get, None): for message in iter(self._queue.get, None):
message['typing'] = not self._queue.empty() message['typing'] = not self._queue.empty()
@ -25,10 +36,35 @@ class MessageQueueHandler:
chars_per_min = self.avg_word_len * words_per_min / 60 chars_per_min = self.avg_word_len * words_per_min / 60
return len(message['message']) / chars_per_min return len(message['message']) / chars_per_min
def _sleep(self, seconds): # pylint: disable=no-self-use def _sleep(self, seconds):
sleep(seconds) poll_freq = self.drain_poll_freq
if seconds < poll_freq:
poll_freq = seconds
sleep_until = time() + seconds
while time() < sleep_until:
sleep(poll_freq)
with suppress(Empty):
self._drain_queue.get(block=False)
self._drain()
break
def _drain(self):
with suppress(Empty):
while True:
message = self._queue.get(block=False)
if message is None:
break
message['typing'] = False
self.connector.send_message(message)
def handle_event(self, message, _): def handle_event(self, message, _):
try:
self._commands[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_queue(self, message):
for unpacked in self._generate_messages_from_queue(message): for unpacked in self._generate_messages_from_queue(message):
self._queue.put(unpacked) self._queue.put(unpacked)
@ -40,10 +76,15 @@ class MessageQueueHandler:
**message **message
} }
def handle_drain(self, _):
self._drain_queue.put(True)
def start(self): def start(self):
self._thread.start() self._thread.start()
def cleanup(self): def cleanup(self):
# clearing the queue forces the loop in
# _dispatch_messages to block on _queue.get
self._queue.queue.clear() self._queue.queue.clear()
self._queue.put(None) self._queue.put(None)
self._thread.join() self._thread.join()

View File

@ -2,7 +2,7 @@
from time import time from time import time
from os import urandom from os import urandom
from random import randint from random import randint
from queue import Queue from queue import Queue, Empty
import pytest import pytest
@ -106,3 +106,47 @@ def test_timing(handler):
# at least 'seconds' sleep after last message # at least 'seconds' sleep after last message
assert (send_times[2] + sleep_seconds[2]) < sleep_end_times[3] assert (send_times[2] + sleep_seconds[2]) < sleep_end_times[3]
assert not messages[3]['typing'] assert not messages[3]['typing']
def test_drain(handler):
q1 = get_message_queue()
q2 = get_message_queue()
messages_count = len(q1['messages']) + len(q2['messages'])
handler.wpm = 0.01 # very slow, everything will just block
handler.connector.raise_event(q1)
handler.connector.raise_event(q2)
handler.connector.messages.get(timeout=0.1) # no sleep before first message
with pytest.raises(Empty):
handler.connector.messages.get(timeout=0.1)
handler.connector.raise_event({'key': 'message.queue.drain'})
for _ in range(messages_count - 1):
handler.connector.messages.get()
def test_queue_works_after_drain(handler):
handler.connector.raise_event({'key': 'message.queue.drain'})
q = get_message_queue()
expected_msg_count = len(q['messages'])
handler.connector.raise_event(q)
messages = []
for _ in range(expected_msg_count):
messages.append(handler.connector.messages.get())
assert len(messages) == expected_msg_count
assert handler._thread.is_alive() # pylint: disable=protected-access
def test_queue_can_be_stopped(handler):
handler.wpm = 0.01 # very slow, everything will just block
q = get_message_queue()
handler.connector.raise_event(q)
handler.cleanup()
handler.connector.raise_event({'key': 'message.queue.drain'})
assert not handler._thread.is_alive() # pylint: disable=protected-access