From d31a850a4e70cce27b328e2c172d84b9c8665f14 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:44:03 +0200 Subject: [PATCH 01/10] Simplify IDE handler and file manager --- .../ide/file_manager/file_manager.py | 123 +++++--------- .../ide/file_manager/test_file_manager.py | 147 ++++++----------- tfw/components/ide/ide_handler.py | 156 ++++-------------- 3 files changed, 127 insertions(+), 299 deletions(-) diff --git a/tfw/components/ide/file_manager/file_manager.py b/tfw/components/ide/file_manager/file_manager.py index 0e4fc6a..e416cb9 100644 --- a/tfw/components/ide/file_manager/file_manager.py +++ b/tfw/components/ide/file_manager/file_manager.py @@ -1,93 +1,56 @@ -from typing import Iterable +from functools import wraps from glob import glob from fnmatch import fnmatchcase -from os.path import basename, isfile, join, relpath, exists, isdir, realpath +from os.path import dirname, isdir, isfile, realpath -class FileManager: # pylint: disable=too-many-instance-attributes - def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = [], exclude - self._allowed_directories, self.allowed_directories = None, allowed_directories - self._workdir, self.workdir = None, working_directory - self._filename, self.filename = None, selected_file or self.files[0] +def _with_is_allowed(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if self._is_allowed(args[0]): # pylint: disable=protected-access + return func(self, *args, **kwargs) + raise ValueError('Forbidden path.') + return wrapper - @property - def exclude(self): - return self._exclude - @exclude.setter - def exclude(self, exclude): - if exclude is None: - return - if not isinstance(exclude, Iterable): - raise TypeError('Exclude must be Iterable!') - self._exclude = exclude - - @property - def workdir(self): - return self._workdir - - @workdir.setter - def workdir(self, directory): - if not exists(directory) or not isdir(directory): - raise EnvironmentError(f'"{directory}" is not a directory!') - if not self._is_in_allowed_dir(directory): - raise EnvironmentError(f'Directory "{directory}" is not allowed!') - self._workdir = directory - - @property - def allowed_directories(self): - return self._allowed_directories - - @allowed_directories.setter - def allowed_directories(self, directories): - self._allowed_directories = [realpath(directory) for directory in directories] - - @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, filename): - if filename not in self.files: - raise EnvironmentError('No such file in workdir!') - self._filename = filename +class FileManager: # pylint: disable=too-many-instance-attributes + def __init__(self, patterns): + self.patterns = patterns @property def files(self): - return [ - self._relpath(file) - for file in glob(join(self._workdir, '**/*'), recursive=True) - if isfile(file) - and self._is_in_allowed_dir(file) - and not self._is_blacklisted(file) - ] + return list(set( + path + for pattern in self.patterns + for path in glob(pattern, recursive=True) + if isfile(path) and self._is_allowed(path) + )) @property - def file_contents(self): - with open(self._filepath(self.filename), 'rb', buffering=0) as ifile: + def parents(self): + return list(set( + self._find_directory(pattern) + for pattern in self.patterns + )) + + @staticmethod + def _find_directory(pattern): + while pattern and not isdir(pattern): + pattern = dirname(pattern) + return pattern + + def _is_allowed(self, filepath): + return any( + fnmatchcase(realpath(filepath), pattern) + for pattern in self.patterns + ) + + @_with_is_allowed + def read_file(self, filepath): # pylint: disable=no-self-use + with open(filepath, 'rb', buffering=0) as ifile: return ifile.read().decode(errors='surrogateescape') - @file_contents.setter - def file_contents(self, value): - with open(self._filepath(self.filename), 'wb', buffering=0) as ofile: - ofile.write(value.encode()) - - def _is_in_allowed_dir(self, path): - return any( - realpath(path).startswith(allowed_dir) - for allowed_dir in self.allowed_directories - ) - - def _is_blacklisted(self, file): - return any( - fnmatchcase(file, blacklisted) or - fnmatchcase(basename(file), blacklisted) - for blacklisted in self.exclude - ) - - def _filepath(self, filename): - return join(self._workdir, filename) - - def _relpath(self, filename): - return relpath(self._filepath(filename), start=self._workdir) + @_with_is_allowed + def write_file(self, filepath, contents): # pylint: disable=no-self-use + with open(filepath, 'wb', buffering=0) as ofile: + ofile.write(contents.encode()) diff --git a/tfw/components/ide/file_manager/test_file_manager.py b/tfw/components/ide/file_manager/test_file_manager.py index 8f978d3..08e4f25 100644 --- a/tfw/components/ide/file_manager/test_file_manager.py +++ b/tfw/components/ide/file_manager/test_file_manager.py @@ -1,8 +1,8 @@ # pylint: disable=redefined-outer-name from dataclasses import dataclass from secrets import token_urlsafe +from os import mkdir, symlink from os.path import join -from os import chdir, mkdir, symlink from pathlib import Path from tempfile import TemporaryDirectory @@ -13,112 +13,75 @@ from .file_manager import FileManager @dataclass class ManagerContext: - folder: str + workdir: str + subdir: str + subfile: str manager: FileManager - def join(self, path): - return join(self.folder, path) + def create_random_file(self, dirname, extension): + filename = self.join(f'{dirname}/{generate_name()}{extension}') + Path(filename).touch() + return filename + def create_random_folder(self, basepath): + dirname = self.join(f'{basepath}/{generate_name()}') + mkdir(dirname) + return dirname + + def create_random_link(self, source, dirname, extension): + linkname = self.join(f'{dirname}/{generate_name()}{extension}') + symlink(source, linkname) + return linkname + + def join(self, path): + return join(self.workdir, path) + +def generate_name(): + return token_urlsafe(16) @pytest.fixture() def context(): - dirs = {} - with TemporaryDirectory() as workdir: - chdir(workdir) - for name in ['allowed', 'excluded', 'invis']: - node = join(workdir, name) - mkdir(node) - Path(join(node, 'empty.txt')).touch() - Path(join(node, 'empty.bin')).touch() - dirs[name] = node + subdir = join(workdir, generate_name()) + subfile = join(subdir, generate_name() + '.txt') + mkdir(subdir) + Path(subfile).touch() + manager = FileManager([join(workdir, '**/*.txt')]) + yield ManagerContext(workdir, subdir, subfile, manager) - yield ManagerContext( - workdir, - FileManager( - dirs['allowed'], - [dirs['allowed'], dirs['excluded']], - exclude=['*/excluded/*'] - ) - ) +def test_matching_files(context): + newdir = context.create_random_folder(context.subdir) + newfile = context.create_random_file(newdir, '.txt') + newlink = context.create_random_link(newfile, newdir, '.txt') + assert set(context.manager.files) == {context.subfile, newfile, newlink} -@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/']) -def test_select_allowed_dirs(context, subdir): - context.manager.workdir = context.join(subdir) - assert context.manager.workdir == context.join(subdir) - newdir = context.join(subdir+'deep') - mkdir(newdir) - context.manager.workdir = newdir - assert context.manager.workdir == newdir +def test_unmatching_files(context): + newtxt = context.create_random_file(context.workdir, '.txt') + newbin = context.create_random_file(context.subdir, '.bin') + context.create_random_link(newtxt, context.subdir, '.txt') + context.create_random_link(newbin, context.subdir, '.txt') + assert context.manager.files == [context.subfile] -@pytest.mark.parametrize('invdir', ['', 'invis']) -def test_select_forbidden_dirs(context, invdir): - fullpath = context.join(invdir) - with pytest.raises(OSError): - context.manager.workdir = fullpath - assert context.manager.workdir != fullpath - context.manager.allowed_directories += [fullpath] - context.manager.workdir = fullpath - assert context.manager.workdir == fullpath - - -@pytest.mark.parametrize('filename', ['another.txt', '*.txt']) -def test_select_allowed_files(context, filename): - Path(context.join('allowed/'+filename)).touch() - assert filename in context.manager.files - context.manager.filename = filename - assert context.manager.filename == filename - -@pytest.mark.parametrize('path', [ - {'dir': 'allowed/', 'file': 'illegal.bin'}, - {'dir': 'excluded/', 'file': 'legal.txt'}, - {'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'}, - {'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'}, - {'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'}, - {'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'} -]) -def test_select_excluded_files(context, path): - context.manager.workdir = context.join(path['dir']) - context.manager.exclude = ['*/excluded/*', '*.bin'] - Path(context.join(path['dir']+path['file'])).touch() - assert path['file'] not in context.manager.files - with pytest.raises(OSError): - context.manager.filename = path['file'] - -@pytest.mark.parametrize('path', [ - {'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'}, - {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, - {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'} -]) -def test_select_excluded_symlinks(context, path): - symlink(context.join(path['src']), context.join(path['dst'])) - assert path['dst'] not in context.manager.files +def test_parents(context): + newdir = context.create_random_folder(context.workdir) + context.manager.patterns += [f'{newdir}/[!/@]*/**/?.c'] + assert set(context.manager.parents) == {context.workdir, newdir} def test_read_write_file(context): for _ in range(128): - context.manager.filename = 'empty.txt' content = token_urlsafe(32) - context.manager.file_contents = content - assert context.manager.file_contents == content - with open(context.join('allowed/empty.txt'), 'r') as ifile: + context.manager.write_file(context.subfile, content) + assert context.manager.read_file(context.subfile) == content + with open(context.subfile, 'r') as ifile: assert ifile.read() == content def test_regular_ide_actions(context): - context.manager.workdir = context.join('allowed') - newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16) - Path(context.join(f'allowed/{newfile1}')).touch() - Path(context.join(f'allowed/{newfile2}')).touch() - for _ in range(8): + newfile1 = context.create_random_file(context.subdir, '.txt') + newfile2 = context.create_random_file(context.subdir, '.txt') + for _ in range(4): context.manager.filename = newfile1 - content1 = token_urlsafe(32) - context.manager.file_contents = content1 - context.manager.filename = newfile2 - content2 = token_urlsafe(32) - context.manager.file_contents = content2 - context.manager.filename = newfile1 - assert context.manager.file_contents == content1 - context.manager.filename = newfile2 - assert context.manager.file_contents == content2 + content1, content2 = token_urlsafe(32), token_urlsafe(32) + context.manager.write_file(newfile1, content1) + context.manager.write_file(newfile2, content2) + assert context.manager.read_file(newfile1) == content1 + assert context.manager.read_file(newfile2) == content2 diff --git a/tfw/components/ide/ide_handler.py b/tfw/components/ide/ide_handler.py index 38f7842..f7435d6 100644 --- a/tfw/components/ide/ide_handler.py +++ b/tfw/components/ide/ide_handler.py @@ -1,4 +1,5 @@ import logging +from os.path import isfile from tfw.internals.networking import Scope from tfw.internals.inotify import InotifyObserver @@ -32,161 +33,62 @@ BUILD_ARTIFACTS = ( class IdeHandler: - keys = ['ide'] - # pylint: disable=too-many-arguments,anomalous-backslash-in-string - """ - Event handler implementing the backend of our browser based IDE. - By default all files in the directory specified in __init__ are displayed - on the fontend. Note that this is a stateful component. + keys = ['ide.read', 'ide.write'] - When any file in the selected directory changes they are automatically refreshed - on the frontend (this is done by listening to inotify events). - - This EventHandler accepts messages that have a data['command'] key specifying - a command to be executed. - - The API of each command is documented in their respective handler. - """ - def __init__(self, *, directory, allowed_directories, selected_file=None, exclude=None): - """ - :param key: the key this instance should listen to - :param directory: working directory which the EventHandler should serve files from - :param allowed_directories: list of directories that can be switched to using selectdir - :param selected_file: file that is selected by default - :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) - """ + def __init__(self, *, patterns, initial_file=''): self.connector = None - try: - self.filemanager = FileManager( - allowed_directories=allowed_directories, - working_directory=directory, - selected_file=selected_file, - exclude=exclude - ) - except IndexError: - raise EnvironmentError( - f'No file(s) in IdeEventHandler working_directory "{directory}"!' - ) + self.filemanager = FileManager(patterns) + self._initial_file = initial_file self.monitor = InotifyObserver( - self.filemanager.allowed_directories, + path=self.filemanager.parents, exclude=BUILD_ARTIFACTS ) self.monitor.on_modified = self._reload_frontend self.monitor.start() self.commands = { - 'read': self.read, - 'write': self.write, - 'select': self.select, - 'selectdir': self.select_dir, - 'exclude': self.exclude + 'ide.read' : self.read, + 'ide.write' : self.write } + @property + def initial_file(self): + if not isfile(self._initial_file): + self._initial_file = self.filemanager.files[0] + return self._initial_file + def _reload_frontend(self, event): # pylint: disable=unused-argument - self.send_message({ - 'key': 'ide', - 'data': {'command': 'reload'} - }) + self.send_message({'key': 'ide.reload'}) def send_message(self, message): self.connector.send_message(message, scope=Scope.WEBSOCKET) - def read(self, data): - """ - Read the currently selected file. - - :return dict: TFW message data containing key 'content' - (contents of the selected file) - """ + def read(self, message): + if message.get('files'): + self.filemanager.patterns = message['files'] try: - data['content'] = self.filemanager.file_contents + message['content'] = self.filemanager.read_file(message['filename']) except PermissionError: - data['content'] = 'You have no permission to open that file :(' + message['content'] = 'You have no permission to open that file :(' except FileNotFoundError: - data['content'] = 'This file was removed :(' + message['content'] = 'This file was removed :(' except Exception: # pylint: disable=broad-except - data['content'] = 'Failed to read file :(' - return data + message['content'] = 'Failed to read file :(' - def write(self, data): - """ - Overwrites a file with the desired string. - - :param data: TFW message data containing key 'content' - (new file content) - - """ + def write(self, message): try: - self.filemanager.file_contents = data['content'] + self.filemanager.write_file(message['filename'], message['content']) except Exception: # pylint: disable=broad-except LOG.exception('Error writing file!') - del data['content'] - return data - - def select(self, data): - """ - Selects a file from the current directory. - - :param data: TFW message data containing 'filename' - (name of file to select relative to the current directory) - """ - try: - self.filemanager.filename = data['filename'] - except EnvironmentError: - LOG.exception('Failed to select file "%s"', data['filename']) - return data - - def select_dir(self, data): - """ - Select a new working directory to display files from. - - :param data: TFW message data containing 'directory' - (absolute path of diretory to select. - must be a path whitelisted in - self.allowed_directories) - """ - try: - self.filemanager.workdir = data['directory'] - try: - self.filemanager.filename = self.filemanager.files[0] - self.read(data) - except IndexError: - data['content'] = 'No files in this directory :(' - except EnvironmentError as err: - LOG.error( - 'Failed to select directory "%s". Reason: %s', - data['directory'], str(err) - ) - return data - - def exclude(self, data): - """ - Overwrite list of excluded files - - :param data: TFW message data containing 'exclude' - (list of unix-style filename patterns to be excluded, - e.g.: ["\*.pyc", "\*.o") - """ - try: - self.filemanager.exclude = list(data['exclude']) - except TypeError: - LOG.error('Exclude must be Iterable!') - return data - - def attach_fileinfo(self, data): - """ - Basic information included in every response to the frontend. - """ - data['filename'] = self.filemanager.filename - data['files'] = self.filemanager.files - data['directory'] = self.filemanager.workdir + del message['content'] def handle_event(self, message, _): try: - data = message['data'] - message['data'] = self.commands[data['command']](data) - self.attach_fileinfo(data) + if message['filename'] == '': + message['filename'] = self.initial_file + self.commands[message['key']](message) + message['files'] = self.filemanager.files self.send_message(message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) From dded7fd65cee74d3841a2fdbb95f66473d645194 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:46:58 +0200 Subject: [PATCH 02/10] Update FSM related classes according to the new API --- tfw/components/fsm/fsm_handler.py | 41 ++++------------------- tfw/components/fsm/fsm_updater.py | 2 +- tfw/internals/event_handling/fsm_aware.py | 4 +-- 3 files changed, 10 insertions(+), 37 deletions(-) diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index 60b8c16..61f901e 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -1,6 +1,6 @@ import logging -from tfw.internals.crypto import KeyManager, sign_message, verify_message +from tfw.internals.crypto import KeyManager, sign_message from tfw.internals.networking import Scope from .fsm_updater import FSMUpdater @@ -9,57 +9,30 @@ LOG = logging.getLogger(__name__) class FSMHandler: - keys = ['fsm'] - """ - EventHandler responsible for managing the state machine of - the framework (TFW FSM). - - tfw.networking.TFWServer instances automatically send 'trigger' - commands to the event handler listening on the 'fsm' key, - which should be an instance of this event handler. - - This event handler accepts messages that have a - data['command'] key specifying a command to be executed. - - An 'fsm_update' message is broadcasted after every successful - command. - """ + keys = ['fsm.step', 'fsm.update'] def __init__(self, *, fsm_type): self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) self.auth_key = KeyManager().auth_key self.command_handlers = { - 'trigger': self.handle_trigger, - 'update': self.handle_update + 'fsm.step' : self.handle_step, + 'fsm.update' : self.handle_update } def handle_event(self, message, connector): try: - message = self.command_handlers[message['data']['command']](message) + message = self.command_handlers[message['key']](message) if message: fsm_update_message = self._fsm_updater.fsm_update - sign_message(self.auth_key, message) sign_message(self.auth_key, fsm_update_message) connector.send_message(fsm_update_message, Scope.BROADCAST) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) - def handle_trigger(self, message): - """ - Attempts to step the FSM with the supplied trigger. - - :param message: TFW message with a data field containing - the action to try triggering in data['value'] - """ - trigger = message['data']['value'] - if self.fsm.step(trigger): - return message - return None + def handle_step(self, message): + return message if self.fsm.step(message['trigger']) else None def handle_update(self, message): - """ - Does nothing, but triggers an 'fsm_update' message. - """ # pylint: disable=no-self-use return message diff --git a/tfw/components/fsm/fsm_updater.py b/tfw/components/fsm/fsm_updater.py index 93f9e7a..d1b42ec 100644 --- a/tfw/components/fsm/fsm_updater.py +++ b/tfw/components/fsm/fsm_updater.py @@ -5,7 +5,7 @@ class FSMUpdater: @property def fsm_update(self): return { - 'key': 'fsm_update', + 'key': 'fsm.announce', **self.fsm_update_data } diff --git a/tfw/internals/event_handling/fsm_aware.py b/tfw/internals/event_handling/fsm_aware.py index 5254e68..b700dba 100644 --- a/tfw/internals/event_handling/fsm_aware.py +++ b/tfw/internals/event_handling/fsm_aware.py @@ -6,7 +6,7 @@ LOG = logging.getLogger(__name__) class FSMAware: - keys = ['fsm_update'] + keys = ['fsm.announce'] """ Base class for stuff that has to be aware of the framework FSM. This is done by processing 'fsm_update' messages. @@ -18,7 +18,7 @@ class FSMAware: self._auth_key = KeyManager().auth_key def process_message(self, message): - if message['key'] == 'fsm_update': + if message['key'] == 'fsm.announce': if verify_message(self._auth_key, message): self._handle_fsm_update(message) From e414ea2631e946bcfb690218dd4fb9fa04ef7cec Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:47:28 +0200 Subject: [PATCH 03/10] Simplify terminal handler --- tfw/components/terminal/terminal_handler.py | 55 ++++----------------- 1 file changed, 9 insertions(+), 46 deletions(-) diff --git a/tfw/components/terminal/terminal_handler.py b/tfw/components/terminal/terminal_handler.py index c8df229..7d3e041 100644 --- a/tfw/components/terminal/terminal_handler.py +++ b/tfw/components/terminal/terminal_handler.py @@ -7,40 +7,25 @@ LOG = logging.getLogger(__name__) class TerminalHandler: - keys = ['shell'] - """ - Event handler responsible for managing terminal sessions for frontend xterm - sessions to connect to. You need to instanciate this in order for frontend - terminals to work. + keys = ['terminal.write'] - This EventHandler accepts messages that have a data['command'] key specifying - a command to be executed. - The API of each command is documented in their respective handler. - """ - def __init__(self, *, port, user, workind_directory, histfile): - """ - :param key: key this EventHandler listens to - :param monitor: tfw.components.HistoryMonitor instance to read command history from - """ - self.connector = None + def __init__(self, *, port, user, working_directory, histfile): + self.connector, self._historymonitor = None, None self._histfile = histfile - self._historymonitor = None bash_as_user_cmd = ['sudo', '-u', user, 'bash'] self.terminado_server = TerminadoMiniServer( '/terminal', port, - workind_directory, + working_directory, bash_as_user_cmd ) + self.terminado_server.listen() self.commands = { - 'write': self.write, - 'read': self.read + 'terminal.write': self.handle_write } - self.terminado_server.listen() - def start(self): self._historymonitor = BashMonitor(self.connector, self._histfile) self._historymonitor.start() @@ -51,34 +36,12 @@ class TerminalHandler: def handle_event(self, message, _): try: - data = message['data'] - message['data'] = self.commands[data['command']](data) + self.commands[message['key']](message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) - def write(self, data): - """ - Writes a string to the terminal session (on the pty level). - Useful for pre-typing and executing commands for the user. - - :param data: TFW message data containing 'value' - (command to be written to the pty) - """ - self.terminado_server.pty.write(data['value']) - return data - - def read(self, data): - """ - Reads the history of commands executed. - - :param data: TFW message data containing 'count' - (the number of history elements to return) - :return dict: message with list of commands in data['history'] - """ - data['count'] = int(data.get('count', 1)) - if self.historymonitor: - data['history'] = self.historymonitor.history[-data['count']:] - return data + def handle_write(self, message): + self.terminado_server.pty.write(message['command']) def cleanup(self): self.terminado_server.stop() From 4b7510e704096783de02b9c56b9c73f48c69ec07 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:47:57 +0200 Subject: [PATCH 04/10] Update process- and logmonitor according to the new API --- .../log_inotify_observer.py | 11 ++--- .../process_management/process_handler.py | 34 +++++--------- .../process_management/process_log_handler.py | 45 +++++-------------- 3 files changed, 25 insertions(+), 65 deletions(-) diff --git a/tfw/components/process_management/log_inotify_observer.py b/tfw/components/process_management/log_inotify_observer.py index cbbd5dc..4ea88cf 100644 --- a/tfw/components/process_management/log_inotify_observer.py +++ b/tfw/components/process_management/log_inotify_observer.py @@ -7,7 +7,7 @@ from .supervisor import ProcessLogManager class LogInotifyObserver(InotifyObserver, ProcessLogManager): - def __init__(self, connector, supervisor_uri, process_name, log_tail=0): + def __init__(self, connector, process_name, supervisor_uri, log_tail=0): self._prevent_log_recursion() self._connector = connector self._process_name = process_name @@ -36,10 +36,7 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager): def on_modified(self, event): self._connector.send_message({ - 'key': 'processlog', - 'data': { - 'command': 'new_log', - 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), - 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) - } + 'key': 'log.new', + 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), + 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) }, Scope.BROADCAST) diff --git a/tfw/components/process_management/process_handler.py b/tfw/components/process_management/process_handler.py index 1c54118..80e1a4e 100644 --- a/tfw/components/process_management/process_handler.py +++ b/tfw/components/process_management/process_handler.py @@ -9,43 +9,31 @@ LOG = logging.getLogger(__name__) class ProcessHandler(ProcessManager, ProcessLogManager): - keys = ['processmanager'] - """ - Event handler that can manage processes managed by supervisor. - - This EventHandler accepts messages that have a data['command'] key specifying - a command to be executed. - Every message must contain a data['process_name'] field with the name of the - process to manage. This is the name specified in supervisor config files like so: - [program:someprogram] - - Commands available: start, stop, restart, readlog - (the names are as self-documenting as it gets) - """ + keys = ['process.start', 'process.stop', 'process.restart'] def __init__(self, *, supervisor_uri, log_tail=0): ProcessManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri) self.log_tail = log_tail + self.commands = { - 'start': self.start_process, - 'stop': self.stop_process, - 'restart': self.restart_process + 'process.start': self.start_process, + 'process.stop': self.stop_process, + 'process.restart': self.restart_process } def handle_event(self, message, connector): try: - data = message['data'] try: - self.commands[data['command']](data['process_name']) + self.commands[message['key']](message['name']) except SupervisorFault as fault: - message['data']['error'] = fault.faultString + message['error'] = fault.faultString finally: - message['data']['stdout'] = self.read_stdout( - data['process_name'], + message['stdout'] = self.read_stdout( + message['name'], self.log_tail ) - message['data']['stderr'] = self.read_stderr( - data['process_name'], + message['stderr'] = self.read_stderr( + message['name'], self.log_tail ) connector.send_message(message, scope=Scope.WEBSOCKET) diff --git a/tfw/components/process_management/process_log_handler.py b/tfw/components/process_management/process_log_handler.py index 8386120..b23874e 100644 --- a/tfw/components/process_management/process_log_handler.py +++ b/tfw/components/process_management/process_log_handler.py @@ -6,63 +6,38 @@ LOG = logging.getLogger(__name__) class ProcessLogHandler: - keys = ['logmonitor'] - """ - Monitors the output of a supervisor process (stdout, stderr) and - sends the results to the frontend. + keys = ['log.set'] - Accepts messages that have a data['command'] key specifying - a command to be executed. - - The API of each command is documented in their respective handler. - """ def __init__(self, *, process_name, supervisor_uri, log_tail=0): - self.connector = None + self.connector, self._monitor = None, None self.process_name = process_name self._supervisor_uri = supervisor_uri self._initial_log_tail = log_tail - self._monitor = None self.command_handlers = { - 'process_name': self.handle_process_name, - 'log_tail': self.handle_log_tail + 'log.set': self.handle_set } def start(self): self._monitor = LogInotifyObserver( connector=self.connector, - supervisor_uri=self._supervisor_uri, process_name=self.process_name, + supervisor_uri=self._supervisor_uri, log_tail=self._initial_log_tail ) self._monitor.start() def handle_event(self, message, _): try: - data = message['data'] - self.command_handlers[data['command']](data) + self.command_handlers[message['key']](message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) - def handle_process_name(self, data): - """ - Changes the monitored process. - - :param data: TFW message data containing 'value' - (name of the process to monitor) - """ - self._monitor.process_name = data['value'] - - def handle_log_tail(self, data): - """ - Sets tail length of the log the monitor will send - to the frontend (the monitor will send back the last - 'value' characters of the log). - - :param data: TFW message data containing 'value' - (new tail length) - """ - self._monitor.log_tail = data['value'] + def handle_set(self, data): + if data.get('name'): + self._monitor.process_name = data['name'] + if data.get('tail'): + self._monitor.log_tail = data['tail'] def cleanup(self): self._monitor.stop() From f8ff0bcbb4435225f230fa4974f42683be363e3e Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:49:05 +0200 Subject: [PATCH 05/10] Simplify sending bot messages --- tfw/components/frontend/message_sender.py | 58 ++++++++--------------- 1 file changed, 20 insertions(+), 38 deletions(-) diff --git a/tfw/components/frontend/message_sender.py b/tfw/components/frontend/message_sender.py index 72915c2..0f4f249 100644 --- a/tfw/components/frontend/message_sender.py +++ b/tfw/components/frontend/message_sender.py @@ -1,48 +1,30 @@ class MessageSender: - """ - Provides mechanisms to send messages to our frontend messaging component. - """ def __init__(self, uplink): self.uplink = uplink - self.key = 'message' - self.queue_key = 'queueMessages' - def send(self, originator, message): - """ - Sends a message. - :param originator: name of sender to be displayed on the frontend - :param message: message to send - """ + def send(self, message, originator=None): message = { - 'key': self.key, - 'data': { - 'originator': originator, - 'message': message - } + 'key': 'message.send', + 'message': message } + if originator: + message['originator'] = originator self.uplink.send_message(message) - def queue_messages(self, originator, messages): - """ - Queues a list of messages to be displayed in a chatbot-like manner. - :param originator: name of sender to be displayed on the frontend - :param messages: list of messages to queue - """ - message = { - 'key': self.queue_key, - 'data': { - 'messages': [ - {'message': message, 'originator': originator} - for message in messages - ] - } + def queue_messages(self, messages, originator=None): + message_queue = { + 'key': 'message.queue', + 'value': [] } - self.uplink.send_message(message) + for message in messages: + next_message = {'message': message} + if originator: + next_message['originator'] = originator + message_queue['value'].append(next_message) + self.uplink.send_message(message_queue) - @staticmethod - def generate_messages_from_queue(queue_message): - for message in queue_message['data']['messages']: - yield { - 'key': 'message', - 'data': message - } + def set_originator(self, originator): + self.uplink.send_message({ + 'key': 'message.config', + 'originator': originator + }) From 09ffe2bdcf4639697007056c88e51ee4a82c2800 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:49:25 +0200 Subject: [PATCH 06/10] Forward relevant messages to the frontend --- tfw/components/frontend/frontend_handler.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tfw/components/frontend/frontend_handler.py b/tfw/components/frontend/frontend_handler.py index 5829ad7..7e35bc5 100644 --- a/tfw/components/frontend/frontend_handler.py +++ b/tfw/components/frontend/frontend_handler.py @@ -4,7 +4,21 @@ from .message_storage import FrontendMessageStorage class FrontendHandler: - keys = ['message', 'queueMessages', 'dashboard', 'console'] + # TODO: do not store dashboard messages like reloadFrontend + keys = [ + 'console.read', + 'console.write', + 'console.showLiveLogs', + 'console.rewriteContentWithProcessLogsOnDeploy', + 'dashboard.layout', + 'dashboard.hideMessages', + 'dashboard.terminalMenuItem', + 'dashboard.reloadFrontend', + 'dashboard.reloadIframe', + 'message.config', + 'message.queue', + 'message.send' + ] def __init__(self): self.connector = None From 13179e59ebb91141e63d8b6402b4acda0a92815f Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 11:24:56 +0200 Subject: [PATCH 07/10] Restore generate_messages_from_queue() --- tfw/components/frontend/message_sender.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tfw/components/frontend/message_sender.py b/tfw/components/frontend/message_sender.py index 0f4f249..93826df 100644 --- a/tfw/components/frontend/message_sender.py +++ b/tfw/components/frontend/message_sender.py @@ -28,3 +28,11 @@ class MessageSender: 'key': 'message.config', 'originator': originator }) + + @staticmethod + def generate_messages_from_queue(queue_message): + for message in queue_message['value']: + yield { + 'key': 'message.send', + **message + } From f5e7d6016aff6d77b101c74a2be705d116663fc2 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Thu, 8 Aug 2019 08:27:41 +0200 Subject: [PATCH 08/10] Fix condition in PR #62 --- tfw/components/fsm/fsm_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index 61f901e..c9cb6eb 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -31,7 +31,8 @@ class FSMHandler: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) def handle_step(self, message): - return message if self.fsm.step(message['trigger']) else None + if self.fsm.step(message['trigger']): + return message def handle_update(self, message): # pylint: disable=no-self-use From 35e5b595d137f7bbed4f04f5f739bf3ec7624d90 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Thu, 8 Aug 2019 14:45:59 +0200 Subject: [PATCH 09/10] Rename message keys --- tfw/components/fsm/fsm_handler.py | 6 +++--- tfw/components/fsm/fsm_updater.py | 2 +- tfw/components/process_management/log_inotify_observer.py | 2 +- tfw/components/process_management/process_log_handler.py | 4 ++-- tfw/internals/event_handling/fsm_aware.py | 8 ++------ 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index c9cb6eb..5d28c1f 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -9,7 +9,7 @@ LOG = logging.getLogger(__name__) class FSMHandler: - keys = ['fsm.step', 'fsm.update'] + keys = ['fsm.step', 'fsm.announce'] def __init__(self, *, fsm_type): self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) @@ -17,7 +17,7 @@ class FSMHandler: self.command_handlers = { 'fsm.step' : self.handle_step, - 'fsm.update' : self.handle_update + 'fsm.announce' : self.handle_announce } def handle_event(self, message, connector): @@ -34,6 +34,6 @@ class FSMHandler: if self.fsm.step(message['trigger']): return message - def handle_update(self, message): + def handle_announce(self, message): # pylint: disable=no-self-use return message diff --git a/tfw/components/fsm/fsm_updater.py b/tfw/components/fsm/fsm_updater.py index d1b42ec..276b65d 100644 --- a/tfw/components/fsm/fsm_updater.py +++ b/tfw/components/fsm/fsm_updater.py @@ -5,7 +5,7 @@ class FSMUpdater: @property def fsm_update(self): return { - 'key': 'fsm.announce', + 'key': 'fsm.update', **self.fsm_update_data } diff --git a/tfw/components/process_management/log_inotify_observer.py b/tfw/components/process_management/log_inotify_observer.py index 4ea88cf..97b6767 100644 --- a/tfw/components/process_management/log_inotify_observer.py +++ b/tfw/components/process_management/log_inotify_observer.py @@ -36,7 +36,7 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager): def on_modified(self, event): self._connector.send_message({ - 'key': 'log.new', + 'key': 'process.log.new', 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) }, Scope.BROADCAST) diff --git a/tfw/components/process_management/process_log_handler.py b/tfw/components/process_management/process_log_handler.py index b23874e..144b1be 100644 --- a/tfw/components/process_management/process_log_handler.py +++ b/tfw/components/process_management/process_log_handler.py @@ -6,7 +6,7 @@ LOG = logging.getLogger(__name__) class ProcessLogHandler: - keys = ['log.set'] + keys = ['process.log.set'] def __init__(self, *, process_name, supervisor_uri, log_tail=0): self.connector, self._monitor = None, None @@ -15,7 +15,7 @@ class ProcessLogHandler: self._initial_log_tail = log_tail self.command_handlers = { - 'log.set': self.handle_set + 'process.log.set': self.handle_set } def start(self): diff --git a/tfw/internals/event_handling/fsm_aware.py b/tfw/internals/event_handling/fsm_aware.py index b700dba..b888220 100644 --- a/tfw/internals/event_handling/fsm_aware.py +++ b/tfw/internals/event_handling/fsm_aware.py @@ -6,11 +6,7 @@ LOG = logging.getLogger(__name__) class FSMAware: - keys = ['fsm.announce'] - """ - Base class for stuff that has to be aware of the framework FSM. - This is done by processing 'fsm_update' messages. - """ + keys = ['fsm.update'] def __init__(self): self.fsm_state = None self.fsm_in_accepted_state = False @@ -18,7 +14,7 @@ class FSMAware: self._auth_key = KeyManager().auth_key def process_message(self, message): - if message['key'] == 'fsm.announce': + if message['key'] == 'fsm.update': if verify_message(self._auth_key, message): self._handle_fsm_update(message) From b5e53cb9467acc2528e3d326db6a635349256cf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 8 Aug 2019 15:05:37 +0200 Subject: [PATCH 10/10] Use prefix matching in ZMQConnector --- tfw/components/frontend/frontend_handler.py | 40 ++++++++++--------- tfw/components/frontend/message_storage.py | 29 +++++++++----- tfw/components/fsm/fsm_handler.py | 3 +- tfw/components/ide/ide_handler.py | 2 +- .../process_management/process_handler.py | 6 ++- .../process_management/process_log_handler.py | 2 +- tfw/components/snapshots/snapshot_handler.py | 27 +++++-------- tfw/components/terminal/terminal_handler.py | 2 +- tfw/internals/networking/zmq_connector.py | 16 +------- 9 files changed, 62 insertions(+), 65 deletions(-) diff --git a/tfw/components/frontend/frontend_handler.py b/tfw/components/frontend/frontend_handler.py index 7e35bc5..3b8a76a 100644 --- a/tfw/components/frontend/frontend_handler.py +++ b/tfw/components/frontend/frontend_handler.py @@ -4,26 +4,25 @@ from .message_storage import FrontendMessageStorage class FrontendHandler: - # TODO: do not store dashboard messages like reloadFrontend - keys = [ - 'console.read', - 'console.write', - 'console.showLiveLogs', - 'console.rewriteContentWithProcessLogsOnDeploy', - 'dashboard.layout', - 'dashboard.hideMessages', - 'dashboard.terminalMenuItem', - 'dashboard.reloadFrontend', - 'dashboard.reloadIframe', - 'message.config', - 'message.queue', - 'message.send' - ] + # keys = [ + # 'console.read', + # 'console.write', + # 'console.showLiveLogs', + # 'console.rewriteContentWithProcessLogsOnDeploy', + # 'dashboard.layout', + # 'dashboard.hideMessages', + # 'dashboard.terminalMenuItem', + # 'dashboard.reloadFrontend', + # 'dashboard.reloadIframe', + # 'message.config', + # 'message.queue', + # 'message.send' + # ] + keys = ['console', 'dashboard', 'message', 'ide.read', 'recover'] def __init__(self): self.connector = None - self.keys = [*type(self).keys, 'recover'] - self._frontend_message_storage = FrontendMessageStorage(type(self).keys) + self._frontend_message_storage = FrontendMessageStorage() def send_message(self, message): self.connector.send_message(message, scope=Scope.WEBSOCKET) @@ -32,7 +31,12 @@ class FrontendHandler: self._frontend_message_storage.save_message(message) if message['key'] == 'recover': self.recover_frontend() - self.send_message(message) + if self._filter_message(message): + self.send_message(message) + + @staticmethod + def _filter_message(message): + return not message['key'].startswith('ide') def recover_frontend(self): for message in self._frontend_message_storage.messages: diff --git a/tfw/components/frontend/message_storage.py b/tfw/components/frontend/message_storage.py index 1c63ffc..2478827 100644 --- a/tfw/components/frontend/message_storage.py +++ b/tfw/components/frontend/message_storage.py @@ -29,16 +29,25 @@ class MessageStorage(ABC): class FrontendMessageStorage(MessageStorage): - def __init__(self, keys): - self._keys = keys - super().__init__() - def _filter_message(self, message): - key = message['key'] - return key in self._keys + return message['key'].startswith(( + 'console.write', + 'dashboard.layout', + 'dashboard.terminalMenuItem', + 'message.send', + 'message.config', + 'ide.read' + )) def _transform_message(self, message): - if message['key'] == 'queueMessages': - yield from MessageSender.generate_messages_from_queue(message) - else: - yield message + transformations = { + 'message.queue': MessageSender.generate_messages_from_queue, + 'ide.read': self._delete_ide_content + } + if message['key'] in transformations: + yield from transformations[message['key']](message) + + @staticmethod + def _delete_ide_content(message): + del message['content'] + yield message diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index 5d28c1f..24b8231 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__) class FSMHandler: - keys = ['fsm.step', 'fsm.announce'] + keys = ['fsm'] + def __init__(self, *, fsm_type): self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) diff --git a/tfw/components/ide/ide_handler.py b/tfw/components/ide/ide_handler.py index f7435d6..bf1307b 100644 --- a/tfw/components/ide/ide_handler.py +++ b/tfw/components/ide/ide_handler.py @@ -33,7 +33,7 @@ BUILD_ARTIFACTS = ( class IdeHandler: - keys = ['ide.read', 'ide.write'] + keys = ['ide'] def __init__(self, *, patterns, initial_file=''): self.connector = None diff --git a/tfw/components/process_management/process_handler.py b/tfw/components/process_management/process_handler.py index 80e1a4e..33e58fa 100644 --- a/tfw/components/process_management/process_handler.py +++ b/tfw/components/process_management/process_handler.py @@ -9,7 +9,8 @@ LOG = logging.getLogger(__name__) class ProcessHandler(ProcessManager, ProcessLogManager): - keys = ['process.start', 'process.stop', 'process.restart'] + keys = ['process'] + def __init__(self, *, supervisor_uri, log_tail=0): ProcessManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri) @@ -38,4 +39,5 @@ class ProcessHandler(ProcessManager, ProcessLogManager): ) connector.send_message(message, scope=Scope.WEBSOCKET) except KeyError: - LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) + if not message['key'].startswith('process.log'): + LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/components/process_management/process_log_handler.py b/tfw/components/process_management/process_log_handler.py index 144b1be..9e3bb42 100644 --- a/tfw/components/process_management/process_log_handler.py +++ b/tfw/components/process_management/process_log_handler.py @@ -6,7 +6,7 @@ LOG = logging.getLogger(__name__) class ProcessLogHandler: - keys = ['process.log.set'] + keys = ['process.log'] def __init__(self, *, process_name, supervisor_uri, log_tail=0): self.connector, self._monitor = None, None diff --git a/tfw/components/snapshots/snapshot_handler.py b/tfw/components/snapshots/snapshot_handler.py index f992b07..0c8f797 100644 --- a/tfw/components/snapshots/snapshot_handler.py +++ b/tfw/components/snapshots/snapshot_handler.py @@ -6,8 +6,6 @@ from datetime import datetime from dateutil import parser as dateparser -from tfw.internals.networking import Scope - from .snapshot_provider import SnapshotProvider LOG = logging.getLogger(__name__) @@ -23,9 +21,9 @@ class SnapshotHandler: self.init_snapshot_providers(directories) self.command_handlers = { - 'take_snapshot': self.handle_take_snapshot, - 'restore_snapshot': self.handle_restore_snapshot, - 'exclude': self.handle_exclude + 'snapshot.take': self.handle_take_snapshot, + 'snapshot.restore': self.handle_restore_snapshot, + 'snapshot.exclude': self.handle_exclude } def init_snapshot_providers(self, directories): @@ -45,23 +43,20 @@ class SnapshotHandler: makedirs(git_dir, exist_ok=True) return git_dir - def handle_event(self, message, connector): + def handle_event(self, message, _): try: - data = message['data'] - message['data'] = self.command_handlers[data['command']](data) - connector.send_message(message, scope=Scope.WEBSOCKET) + self.command_handlers[message['key']](message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) - def handle_take_snapshot(self, data): + def handle_take_snapshot(self, _): LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys()) for provider in self.snapshot_providers.values(): provider.take_snapshot() - return data - def handle_restore_snapshot(self, data): + def handle_restore_snapshot(self, message): date = dateparser.parse( - data.get( + message.get( 'value', datetime.now().isoformat() ) @@ -73,13 +68,11 @@ class SnapshotHandler: ) for provider in self.snapshot_providers.values(): provider.restore_snapshot(date) - return data - def handle_exclude(self, data): - exclude_unix_patterns = data['value'] + def handle_exclude(self, message): + exclude_unix_patterns = message['value'] if not isinstance(exclude_unix_patterns, list): raise KeyError for provider in self.snapshot_providers.values(): provider.exclude = exclude_unix_patterns - return data diff --git a/tfw/components/terminal/terminal_handler.py b/tfw/components/terminal/terminal_handler.py index 7d3e041..dd17e97 100644 --- a/tfw/components/terminal/terminal_handler.py +++ b/tfw/components/terminal/terminal_handler.py @@ -7,7 +7,7 @@ LOG = logging.getLogger(__name__) class TerminalHandler: - keys = ['terminal.write'] + keys = ['terminal'] def __init__(self, *, port, user, working_directory, histfile): self.connector, self._historymonitor = None, None diff --git a/tfw/internals/networking/zmq_connector.py b/tfw/internals/networking/zmq_connector.py index 7dfa77c..99920c7 100644 --- a/tfw/internals/networking/zmq_connector.py +++ b/tfw/internals/networking/zmq_connector.py @@ -15,8 +15,6 @@ LOG = logging.getLogger(__name__) class ZMQDownlinkConnector: def __init__(self, connect_addr): - self.keys = [] - self._on_recv_callback = None self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.connect(connect_addr) @@ -25,19 +23,14 @@ class ZMQDownlinkConnector: def subscribe(self, *keys): for key in keys: self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key) - self.keys.append(key) def unsubscribe(self, *keys): for key in keys: self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key) - self.keys.remove(key) def register_callback(self, callback): - if callback: - self._on_recv_callback = callback - self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv)) - else: - self._zmq_sub_stream.on_recv(None) + callback = with_deserialize_tfw_msg(callback) if callback else None + self._zmq_sub_stream.on_recv(callback) def recv_message(self, *, block=True): if self._zmq_sub_stream.receiving(): @@ -48,11 +41,6 @@ class ZMQDownlinkConnector: except zmq.ZMQError: raise IOError("No data available to recv!") - def _on_recv(self, message): - key = message['key'] - if key in self.keys or '' in self.keys: - self._on_recv_callback(message) - def close(self): self._zmq_sub_stream.close()