Use prefix matching in ZMQConnector

This commit is contained in:
Kristóf Tóth 2019-08-08 15:05:37 +02:00
parent 35e5b595d1
commit b5e53cb946
Notes: R. Richard 2019-09-04 21:27:26 +02:00
- Control/Signed/FSMAwareEventHandler unit tesztek
- command_handler dict automatikus generálása
- frontend keyup ne hallgasson már a nyilakra meg ilyenekre
- CommandEventHandler humble object + tesztek
- terminalba írás ideiglenes letiltása
- ha nem kell egy komponens ki lehessen venni
- pipe, ami aláírja az üzenetet, amit beleküldünk
- legyen pötyögtetés a message queue elején
- a queue thread-jét lehessen join-olni várakozás közben is
- PipeConnector nyissa meg a már előtte is létező pipe-okat
9 changed files with 62 additions and 65 deletions

View File

@ -4,26 +4,25 @@ from .message_storage import FrontendMessageStorage
class FrontendHandler: class FrontendHandler:
# TODO: do not store dashboard messages like reloadFrontend # keys = [
keys = [ # 'console.read',
'console.read', # 'console.write',
'console.write', # 'console.showLiveLogs',
'console.showLiveLogs', # 'console.rewriteContentWithProcessLogsOnDeploy',
'console.rewriteContentWithProcessLogsOnDeploy', # 'dashboard.layout',
'dashboard.layout', # 'dashboard.hideMessages',
'dashboard.hideMessages', # 'dashboard.terminalMenuItem',
'dashboard.terminalMenuItem', # 'dashboard.reloadFrontend',
'dashboard.reloadFrontend', # 'dashboard.reloadIframe',
'dashboard.reloadIframe', # 'message.config',
'message.config', # 'message.queue',
'message.queue', # 'message.send'
'message.send' # ]
] keys = ['console', 'dashboard', 'message', 'ide.read', 'recover']
def __init__(self): def __init__(self):
self.connector = None self.connector = None
self.keys = [*type(self).keys, 'recover'] self._frontend_message_storage = FrontendMessageStorage()
self._frontend_message_storage = FrontendMessageStorage(type(self).keys)
def send_message(self, message): def send_message(self, message):
self.connector.send_message(message, scope=Scope.WEBSOCKET) self.connector.send_message(message, scope=Scope.WEBSOCKET)
@ -32,8 +31,13 @@ class FrontendHandler:
self._frontend_message_storage.save_message(message) self._frontend_message_storage.save_message(message)
if message['key'] == 'recover': if message['key'] == 'recover':
self.recover_frontend() self.recover_frontend()
if self._filter_message(message):
self.send_message(message) self.send_message(message)
@staticmethod
def _filter_message(message):
return not message['key'].startswith('ide')
def recover_frontend(self): def recover_frontend(self):
for message in self._frontend_message_storage.messages: for message in self._frontend_message_storage.messages:
self.send_message(message) self.send_message(message)

View File

@ -29,16 +29,25 @@ class MessageStorage(ABC):
class FrontendMessageStorage(MessageStorage): class FrontendMessageStorage(MessageStorage):
def __init__(self, keys):
self._keys = keys
super().__init__()
def _filter_message(self, message): def _filter_message(self, message):
key = message['key'] return message['key'].startswith((
return key in self._keys 'console.write',
'dashboard.layout',
'dashboard.terminalMenuItem',
'message.send',
'message.config',
'ide.read'
))
def _transform_message(self, message): def _transform_message(self, message):
if message['key'] == 'queueMessages': transformations = {
yield from MessageSender.generate_messages_from_queue(message) 'message.queue': MessageSender.generate_messages_from_queue,
else: 'ide.read': self._delete_ide_content
}
if message['key'] in transformations:
yield from transformations[message['key']](message)
@staticmethod
def _delete_ide_content(message):
del message['content']
yield message yield message

View File

@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__)
class FSMHandler: class FSMHandler:
keys = ['fsm.step', 'fsm.announce'] keys = ['fsm']
def __init__(self, *, fsm_type): def __init__(self, *, fsm_type):
self.fsm = fsm_type() self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm) self._fsm_updater = FSMUpdater(self.fsm)

View File

