mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-22 23:51:32 +00:00
50 lines
1.3 KiB
Python
50 lines
1.3 KiB
Python
from abc import ABC, abstractmethod
|
|
from contextlib import suppress
|
|
|
|
|
|
class MessageStorage(ABC):
|
|
def __init__(self):
|
|
self._messages = []
|
|
|
|
def save_message(self, message):
|
|
with suppress(KeyError, AttributeError):
|
|
if self._filter_message(message):
|
|
self._messages.extend(self._transform_message(message))
|
|
|
|
@abstractmethod
|
|
def _filter_message(self, message):
|
|
raise NotImplementedError
|
|
|
|
def _transform_message(self, message): # pylint: disable=no-self-use
|
|
yield message
|
|
|
|
def clear(self):
|
|
self._messages.clear()
|
|
|
|
@property
|
|
def messages(self):
|
|
yield from self._messages
|
|
|
|
|
|
class FrontendMessageStorage(MessageStorage):
|
|
def _filter_message(self, message):
|
|
return message['key'].startswith((
|
|
'console.write',
|
|
'message.send',
|
|
'ide.read'
|
|
))
|
|
|
|
def _transform_message(self, message):
|
|
transformations = {
|
|
'ide.read': self._delete_ide_content
|
|
}
|
|
if message['key'] in transformations:
|
|
yield from transformations[message['key']](message)
|
|
else:
|
|
yield message
|
|
|
|
@staticmethod
|
|
def _delete_ide_content(message):
|
|
del message['content']
|
|
yield message
|