Merge pull request #62 from avatao-content/api-rework

API rework
therealkrispet 2019-08-08 16:32:30 +02:00 committed by GitHub
commit 81790e18e1
15 changed files with 249 additions and 527 deletions

View File

@@ -4,12 +4,25 @@ from .message_storage import FrontendMessageStorage
class FrontendHandler:
keys = ['message', 'queueMessages', 'dashboard', 'console']
# keys = [
# 'console.read',
# 'console.write',
# 'console.showLiveLogs',
# 'console.rewriteContentWithProcessLogsOnDeploy',
# 'dashboard.layout',
# 'dashboard.hideMessages',
# 'dashboard.terminalMenuItem',
# 'dashboard.reloadFrontend',
# 'dashboard.reloadIframe',
# 'message.config',
# 'message.queue',
# 'message.send'
# ]
keys = ['console', 'dashboard', 'message', 'ide.read', 'recover']
def __init__(self):
self.connector = None
self.keys = [*type(self).keys, 'recover']
self._frontend_message_storage = FrontendMessageStorage(type(self).keys)
self._frontend_message_storage = FrontendMessageStorage()
def send_message(self, message):
self.connector.send_message(message, scope=Scope.WEBSOCKET)
@@ -18,7 +31,12 @@ class FrontendHandler:
self._frontend_message_storage.save_message(message)
if message['key'] == 'recover':
self.recover_frontend()
self.send_message(message)
if self._filter_message(message):
self.send_message(message)
@staticmethod
def _filter_message(message):
return not message['key'].startswith('ide')
def recover_frontend(self):
for message in self._frontend_message_storage.messages:
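
Note: the rework replaces the static key whitelist with a runtime filter. Messages keyed under 'ide' are still recorded by the message storage (so they can be replayed on 'recover'), but they are no longer echoed back over the websocket. A minimal sketch of that rule, using a stand-in connector class (everything here except the key names is illustrative):

class RecordingConnector:
    # Stand-in for the real websocket connector; it only records what it is given.
    def __init__(self):
        self.sent = []

    def send_message(self, message, scope=None):
        self.sent.append(message)

def should_forward(message):
    # Same predicate as FrontendHandler._filter_message.
    return not message['key'].startswith('ide')

connector = RecordingConnector()
for msg in ({'key': 'message.send', 'message': 'hi'}, {'key': 'ide.read', 'filename': 'demo.txt'}):
    if should_forward(msg):
        connector.send_message(msg)

assert [m['key'] for m in connector.sent] == ['message.send']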

View File

@@ -1,48 +1,38 @@
class MessageSender:
"""
Provides mechanisms to send messages to our frontend messaging component.
"""
def __init__(self, uplink):
self.uplink = uplink
self.key = 'message'
self.queue_key = 'queueMessages'
def send(self, originator, message):
"""
Sends a message.
:param originator: name of sender to be displayed on the frontend
:param message: message to send
"""
def send(self, message, originator=None):
message = {
'key': self.key,
'data': {
'originator': originator,
'message': message
}
'key': 'message.send',
'message': message
}
if originator:
message['originator'] = originator
self.uplink.send_message(message)
def queue_messages(self, originator, messages):
"""
Queues a list of messages to be displayed in a chatbot-like manner.
:param originator: name of sender to be displayed on the frontend
:param messages: list of messages to queue
"""
message = {
'key': self.queue_key,
'data': {
'messages': [
{'message': message, 'originator': originator}
for message in messages
]
}
def queue_messages(self, messages, originator=None):
message_queue = {
'key': 'message.queue',
'value': []
}
self.uplink.send_message(message)
for message in messages:
next_message = {'message': message}
if originator:
next_message['originator'] = originator
message_queue['value'].append(next_message)
self.uplink.send_message(message_queue)
def set_originator(self, originator):
self.uplink.send_message({
'key': 'message.config',
'originator': originator
})
@staticmethod
def generate_messages_from_queue(queue_message):
for message in queue_message['data']['messages']:
for message in queue_message['value']:
yield {
'key': 'message',
'data': message
'key': 'message.send',
**message
}
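
The reworked sender emits flat, dot-keyed messages instead of the old nested 'data' envelope: 'message.send' for a single message, 'message.queue' for a chatbot-style queue, and 'message.config' to set a default originator. The wire formats, reconstructed from the code above against a dummy uplink (the uplink class is a placeholder):

class DummyUplink:
    # Placeholder for the real uplink; collects outgoing messages.
    def __init__(self):
        self.outbox = []

    def send_message(self, message):
        self.outbox.append(message)

uplink = DummyUplink()
uplink.send_message({'key': 'message.send', 'message': 'Hello!', 'originator': 'TFW'})
uplink.send_message({'key': 'message.queue', 'value': [{'message': 'First'}, {'message': 'Second'}]})
uplink.send_message({'key': 'message.config', 'originator': 'TFW'})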

View File

@@ -29,16 +29,25 @@ class MessageStorage(ABC):
class FrontendMessageStorage(MessageStorage):
def __init__(self, keys):
self._keys = keys
super().__init__()
def _filter_message(self, message):
key = message['key']
return key in self._keys
return message['key'].startswith((
'console.write',
'dashboard.layout',
'dashboard.terminalMenuItem',
'message.send',
'message.config',
'ide.read'
))
def _transform_message(self, message):
if message['key'] == 'queueMessages':
yield from MessageSender.generate_messages_from_queue(message)
else:
yield message
transformations = {
'message.queue': MessageSender.generate_messages_from_queue,
'ide.read': self._delete_ide_content
}
if message['key'] in transformations:
yield from transformations[message['key']](message)
@staticmethod
def _delete_ide_content(message):
del message['content']
yield message
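
The storage now whitelists by key prefix, expands 'message.queue' into individual 'message.send' entries and drops the potentially large 'content' field from stored 'ide.read' messages before replay. The two transformations re-implemented standalone for illustration (other keys are assumed to pass through unchanged):

def transform(message):
    if message['key'] == 'message.queue':
        for entry in message['value']:
            yield {'key': 'message.send', **entry}
    elif message['key'] == 'ide.read':
        stripped = dict(message)
        stripped.pop('content', None)  # never replay file contents
        yield stripped
    else:
        yield message

queued = {'key': 'message.queue', 'value': [{'message': 'a'}, {'message': 'b'}]}
assert list(transform(queued)) == [
    {'key': 'message.send', 'message': 'a'},
    {'key': 'message.send', 'message': 'b'},
]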

View File

@@ -1,6 +1,6 @@
import logging
from tfw.internals.crypto import KeyManager, sign_message, verify_message
from tfw.internals.crypto import KeyManager, sign_message
from tfw.internals.networking import Scope
from .fsm_updater import FSMUpdater
@@ -10,56 +10,31 @@ LOG = logging.getLogger(__name__)
class FSMHandler:
keys = ['fsm']
"""
EventHandler responsible for managing the state machine of
the framework (TFW FSM).
tfw.networking.TFWServer instances automatically send 'trigger'
commands to the event handler listening on the 'fsm' key,
which should be an instance of this event handler.
This event handler accepts messages that have a
data['command'] key specifying a command to be executed.
An 'fsm_update' message is broadcasted after every successful
command.
"""
def __init__(self, *, fsm_type):
self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key
self.command_handlers = {
'trigger': self.handle_trigger,
'update': self.handle_update
'fsm.step' : self.handle_step,
'fsm.announce' : self.handle_announce
}
def handle_event(self, message, connector):
try:
message = self.command_handlers[message['data']['command']](message)
message = self.command_handlers[message['key']](message)
if message:
fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, message)
sign_message(self.auth_key, fsm_update_message)
connector.send_message(fsm_update_message, Scope.BROADCAST)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_trigger(self, message):
"""
Attempts to step the FSM with the supplied trigger.
:param message: TFW message with a data field containing
the action to try triggering in data['value']
"""
trigger = message['data']['value']
if self.fsm.step(trigger):
def handle_step(self, message):
if self.fsm.step(message['trigger']):
return message
return None
def handle_update(self, message):
"""
Does nothing, but triggers an 'fsm_update' message.
"""
def handle_announce(self, message):
# pylint: disable=no-self-use
return message
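
The FSM is now driven by top-level keys rather than a data['command'] envelope: 'fsm.step' carries the trigger directly and 'fsm.announce' merely forces a signed 'fsm.update' broadcast. A toy dispatcher in the same spirit (the two-state FSM below is illustrative, not a TFW class):

class ToyFSM:
    def __init__(self):
        self.state = 'start'

    def step(self, trigger):
        if self.state == 'start' and trigger == 'finish':
            self.state = 'done'
            return True
        return False

fsm = ToyFSM()
handlers = {
    'fsm.step': lambda msg: msg if fsm.step(msg['trigger']) else None,
    'fsm.announce': lambda msg: msg,
}
assert handlers['fsm.step']({'key': 'fsm.step', 'trigger': 'finish'}) is not None
assert handlers['fsm.announce']({'key': 'fsm.announce'}) is not None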

View File

@@ -5,7 +5,7 @@ class FSMUpdater:
@property
def fsm_update(self):
return {
'key': 'fsm_update',
'key': 'fsm.update',
**self.fsm_update_data
}

View File

@@ -1,93 +1,56 @@
from typing import Iterable
from functools import wraps
from glob import glob
from fnmatch import fnmatchcase
from os.path import basename, isfile, join, relpath, exists, isdir, realpath
from os.path import dirname, isdir, isfile, realpath
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
self._exclude, self.exclude = [], exclude
self._allowed_directories, self.allowed_directories = None, allowed_directories
self._workdir, self.workdir = None, working_directory
self._filename, self.filename = None, selected_file or self.files[0]
def _with_is_allowed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._is_allowed(args[0]): # pylint: disable=protected-access
return func(self, *args, **kwargs)
raise ValueError('Forbidden path.')
return wrapper
@property
def exclude(self):
return self._exclude
@exclude.setter
def exclude(self, exclude):
if exclude is None:
return
if not isinstance(exclude, Iterable):
raise TypeError('Exclude must be Iterable!')
self._exclude = exclude
@property
def workdir(self):
return self._workdir
@workdir.setter
def workdir(self, directory):
if not exists(directory) or not isdir(directory):
raise EnvironmentError(f'"{directory}" is not a directory!')
if not self._is_in_allowed_dir(directory):
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
self._workdir = directory
@property
def allowed_directories(self):
return self._allowed_directories
@allowed_directories.setter
def allowed_directories(self, directories):
self._allowed_directories = [realpath(directory) for directory in directories]
@property
def filename(self):
return self._filename
@filename.setter
def filename(self, filename):
if filename not in self.files:
raise EnvironmentError('No such file in workdir!')
self._filename = filename
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, patterns):
self.patterns = patterns
@property
def files(self):
return [
self._relpath(file)
for file in glob(join(self._workdir, '**/*'), recursive=True)
if isfile(file)
and self._is_in_allowed_dir(file)
and not self._is_blacklisted(file)
]
return list(set(
path
for pattern in self.patterns
for path in glob(pattern, recursive=True)
if isfile(path) and self._is_allowed(path)
))
@property
def file_contents(self):
with open(self._filepath(self.filename), 'rb', buffering=0) as ifile:
def parents(self):
return list(set(
self._find_directory(pattern)
for pattern in self.patterns
))
@staticmethod
def _find_directory(pattern):
while pattern and not isdir(pattern):
pattern = dirname(pattern)
return pattern
def _is_allowed(self, filepath):
return any(
fnmatchcase(realpath(filepath), pattern)
for pattern in self.patterns
)
@_with_is_allowed
def read_file(self, filepath): # pylint: disable=no-self-use
with open(filepath, 'rb', buffering=0) as ifile:
return ifile.read().decode(errors='surrogateescape')
@file_contents.setter
def file_contents(self, value):
with open(self._filepath(self.filename), 'wb', buffering=0) as ofile:
ofile.write(value.encode())
def _is_in_allowed_dir(self, path):
return any(
realpath(path).startswith(allowed_dir)
for allowed_dir in self.allowed_directories
)
def _is_blacklisted(self, file):
return any(
fnmatchcase(file, blacklisted) or
fnmatchcase(basename(file), blacklisted)
for blacklisted in self.exclude
)
def _filepath(self, filename):
return join(self._workdir, filename)
def _relpath(self, filename):
return relpath(self._filepath(filename), start=self._workdir)
@_with_is_allowed
def write_file(self, filepath, contents): # pylint: disable=no-self-use
with open(filepath, 'wb', buffering=0) as ofile:
ofile.write(contents.encode())
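
The rewritten FileManager drops the stateful workdir/selected-file model in favour of a plain list of glob patterns: files lists every existing file matched by any pattern, parents yields the closest existing directory of each pattern (used for the inotify watches), and read_file/write_file raise ValueError('Forbidden path.') for paths whose realpath matches no pattern. The allow check reduces to standard-library calls, roughly (the pattern below is illustrative):

from fnmatch import fnmatchcase
from glob import glob
from os.path import isfile, realpath

patterns = ['/tmp/demo/**/*.txt']  # illustrative pattern, not taken from the diff

def is_allowed(path):
    # Same idea as FileManager._is_allowed: match the resolved path against every pattern.
    return any(fnmatchcase(realpath(path), pattern) for pattern in patterns)

def matching_files():
    return [
        path
        for pattern in patterns
        for path in glob(pattern, recursive=True)
        if isfile(path) and is_allowed(path)
    ]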

View File

@@ -1,8 +1,8 @@
# pylint: disable=redefined-outer-name
from dataclasses import dataclass
from secrets import token_urlsafe
from os import mkdir, symlink
from os.path import join
from os import chdir, mkdir, symlink
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -13,112 +13,75 @@ from .file_manager import FileManager
@dataclass
class ManagerContext:
folder: str
workdir: str
subdir: str
subfile: str
manager: FileManager
def join(self, path):
return join(self.folder, path)
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def create_random_link(self, source, dirname, extension):
linkname = self.join(f'{dirname}/{generate_name()}{extension}')
symlink(source, linkname)
return linkname
def join(self, path):
return join(self.workdir, path)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
dirs = {}
with TemporaryDirectory() as workdir:
chdir(workdir)
for name in ['allowed', 'excluded', 'invis']:
node = join(workdir, name)
mkdir(node)
Path(join(node, 'empty.txt')).touch()
Path(join(node, 'empty.bin')).touch()
dirs[name] = node
subdir = join(workdir, generate_name())
subfile = join(subdir, generate_name() + '.txt')
mkdir(subdir)
Path(subfile).touch()
manager = FileManager([join(workdir, '**/*.txt')])
yield ManagerContext(workdir, subdir, subfile, manager)
yield ManagerContext(
workdir,
FileManager(
dirs['allowed'],
[dirs['allowed'], dirs['excluded']],
exclude=['*/excluded/*']
)
)
def test_matching_files(context):
newdir = context.create_random_folder(context.subdir)
newfile = context.create_random_file(newdir, '.txt')
newlink = context.create_random_link(newfile, newdir, '.txt')
assert set(context.manager.files) == {context.subfile, newfile, newlink}
@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/'])
def test_select_allowed_dirs(context, subdir):
context.manager.workdir = context.join(subdir)
assert context.manager.workdir == context.join(subdir)
newdir = context.join(subdir+'deep')
mkdir(newdir)
context.manager.workdir = newdir
assert context.manager.workdir == newdir
def test_unmatching_files(context):
newtxt = context.create_random_file(context.workdir, '.txt')
newbin = context.create_random_file(context.subdir, '.bin')
context.create_random_link(newtxt, context.subdir, '.txt')
context.create_random_link(newbin, context.subdir, '.txt')
assert context.manager.files == [context.subfile]
@pytest.mark.parametrize('invdir', ['', 'invis'])
def test_select_forbidden_dirs(context, invdir):
fullpath = context.join(invdir)
with pytest.raises(OSError):
context.manager.workdir = fullpath
assert context.manager.workdir != fullpath
context.manager.allowed_directories += [fullpath]
context.manager.workdir = fullpath
assert context.manager.workdir == fullpath
@pytest.mark.parametrize('filename', ['another.txt', '*.txt'])
def test_select_allowed_files(context, filename):
Path(context.join('allowed/'+filename)).touch()
assert filename in context.manager.files
context.manager.filename = filename
assert context.manager.filename == filename
@pytest.mark.parametrize('path', [
{'dir': 'allowed/', 'file': 'illegal.bin'},
{'dir': 'excluded/', 'file': 'legal.txt'},
{'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'},
{'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'},
{'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'},
{'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'}
])
def test_select_excluded_files(context, path):
context.manager.workdir = context.join(path['dir'])
context.manager.exclude = ['*/excluded/*', '*.bin']
Path(context.join(path['dir']+path['file'])).touch()
assert path['file'] not in context.manager.files
with pytest.raises(OSError):
context.manager.filename = path['file']
@pytest.mark.parametrize('path', [
{'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'},
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}
])
def test_select_excluded_symlinks(context, path):
symlink(context.join(path['src']), context.join(path['dst']))
assert path['dst'] not in context.manager.files
def test_parents(context):
newdir = context.create_random_folder(context.workdir)
context.manager.patterns += [f'{newdir}/[!/@]*/**/?.c']
assert set(context.manager.parents) == {context.workdir, newdir}
def test_read_write_file(context):
for _ in range(128):
context.manager.filename = 'empty.txt'
content = token_urlsafe(32)
context.manager.file_contents = content
assert context.manager.file_contents == content
with open(context.join('allowed/empty.txt'), 'r') as ifile:
context.manager.write_file(context.subfile, content)
assert context.manager.read_file(context.subfile) == content
with open(context.subfile, 'r') as ifile:
assert ifile.read() == content
def test_regular_ide_actions(context):
context.manager.workdir = context.join('allowed')
newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16)
Path(context.join(f'allowed/{newfile1}')).touch()
Path(context.join(f'allowed/{newfile2}')).touch()
for _ in range(8):
newfile1 = context.create_random_file(context.subdir, '.txt')
newfile2 = context.create_random_file(context.subdir, '.txt')
for _ in range(4):
context.manager.filename = newfile1
content1 = token_urlsafe(32)
context.manager.file_contents = content1
context.manager.filename = newfile2
content2 = token_urlsafe(32)
context.manager.file_contents = content2
context.manager.filename = newfile1
assert context.manager.file_contents == content1
context.manager.filename = newfile2
assert context.manager.file_contents == content2
content1, content2 = token_urlsafe(32), token_urlsafe(32)
context.manager.write_file(newfile1, content1)
context.manager.write_file(newfile2, content2)
assert context.manager.read_file(newfile1) == content1
assert context.manager.read_file(newfile2) == content2

View File

@@ -1,4 +1,5 @@
import logging
from os.path import isfile
from tfw.internals.networking import Scope
from tfw.internals.inotify import InotifyObserver
@@ -33,160 +34,61 @@ BUILD_ARTIFACTS = (
class IdeHandler:
keys = ['ide']
# pylint: disable=too-many-arguments,anomalous-backslash-in-string
"""
Event handler implementing the backend of our browser based IDE.
By default all files in the directory specified in __init__ are displayed
on the fontend. Note that this is a stateful component.
When any file in the selected directory changes they are automatically refreshed
on the frontend (this is done by listening to inotify events).
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, *, directory, allowed_directories, selected_file=None, exclude=None):
"""
:param key: the key this instance should listen to
:param directory: working directory which the EventHandler should serve files from
:param allowed_directories: list of directories that can be switched to using selectdir
:param selected_file: file that is selected by default
:param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
"""
def __init__(self, *, patterns, initial_file=''):
self.connector = None
try:
self.filemanager = FileManager(
allowed_directories=allowed_directories,
working_directory=directory,
selected_file=selected_file,
exclude=exclude
)
except IndexError:
raise EnvironmentError(
f'No file(s) in IdeEventHandler working_directory "{directory}"!'
)
self.filemanager = FileManager(patterns)
self._initial_file = initial_file
self.monitor = InotifyObserver(
self.filemanager.allowed_directories,
path=self.filemanager.parents,
exclude=BUILD_ARTIFACTS
)
self.monitor.on_modified = self._reload_frontend
self.monitor.start()
self.commands = {
'read': self.read,
'write': self.write,
'select': self.select,
'selectdir': self.select_dir,
'exclude': self.exclude
'ide.read' : self.read,
'ide.write' : self.write
}
@property
def initial_file(self):
if not isfile(self._initial_file):
self._initial_file = self.filemanager.files[0]
return self._initial_file
def _reload_frontend(self, event): # pylint: disable=unused-argument
self.send_message({
'key': 'ide',
'data': {'command': 'reload'}
})
self.send_message({'key': 'ide.reload'})
def send_message(self, message):
self.connector.send_message(message, scope=Scope.WEBSOCKET)
def read(self, data):
"""
Read the currently selected file.
:return dict: TFW message data containing key 'content'
(contents of the selected file)
"""
def read(self, message):
if message.get('files'):
self.filemanager.patterns = message['files']
try:
data['content'] = self.filemanager.file_contents
message['content'] = self.filemanager.read_file(message['filename'])
except PermissionError:
data['content'] = 'You have no permission to open that file :('
message['content'] = 'You have no permission to open that file :('
except FileNotFoundError:
data['content'] = 'This file was removed :('
message['content'] = 'This file was removed :('
except Exception: # pylint: disable=broad-except
data['content'] = 'Failed to read file :('
return data
message['content'] = 'Failed to read file :('
def write(self, data):
"""
Overwrites a file with the desired string.
:param data: TFW message data containing key 'content'
(new file content)
"""
def write(self, message):
try:
self.filemanager.file_contents = data['content']
self.filemanager.write_file(message['filename'], message['content'])
except Exception: # pylint: disable=broad-except
LOG.exception('Error writing file!')
del data['content']
return data
def select(self, data):
"""
Selects a file from the current directory.
:param data: TFW message data containing 'filename'
(name of file to select relative to the current directory)
"""
try:
self.filemanager.filename = data['filename']
except EnvironmentError:
LOG.exception('Failed to select file "%s"', data['filename'])
return data
def select_dir(self, data):
"""
Select a new working directory to display files from.
:param data: TFW message data containing 'directory'
(absolute path of diretory to select.
must be a path whitelisted in
self.allowed_directories)
"""
try:
self.filemanager.workdir = data['directory']
try:
self.filemanager.filename = self.filemanager.files[0]
self.read(data)
except IndexError:
data['content'] = 'No files in this directory :('
except EnvironmentError as err:
LOG.error(
'Failed to select directory "%s". Reason: %s',
data['directory'], str(err)
)
return data
def exclude(self, data):
"""
Overwrite list of excluded files
:param data: TFW message data containing 'exclude'
(list of unix-style filename patterns to be excluded,
e.g.: ["\*.pyc", "\*.o")
"""
try:
self.filemanager.exclude = list(data['exclude'])
except TypeError:
LOG.error('Exclude must be Iterable!')
return data
def attach_fileinfo(self, data):
"""
Basic information included in every response to the frontend.
"""
data['filename'] = self.filemanager.filename
data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir
del message['content']
def handle_event(self, message, _):
try:
data = message['data']
message['data'] = self.commands[data['command']](data)
self.attach_fileinfo(data)
if message['filename'] == '':
message['filename'] = self.initial_file
self.commands[message['key']](message)
message['files'] = self.filemanager.files
self.send_message(message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
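
With this change the IDE frontend addresses files explicitly instead of relying on server-side selection state: 'ide.read' names the file to read (an empty 'filename' falls back to the handler's initial file, and an optional 'files' list replaces the glob patterns), while 'ide.write' carries the filename and the new content. Every reply is sent back on the same key with the current 'files' list attached. Example payloads implied by the handler above (paths are placeholders):

read_request = {
    'key': 'ide.read',
    'filename': '',                 # empty string selects the initial file
}
write_request = {
    'key': 'ide.write',
    'filename': 'src/app.py',       # placeholder path
    'content': 'print("hello")\n',
}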

View File

@@ -7,7 +7,7 @@ from .supervisor import ProcessLogManager
class LogInotifyObserver(InotifyObserver, ProcessLogManager):
def __init__(self, connector, supervisor_uri, process_name, log_tail=0):
def __init__(self, connector, process_name, supervisor_uri, log_tail=0):
self._prevent_log_recursion()
self._connector = connector
self._process_name = process_name
@@ -36,10 +36,7 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager):
def on_modified(self, event):
self._connector.send_message({
'key': 'processlog',
'data': {
'command': 'new_log',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}
'key': 'process.log.new',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}, Scope.BROADCAST)
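
Two things change here: the constructor now takes process_name before supervisor_uri, and log updates are broadcast as a flat 'process.log.new' message instead of the old 'processlog' envelope with a 'new_log' command. The broadcast shape is simply:

log_update = {
    'key': 'process.log.new',
    'stdout': '...last log_tail characters of stdout...',
    'stderr': '...last log_tail characters of stderr...',
}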

View File

@@ -9,45 +9,35 @@ LOG = logging.getLogger(__name__)
class ProcessHandler(ProcessManager, ProcessLogManager):
keys = ['processmanager']
"""
Event handler that can manage processes managed by supervisor.
keys = ['process']
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
Every message must contain a data['process_name'] field with the name of the
process to manage. This is the name specified in supervisor config files like so:
[program:someprogram]
Commands available: start, stop, restart, readlog
(the names are as self-documenting as it gets)
"""
def __init__(self, *, supervisor_uri, log_tail=0):
ProcessManager.__init__(self, supervisor_uri)
ProcessLogManager.__init__(self, supervisor_uri)
self.log_tail = log_tail
self.commands = {
'start': self.start_process,
'stop': self.stop_process,
'restart': self.restart_process
'process.start': self.start_process,
'process.stop': self.stop_process,
'process.restart': self.restart_process
}
def handle_event(self, message, connector):
try:
data = message['data']
try:
self.commands[data['command']](data['process_name'])
self.commands[message['key']](message['name'])
except SupervisorFault as fault:
message['data']['error'] = fault.faultString
message['error'] = fault.faultString
finally:
message['data']['stdout'] = self.read_stdout(
data['process_name'],
message['stdout'] = self.read_stdout(
message['name'],
self.log_tail
)
message['data']['stderr'] = self.read_stderr(
data['process_name'],
message['stderr'] = self.read_stderr(
message['name'],
self.log_tail
)
connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
if not message['key'].startswith('process.log'):
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
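
Process control messages now use the key itself as the command and a top-level 'name' field for the supervisor program name (the [program:...] name from the supervisor config). The reply always carries the stdout/stderr tails, plus an 'error' field when supervisor raises a fault. For instance (the program name is illustrative):

restart_request = {'key': 'process.restart', 'name': 'webservice'}
# Reply on the same message, roughly:
# {'key': 'process.restart', 'name': 'webservice',
#  'stdout': '...', 'stderr': '...'}        # 'error' added only on a SupervisorFault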

View File

@@ -6,63 +6,38 @@ LOG = logging.getLogger(__name__)
class ProcessLogHandler:
keys = ['logmonitor']
"""
Monitors the output of a supervisor process (stdout, stderr) and
sends the results to the frontend.
keys = ['process.log']
Accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, *, process_name, supervisor_uri, log_tail=0):
self.connector = None
self.connector, self._monitor = None, None
self.process_name = process_name
self._supervisor_uri = supervisor_uri
self._initial_log_tail = log_tail
self._monitor = None
self.command_handlers = {
'process_name': self.handle_process_name,
'log_tail': self.handle_log_tail
'process.log.set': self.handle_set
}
def start(self):
self._monitor = LogInotifyObserver(
connector=self.connector,
supervisor_uri=self._supervisor_uri,
process_name=self.process_name,
supervisor_uri=self._supervisor_uri,
log_tail=self._initial_log_tail
)
self._monitor.start()
def handle_event(self, message, _):
try:
data = message['data']
self.command_handlers[data['command']](data)
self.command_handlers[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_process_name(self, data):
"""
Changes the monitored process.
:param data: TFW message data containing 'value'
(name of the process to monitor)
"""
self._monitor.process_name = data['value']
def handle_log_tail(self, data):
"""
Sets tail length of the log the monitor will send
to the frontend (the monitor will send back the last
'value' characters of the log).
:param data: TFW message data containing 'value'
(new tail length)
"""
self._monitor.log_tail = data['value']
def handle_set(self, data):
if data.get('name'):
self._monitor.process_name = data['name']
if data.get('tail'):
self._monitor.log_tail = data['tail']
def cleanup(self):
self._monitor.stop()
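
The two old commands ('process_name' and 'log_tail') collapse into a single 'process.log.set' message whose optional fields reconfigure the running monitor:

set_monitor = {
    'key': 'process.log.set',
    'name': 'webservice',  # optional: switch the monitored supervisor program (name is illustrative)
    'tail': 2000,          # optional: only send the last 2000 characters of each log
}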

View File

@@ -6,8 +6,6 @@ from datetime import datetime
from dateutil import parser as dateparser
from tfw.internals.networking import Scope
from .snapshot_provider import SnapshotProvider
LOG = logging.getLogger(__name__)
@@ -23,9 +21,9 @@ class SnapshotHandler:
self.init_snapshot_providers(directories)
self.command_handlers = {
'take_snapshot': self.handle_take_snapshot,
'restore_snapshot': self.handle_restore_snapshot,
'exclude': self.handle_exclude
'snapshot.take': self.handle_take_snapshot,
'snapshot.restore': self.handle_restore_snapshot,
'snapshot.exclude': self.handle_exclude
}
def init_snapshot_providers(self, directories):
@@ -45,23 +43,20 @@ class SnapshotHandler:
makedirs(git_dir, exist_ok=True)
return git_dir
def handle_event(self, message, connector):
def handle_event(self, message, _):
try:
data = message['data']
message['data'] = self.command_handlers[data['command']](data)
connector.send_message(message, scope=Scope.WEBSOCKET)
self.command_handlers[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_take_snapshot(self, data):
def handle_take_snapshot(self, _):
LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys())
for provider in self.snapshot_providers.values():
provider.take_snapshot()
return data
def handle_restore_snapshot(self, data):
def handle_restore_snapshot(self, message):
date = dateparser.parse(
data.get(
message.get(
'value',
datetime.now().isoformat()
)
@@ -73,13 +68,11 @@
)
for provider in self.snapshot_providers.values():
provider.restore_snapshot(date)
return data
def handle_exclude(self, data):
exclude_unix_patterns = data['value']
def handle_exclude(self, message):
exclude_unix_patterns = message['value']
if not isinstance(exclude_unix_patterns, list):
raise KeyError
for provider in self.snapshot_providers.values():
provider.exclude = exclude_unix_patterns
return data
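
The snapshot commands map onto three dot-separated keys: 'snapshot.take' needs no payload, 'snapshot.restore' accepts an optional ISO-8601 timestamp in 'value' (defaulting to now), and 'snapshot.exclude' expects a list of unix-style patterns in 'value'. Example payloads (the timestamp is illustrative):

take = {'key': 'snapshot.take'}
restore = {'key': 'snapshot.restore', 'value': '2019-08-08T16:00:00'}
exclude = {'key': 'snapshot.exclude', 'value': ['*.pyc', '*.o']}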

View File

@@ -7,40 +7,25 @@ LOG = logging.getLogger(__name__)
class TerminalHandler:
keys = ['shell']
"""
Event handler responsible for managing terminal sessions for frontend xterm
sessions to connect to. You need to instanciate this in order for frontend
terminals to work.
keys = ['terminal']
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, *, port, user, workind_directory, histfile):
"""
:param key: key this EventHandler listens to
:param monitor: tfw.components.HistoryMonitor instance to read command history from
"""
self.connector = None
def __init__(self, *, port, user, working_directory, histfile):
self.connector, self._historymonitor = None, None
self._histfile = histfile
self._historymonitor = None
bash_as_user_cmd = ['sudo', '-u', user, 'bash']
self.terminado_server = TerminadoMiniServer(
'/terminal',
port,
workind_directory,
working_directory,
bash_as_user_cmd
)
self.terminado_server.listen()
self.commands = {
'write': self.write,
'read': self.read
'terminal.write': self.handle_write
}
self.terminado_server.listen()
def start(self):
self._historymonitor = BashMonitor(self.connector, self._histfile)
self._historymonitor.start()
@@ -51,34 +36,12 @@ class TerminalHandler:
def handle_event(self, message, _):
try:
data = message['data']
message['data'] = self.commands[data['command']](data)
self.commands[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def write(self, data):
"""
Writes a string to the terminal session (on the pty level).
Useful for pre-typing and executing commands for the user.
:param data: TFW message data containing 'value'
(command to be written to the pty)
"""
self.terminado_server.pty.write(data['value'])
return data
def read(self, data):
"""
Reads the history of commands executed.
:param data: TFW message data containing 'count'
(the number of history elements to return)
:return dict: message with list of commands in data['history']
"""
data['count'] = int(data.get('count', 1))
if self.historymonitor:
data['history'] = self.historymonitor.history[-data['count']:]
return data
def handle_write(self, message):
self.terminado_server.pty.write(message['command'])
def cleanup(self):
self.terminado_server.stop()
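
Terminal interaction shrinks to a single 'terminal.write' key that feeds a string straight to the pty; the old history-reading command is gone, with history handled by the BashMonitor instead. Pre-typing and executing a command for the user now looks like this (the command itself is just an example):

pre_type_command = {
    'key': 'terminal.write',
    'command': 'ls -la\n',  # trailing newline executes the command in the user's shell
}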

View File

@@ -6,11 +6,7 @@ LOG = logging.getLogger(__name__)
class FSMAware:
keys = ['fsm_update']
"""
Base class for stuff that has to be aware of the framework FSM.
This is done by processing 'fsm_update' messages.
"""
keys = ['fsm.update']
def __init__(self):
self.fsm_state = None
self.fsm_in_accepted_state = False
@@ -18,7 +14,7 @@ class FSMAware:
self._auth_key = KeyManager().auth_key
def process_message(self, message):
if message['key'] == 'fsm_update':
if message['key'] == 'fsm.update':
if verify_message(self._auth_key, message):
self._handle_fsm_update(message)

View File

@@ -15,8 +15,6 @@ LOG = logging.getLogger(__name__)
class ZMQDownlinkConnector:
def __init__(self, connect_addr):
self.keys = []
self._on_recv_callback = None
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_socket.connect(connect_addr)
@@ -25,19 +23,14 @@ class ZMQDownlinkConnector:
def subscribe(self, *keys):
for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key)
self.keys.append(key)
def unsubscribe(self, *keys):
for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key)
self.keys.remove(key)
def register_callback(self, callback):
if callback:
self._on_recv_callback = callback
self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv))
else:
self._zmq_sub_stream.on_recv(None)
callback = with_deserialize_tfw_msg(callback) if callback else None
self._zmq_sub_stream.on_recv(callback)
def recv_message(self, *, block=True):
if self._zmq_sub_stream.receiving():
@@ -48,11 +41,6 @@ class ZMQDownlinkConnector:
except zmq.ZMQError:
raise IOError("No data available to recv!")
def _on_recv(self, message):
key = message['key']
if key in self.keys or '' in self.keys:
self._on_recv_callback(message)
def close(self):
self._zmq_sub_stream.close()
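
Key filtering is now delegated entirely to ZMQ subscriptions: the connector no longer tracks a keys list or re-checks message keys in _on_recv, and register_callback simply wraps the callback with deserialization (or detaches it when given None). A rough usage sketch under those assumptions, with the address and key as placeholders:

def on_message(message):
    # Invoked with already-deserialized TFW message dicts.
    print(message['key'], message)

# connector = ZMQDownlinkConnector('tcp://localhost:7654')  # placeholder address
# connector.subscribe('fsm.update')                         # filtering happens inside ZMQ
# connector.register_callback(on_message)                   # deserialization is wrapped around the callback
# connector.register_callback(None)                         # detaches the callback again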