@ -33,7 +33,7 @@ BUILD_ARTIFACTS = (
class IdeHandler: class IdeHandler:
keys = ['ide.read', 'ide.write'] keys = ['ide']
def __init__(self, *, patterns, initial_file=''): def __init__(self, *, patterns, initial_file=''):
self.connector = None self.connector = None

View File

@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__)
class ProcessHandler(ProcessManager, ProcessLogManager): class ProcessHandler(ProcessManager, ProcessLogManager):
keys = ['process.start', 'process.stop', 'process.restart'] keys = ['process']
def __init__(self, *, supervisor_uri, log_tail=0): def __init__(self, *, supervisor_uri, log_tail=0):
ProcessManager.__init__(self, supervisor_uri) ProcessManager.__init__(self, supervisor_uri)
ProcessLogManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri)
@ -38,4 +39,5 @@ class ProcessHandler(ProcessManager, ProcessLogManager):
) )
connector.send_message(message, scope=Scope.WEBSOCKET) connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
if not message['key'].startswith('process.log'):
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -6,7 +6,7 @@ LOG = logging.getLogger(__name__)
class ProcessLogHandler: class ProcessLogHandler:
keys = ['process.log.set'] keys = ['process.log']
def __init__(self, *, process_name, supervisor_uri, log_tail=0): def __init__(self, *, process_name, supervisor_uri, log_tail=0):
self.connector, self._monitor = None, None self.connector, self._monitor = None, None

View File

@ -6,8 +6,6 @@ from datetime import datetime
from dateutil import parser as dateparser from dateutil import parser as dateparser
from tfw.internals.networking import Scope
from .snapshot_provider import SnapshotProvider from .snapshot_provider import SnapshotProvider
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -23,9 +21,9 @@ class SnapshotHandler:
self.init_snapshot_providers(directories) self.init_snapshot_providers(directories)
self.command_handlers = { self.command_handlers = {
'take_snapshot': self.handle_take_snapshot, 'snapshot.take': self.handle_take_snapshot,
'restore_snapshot': self.handle_restore_snapshot, 'snapshot.restore': self.handle_restore_snapshot,
'exclude': self.handle_exclude 'snapshot.exclude': self.handle_exclude
} }
def init_snapshot_providers(self, directories): def init_snapshot_providers(self, directories):
@ -45,23 +43,20 @@ class SnapshotHandler:
makedirs(git_dir, exist_ok=True) makedirs(git_dir, exist_ok=True)
return git_dir return git_dir
def handle_event(self, message, connector): def handle_event(self, message, _):
try: try:
data = message['data'] self.command_handlers[message['key']](message)
message['data'] = self.command_handlers[data['command']](data)
connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_take_snapshot(self, data): def handle_take_snapshot(self, _):
LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys()) LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys())
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.take_snapshot() provider.take_snapshot()
return data
def handle_restore_snapshot(self, data): def handle_restore_snapshot(self, message):
date = dateparser.parse( date = dateparser.parse(
data.get( message.get(
'value', 'value',
datetime.now().isoformat() datetime.now().isoformat()
) )
@ -73,13 +68,11 @@ class SnapshotHandler:
) )
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.restore_snapshot(date) provider.restore_snapshot(date)
return data
def handle_exclude(self, data): def handle_exclude(self, message):
exclude_unix_patterns = data['value'] exclude_unix_patterns = message['value']
if not isinstance(exclude_unix_patterns, list): if not isinstance(exclude_unix_patterns, list):
raise KeyError raise KeyError
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.exclude = exclude_unix_patterns provider.exclude = exclude_unix_patterns
return data

View File

@ -7,7 +7,7 @@ LOG = logging.getLogger(__name__)
class TerminalHandler: class TerminalHandler:
keys = ['terminal.write'] keys = ['terminal']
def __init__(self, *, port, user, working_directory, histfile): def __init__(self, *, port, user, working_directory, histfile):
self.connector, self._historymonitor = None, None self.connector, self._historymonitor = None, None

View File

@ -15,8 +15,6 @@ LOG = logging.getLogger(__name__)
class ZMQDownlinkConnector: class ZMQDownlinkConnector:
def __init__(self, connect_addr): def __init__(self, connect_addr):
self.keys = []
self._on_recv_callback = None
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_socket.connect(connect_addr) self._zmq_sub_socket.connect(connect_addr)
@ -25,19 +23,14 @@ class ZMQDownlinkConnector:
def subscribe(self, *keys): def subscribe(self, *keys):
for key in keys: for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key) self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key)
self.keys.append(key)
def unsubscribe(self, *keys): def unsubscribe(self, *keys):
for key in keys: for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key) self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key)
self.keys.remove(key)
def register_callback(self, callback): def register_callback(self, callback):
if callback: callback = with_deserialize_tfw_msg(callback) if callback else None
self._on_recv_callback = callback self._zmq_sub_stream.on_recv(callback)
self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv))
else:
self._zmq_sub_stream.on_recv(None)
def recv_message(self, *, block=True): def recv_message(self, *, block=True):
if self._zmq_sub_stream.receiving(): if self._zmq_sub_stream.receiving():
@ -48,11 +41,6 @@ class ZMQDownlinkConnector:
except zmq.ZMQError: except zmq.ZMQError:
raise IOError("No data available to recv!") raise IOError("No data available to recv!")
def _on_recv(self, message):
key = message['key']
if key in self.keys or '' in self.keys:
self._on_recv_callback(message)
def close(self): def close(self):
self._zmq_sub_stream.close() self._zmq_sub_stream.close()