mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 00:52:55 +00:00 
			
		
		
		
	Move message queue logic to backend
This commit is contained in:
		
				
					committed by
					
						
						therealkrispet
					
				
			
			
				
	
			
			
			
						parent
						
							965973a32f
						
					
				
				
					commit
					0f379891e3
				
			@@ -1,3 +1,4 @@
 | 
			
		||||
from .frontend_proxy_handler import FrontendProxyHandler
 | 
			
		||||
from .console_logs_handler import ConsoleLogsHandler
 | 
			
		||||
from .frontend_proxy_handler import FrontendProxyHandler
 | 
			
		||||
from .message_queue_handler import MessageQueueHandler
 | 
			
		||||
from .message_sender import MessageSender
 | 
			
		||||
 
 | 
			
		||||
@@ -22,7 +22,7 @@ class FrontendProxyHandler:
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _filter_message(message):
 | 
			
		||||
        return not message['key'].startswith('ide')
 | 
			
		||||
        return not message['key'].startswith(('ide', 'message.queue'))
 | 
			
		||||
 | 
			
		||||
    def recover_frontend(self):
 | 
			
		||||
        for message in self._frontend_message_storage.messages:
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1 @@
 | 
			
		||||
from .message_queue_handler import MessageQueueHandler
 | 
			
		||||
@@ -0,0 +1,50 @@
 | 
			
		||||
from time import sleep
 | 
			
		||||
from queue import Queue
 | 
			
		||||
from threading import Thread
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MessageQueueHandler:
 | 
			
		||||
    keys = ['message.queue']
 | 
			
		||||
 | 
			
		||||
    def __init__(self, wpm):
 | 
			
		||||
        self.connector = None
 | 
			
		||||
        self._wpm, self._cps = None, None
 | 
			
		||||
        self.wpm = wpm
 | 
			
		||||
        self._queue = Queue()
 | 
			
		||||
        self._thread = Thread(target=self.dispatch_messages)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def wpm(self):
 | 
			
		||||
        return self._wpm
 | 
			
		||||
 | 
			
		||||
    @wpm.setter
 | 
			
		||||
    def wpm(self, wpm):
 | 
			
		||||
        self._wpm = wpm
 | 
			
		||||
        self._cps = 5*wpm/60
 | 
			
		||||
 | 
			
		||||
    def dispatch_messages(self):
 | 
			
		||||
        for message in iter(self._queue.get, None):
 | 
			
		||||
            sleep(len(message['message'])/self._cps)
 | 
			
		||||
            self.connector.send_message(message)
 | 
			
		||||
 | 
			
		||||
    def handle_event(self, message, _):
 | 
			
		||||
        for unpacked in self.generate_messages_from_queue(message):
 | 
			
		||||
            self._queue.put(unpacked)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def generate_messages_from_queue(queue_message):
 | 
			
		||||
        last = queue_message['value'][-1]
 | 
			
		||||
        for message in queue_message['value']:
 | 
			
		||||
            yield {
 | 
			
		||||
                'key': 'message.send',
 | 
			
		||||
                'typing': message is not last,
 | 
			
		||||
                **message
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
        self._thread.start()
 | 
			
		||||
 | 
			
		||||
    def cleanup(self):
 | 
			
		||||
        self._queue.queue.clear()
 | 
			
		||||
        self._queue.put(None)
 | 
			
		||||
        self._thread.join()
 | 
			
		||||
@@ -0,0 +1,67 @@
 | 
			
		||||
# pylint: disable=redefined-outer-name
 | 
			
		||||
from math import inf
 | 
			
		||||
from time import sleep
 | 
			
		||||
from os import urandom
 | 
			
		||||
from random import randint
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from .message_queue_handler import MessageQueueHandler
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MockConnector:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.callback = None
 | 
			
		||||
        self.messages = []
 | 
			
		||||
 | 
			
		||||
    def raise_event(self, message):
 | 
			
		||||
        self.callback(message, self)
 | 
			
		||||
        sleep(0.01)
 | 
			
		||||
 | 
			
		||||
    def send_message(self, message):
 | 
			
		||||
        self.messages.append(message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def handler():
 | 
			
		||||
    connector = MockConnector()
 | 
			
		||||
    handler = MessageQueueHandler(inf)
 | 
			
		||||
    handler.connector = connector
 | 
			
		||||
    connector.callback = handler.handle_event
 | 
			
		||||
    handler.start()
 | 
			
		||||
    yield handler
 | 
			
		||||
    handler.cleanup()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def queue():
 | 
			
		||||
    yield {
 | 
			
		||||
        'key': 'message.queue',
 | 
			
		||||
        'value': [
 | 
			
		||||
            {'originator': urandom(4).hex(), 'message': urandom(16).hex()}
 | 
			
		||||
            for _ in range(randint(5, 10))
 | 
			
		||||
        ]
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_message_order(handler, queue):
 | 
			
		||||
    handler.connector.raise_event(queue)
 | 
			
		||||
    old_list = queue['value']
 | 
			
		||||
    new_list = handler.connector.messages
 | 
			
		||||
    length = len(old_list)
 | 
			
		||||
    assert len(new_list) == length
 | 
			
		||||
    for i in range(length):
 | 
			
		||||
        unpacked = new_list[i]
 | 
			
		||||
        assert unpacked['key'] == 'message.send'
 | 
			
		||||
        assert unpacked['originator'] == old_list[i]['originator']
 | 
			
		||||
        assert unpacked['typing'] == (i < length-1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_wpm(handler, queue):
 | 
			
		||||
    handler.wpm = 10000
 | 
			
		||||
    handler.connector.raise_event(queue)
 | 
			
		||||
    assert not handler.connector.messages
 | 
			
		||||
    handler.wpm = 100000000
 | 
			
		||||
    handler.connector.raise_event(queue)
 | 
			
		||||
    sleep(0.25)
 | 
			
		||||
    assert len(handler.connector.messages) == 2*len(queue['value'])
 | 
			
		||||
@@ -28,11 +28,3 @@ class MessageSender:
 | 
			
		||||
            'key': 'message.config',
 | 
			
		||||
            'originator': originator
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def generate_messages_from_queue(queue_message):
 | 
			
		||||
        for message in queue_message['value']:
 | 
			
		||||
            yield {
 | 
			
		||||
                'key': 'message.send',
 | 
			
		||||
                **message
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,6 @@
 | 
			
		||||
from abc import ABC, abstractmethod
 | 
			
		||||
from contextlib import suppress
 | 
			
		||||
 | 
			
		||||
from .message_sender import MessageSender
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MessageStorage(ABC):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
@@ -36,13 +34,11 @@ class FrontendMessageStorage(MessageStorage):
 | 
			
		||||
            'dashboard.terminalMenuItem',
 | 
			
		||||
            'message.config',
 | 
			
		||||
            'message.send',
 | 
			
		||||
            'message.queue',
 | 
			
		||||
            'ide.read'
 | 
			
		||||
        ))
 | 
			
		||||
 | 
			
		||||
    def _transform_message(self, message):
 | 
			
		||||
        transformations = {
 | 
			
		||||
            'message.queue': MessageSender.generate_messages_from_queue,
 | 
			
		||||
            'ide.read': self._delete_ide_content
 | 
			
		||||
        }
 | 
			
		||||
        if message['key'] in transformations:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user