mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-08 20:07:16 +00:00
45 lines
1.1 KiB
Python
45 lines
1.1 KiB
Python
from abc import ABC, abstractmethod
|
|
from contextlib import suppress
|
|
|
|
from .message_sender import MessageSender
|
|
|
|
|
|
class MessageStorage(ABC):
    """Collect the messages that pass a subclass-defined filter.

    Subclasses must implement ``_filter_message`` and may override
    ``_transform_message`` to store something other than the message itself.
    """

    def __init__(self):
        # Stored items, in the order they were saved.
        self._messages = []

    def save_message(self, message):
        """Store ``message`` if the filter accepts it.

        A ``KeyError`` or ``AttributeError`` raised while filtering or
        transforming is swallowed and the message is dropped as a whole:
        either every item produced by the transform is stored, or none is.

        Args:
            message: The incoming message; the structure it must have is
                determined by the subclass hooks.
        """
        with suppress(KeyError, AttributeError):
            if self._filter_message(message):
                # Drain the (possibly lazy) transform before touching the
                # store, so a failure midway cannot leave a partial batch.
                transformed = list(self._transform_message(message))
                self._messages.extend(transformed)

    @abstractmethod
    def _filter_message(self, message):
        """Return a truthy value if ``message`` should be stored."""
        raise NotImplementedError

    def _transform_message(self, message):  # pylint: disable=no-self-use
        """Yield the items to store for ``message``.

        The default implementation yields the message unchanged.
        """
        yield message

    def clear(self):
        """Remove every stored message."""
        self._messages.clear()

    @property
    def messages(self):
        """Yield the stored messages in the order they were saved."""
        yield from self._messages
|
|
|
|
|
|
class FrontendMessageStorage(MessageStorage):
    """Message storage that keeps only messages whose ``'key'`` is allowed."""

    def __init__(self, keys):
        """Remember the accepted keys and set up the empty store.

        Args:
            keys: Container tested with ``in`` against each message's
                ``'key'`` value.
        """
        self._keys = keys
        super().__init__()

    def _filter_message(self, message):
        """Return whether ``message['key']`` is among the accepted keys."""
        return message['key'] in self._keys

    def _transform_message(self, message):
        """Yield the items to store for ``message``.

        Messages keyed ``'queueMessages'`` are expanded through
        ``MessageSender.generate_messages_from_queue``; any other message is
        yielded unchanged.
        """
        if message['key'] != 'queueMessages':
            yield message
            return
        # The expansion logic lives in MessageSender (not shown in this file).
        yield from MessageSender.generate_messages_from_queue(message)
|