Merge branch 'eventhandler-rework' into chausie

This commit is contained in:
Kristóf Tóth 2019-07-25 12:12:01 +02:00
commit 7698ff430e
9 changed files with 122 additions and 423 deletions

View File

@ -2,8 +2,8 @@ FROM avatao/controller:debian-buster
USER root
ENV PYTHONPATH="/usr/local/lib" \
TFW_PUBLISHER_PORT=7654 \
TFW_RECEIVER_PORT=8765 \
TFW_PUB_PORT=7654 \
TFW_PULL_PORT=8765 \
TFW_AUTH_KEY="/tmp/tfw-auth.key" \
CONTROLLER_PORT=5555

View File

@ -4,12 +4,13 @@ import json
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application
from tfw.builtins import FSMAwareEventHandler
from tfw.event_handlers import FSMAwareEventHandler
from tfw.main import EventHandlerFactory, setup_signal_handlers
class ControllerPostHandler(RequestHandler):
# pylint: disable=abstract-method
def initialize(self, **kwargs): # pylint: disable=arguments-differ
# pylint: disable=abstract-method,attribute-defined-outside-init,unused-argument
def initialize(self, **kwargs):
self.controller = kwargs['controller']
def post(self, *args, **kwargs):
@ -19,21 +20,18 @@ class ControllerPostHandler(RequestHandler):
}))
class ControllerEventHandler(FSMAwareEventHandler):
def handle_event(self, message):
pass
if __name__ == '__main__':
controller = ControllerEventHandler('controller')
controller_eh = EventHandlerFactory().build(
lambda *_: None,
event_handler_type=FSMAwareEventHandler
)
application = Application([(
f'/{os.environ["SECRET"]}',
ControllerPostHandler,
{'controller': controller}
{'controller': controller_eh}
)])
application.listen(os.environ['CONTROLLER_PORT'])
try:
IOLoop.instance().start()
finally:
controller.cleanup()
setup_signal_handlers()
IOLoop.instance().start()

View File

@ -1,71 +0,0 @@
import logging
from ast import literal_eval
from tfw.components import MessageSender
from tfw.builtins import TFWServerUplinkConnector
from tfw.builtins import EventHandler, FSMAwareEventHandler, TerminalCommandsEventHandler
LOG = logging.getLogger(__name__)
class CenatorEventHandler(EventHandler):
"""
Logs commands executed in terminal to messages and invokes an
additional callback function to handle special commands.
!! Please remove from production code. !!
"""
def __init__(self, key):
super().__init__(key)
def handle_event(self, message):
command = message['value']
LOG.debug('User executed command: "%s"', command)
MessageSender(self.server_connector).send('JOHN CENA', f'You\'ve executed "{command}"')
class TestCommandsEventHandler(TerminalCommandsEventHandler):
"""
Some example commands useful for debugging.
!! Please remove from production code !! and inherit your own
class from TerminalCommands if you need to define custom
commands in your challenge.
"""
# pylint: disable=unused-argument, attribute-defined-outside-init, no-self-use
def command_sendmessage(self, *args):
"""
Insert TFW message template as first argument if executed without args.
Evaluate first argumen as a dict and send it to the frontend.
This is useful for playing around with frontend APIs.
"""
if not args:
message_template = """'{"key": "", "data": {"command": ""}}'"""
TFWServerUplinkConnector().send_message({
'key': 'shell',
'data': {
'command': 'write',
'value': f'sendmessage {message_template}'
}
})
else:
TFWServerUplinkConnector().send_message(literal_eval(args[0]))
class MessageFSMStepsEventHandler(FSMAwareEventHandler):
"""
This example EventHandler is capable of detecting FSM state.
!! Please remove from production code !!
"""
def handle_event(self, message):
pass
def handle_fsm_step(self, **kwargs):
"""
When the FSM steps this method is invoked.
Receives a 'data' field from an fsm_update message as kwargs.
"""
MessageSender(self.server_connector).send(
'FSM info',
f'FSM has stepped from state "{kwargs["last_event"]["from_state"]}" '
f'to state "{kwargs["current_state"]}" in response to trigger "{kwargs["last_event"]["trigger"]}"'
)

View File

@ -0,0 +1,45 @@
import logging
from ast import literal_eval
from tfw.components.frontend import MessageSender
from tfw.components.terminal import TerminalCommandsHandler
from tfw.main import TFWUplinkConnector
LOG = logging.getLogger(__name__)
class CenatorHandler:
keys = ['history.bash']
def handle_event(self, message, server_connector): # pylint: disable=no-self-use
command = message['value']
LOG.debug('User executed command: "%s"', command)
MessageSender(server_connector).send('JOHN CENA', f'You\'ve executed "{command}"')
class TestCommandsHandler(TerminalCommandsHandler):
# pylint: disable=unused-argument,attribute-defined-outside-init,no-self-use
def command_sendmessage(self, *args):
if not args:
message_template = """'{"key": "", "data": {"command": ""}}'"""
TFWUplinkConnector().send_message({
'key': 'shell',
'data': {
'command': 'write',
'value': f'sendmessage {message_template}'
}
})
else:
TFWUplinkConnector().send_message(literal_eval(args[0]))
def messageFSMStepsHandler(message, server_connector):
"""
When the FSM steps this method is invoked.
Receives a 'data' field from an fsm_update message as kwargs.
"""
MessageSender(server_connector).send(
'FSM info',
f'FSM has stepped from state "{message["last_event"]["from_state"]}" '
f'to state "{message["current_state"]}" in response to trigger "{message["last_event"]["trigger"]}"'
)

View File

@ -5,16 +5,18 @@ from functools import partial
from tornado.ioloop import IOLoop
from tfw.fsm import YamlFSM
from tfw.builtins import IdeEventHandler, TerminalEventHandler, FrontendEventHandler
from tfw.builtins import LogMonitoringEventHandler, ProcessManagingEventHandler
from tfw.builtins import DirectorySnapshottingEventHandler, FSMManagingEventHandler
from tfw.config import TFWENV
from tfw.event_handlers import FSMAwareEventHandler
from tfw.components.ide import IdeHandler
from tfw.components.terminal import TerminalHandler
from tfw.components.frontend import FrontendHandler
from tfw.components.process_management import ProcessHandler, ProcessLogHandler
from tfw.components.snapshots import SnapshotHandler
from tfw.components.fsm import FSMHandler
from tfw.main import EventHandlerFactory, setup_signal_handlers
from tfw.logging import Log, Logger, LogFormatter, VerboseLogFormatter
from tao.config import TAOENV
from tfw.config import TFWENV, TAOENV
from custom_event_handlers import MessageFSMStepsEventHandler
from custom_event_handlers import CenatorEventHandler, TestCommandsEventHandler
from signal_handling import setup_signal_handlers
from custom_handlers import CenatorHandler, TestCommandsHandler, messageFSMStepsHandler
LOG = logging.getLogger(__name__)
@ -26,51 +28,63 @@ def main():
Log(TFWENV.LOGFILE, VerboseLogFormatter())
]).start()
# TFW component EventHandlers (builtins, required for their respective functionalities)
fsm = FSMManagingEventHandler( # TFW FSM
key='fsm',
eh_factory = EventHandlerFactory()
# TFW builtin EventHandlers (required for their respective functionalities)
# TFW FSM
fsm_eh = eh_factory.build(FSMHandler(
fsm_type=partial(
YamlFSM,
'test_fsm.yml',
{} # jinja2 variables, use empty dict to enable jinja2 parsing without any variables
{} # jinja2 variables, empty dict enables jinja2 without any variables
)
)
ide = IdeEventHandler( # Web IDE backend
key='ide',
))
# Web IDE backend
ide_eh = eh_factory.build(IdeHandler(
allowed_directories=[TFWENV.IDE_WD, TFWENV.WEBSERVICE_DIR],
directory=TFWENV.IDE_WD,
exclude=['*.pyc']
)
terminal = TerminalEventHandler( # Web shell backend
key='shell'
)
cenator = CenatorEventHandler('history.bash') # Reacts to terminal commands
commands = TestCommandsEventHandler( # Catches special commands
key='history.bash',
bashrc=f'/home/{TAOENV.USER}/.bashrc'
)
processmanager = ProcessManagingEventHandler( # Handles 'deploy' button clicks
key='processmanager',
log_tail=2000
)
logmonitor = LogMonitoringEventHandler( # Sends live logs of webservice process to frontend
key='logmonitor',
))
# Web shell backend
terminal_eh = eh_factory.build(TerminalHandler(
port=TFWENV.TERMINADO_PORT,
user=TAOENV.USER,
workind_directory=TFWENV.TERMINADO_WD,
histfile=TFWENV.HISTFILE
))
# Handles 'deploy' button clicks
processmanager_eh = eh_factory.build(ProcessHandler(
supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
log_tail=2000,
))
# Sends live logs of webservice process to frontend
logmonitor_eh = eh_factory.build(ProcessLogHandler(
process_name='webservice',
supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
log_tail=2000
)
snapshot = DirectorySnapshottingEventHandler( # Manages filesystem snapshots of directories
key='snapshot',
))
# Manages filesystem snapshots of directories
snapshot_eh = eh_factory.build(SnapshotHandler(
directories=[
TFWENV.IDE_WD,
TFWENV.WEBSERVICE_DIR
]
)
frontend = FrontendEventHandler() # Proxies frontend API calls to frontend
],
snapshots_dir=TFWENV.SNAPSHOTS_DIR
))
# Proxies frontend API calls to frontend
frontend_eh = eh_factory.build(FrontendHandler())
# Your custom event handlers
message_fsm_steps_eh = MessageFSMStepsEventHandler(
key='test'
# Replace these with your custom event handlers
# Echoes executed commands to messages
cenator_eh = eh_factory.build(CenatorHandler())
# Echoes FSM steps
message_fsm_steps_eh = eh_factory.build(
messageFSMStepsHandler,
event_handler_type=FSMAwareEventHandler
)
# Catches special commands
commands_eh = eh_factory.build(TestCommandsHandler(
bashrc=f'/home/{TAOENV.USER}/.bashrc'
))
setup_signal_handlers()
IOLoop.instance().start()

View File

@ -1,235 +0,0 @@
from json import dumps, loads
from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.builtins import PipeIOEventHandlerBase
from tfw.builtins.pipe_io_event_handler import DEFAULT_PERMISSIONS
from tfw.networking.scope import Scope
class SignMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Signs a valid TFW message with HMAC.
Note that the running process needs root permissions in order to read
the authentication key.
When forwarding is true, it will send the signed message to the TFW
server before writing it into the output pipe.
"""
def __init__(
self, in_pipe_path, out_pipe_path,
permissions=DEFAULT_PERMISSIONS,
forwarding=True
):
self.forwarding = forwarding
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
sign_message(self.auth_key, message)
if self.forwarding:
self.server_connector.send_message(message)
self.pipe_io.send_message(dumps(message).encode())
class VerifyMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Verifies a signed TFW message.
This pipe also needs root permissions. Send the serialized JSON object
to the pipe, then wait for its boolean response.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
validity = verify_message(self.auth_key, message)
self.pipe_io.send_message(str(validity).lower().encode())
class BotPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Sends bot messages to the frontend.
If you assign @originator, it will be the default message sender.
When you write a line to the pipe, it will be considered as a single
message and gets appended to the queue until an empty line is received,
which triggers forwarding the messages to the TFW server.
"""
def __init__(
self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS,
originator='avataobot'
):
self.queue = []
self.originator = originator
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
if message_bytes == b"":
if self.queue:
self.server_connector.send_message({
'key': 'queueMessages',
'data': {
'messages': self.queue
}
})
self.queue = []
else:
self.queue.append({
'originator': self.originator,
'message': message_bytes.decode().replace('\\n', '\n')
})
class DeployPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manages deployment in the IDE.
When you receive "deploy", then you have to answer with a "true" or
"false" depending whether you are satisfied with the result or not.
The @process parameter is the name of the supervised service.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, process,
permissions=DEFAULT_PERMISSIONS
):
self.expected = False
self.process = process
self.onsuccess = {
'key': 'processmanager',
'data': {
'process_name': process,
'command': 'restart'
}
}
self.onerror = {
'key': 'processmanager',
'data': {
'process_name': process,
'error': True
}
}
super().__init__('processmanager', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
if message == self.onsuccess:
self.expected = True
self.pipe_io.send_message(b'deploy')
def handle_pipe_event(self, message_bytes):
if not self.expected:
raise ValueError(
f'{self.pipe_io.in_pipe}: There is nothing to deploy.'
)
self.expected = False
if message_bytes == b'true':
self.server_connector.send_message(self.onsuccess, scope=Scope.WEBSOCKET)
elif message_bytes == b'false':
self.server_connector.send_message(self.onerror, scope=Scope.WEBSOCKET)
else:
raise ValueError(
f'{self.pipe_io.in_pipe}: Expected "true" or "false".'
)
class IdePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manipulates the content of the IDE.
You can observe a file, and when the user edits it, you will receive
the new contents where newlines are escaped as "\\n".
In order to overwrite the file, send an escaped text back to the pipe.
Since the pipe doesn't know if the file is selected initially in the IDE,
you have to provide this information by yourself with @selected,
but it will track it later on.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, filename,
permissions=DEFAULT_PERMISSIONS,
selected=True
):
self.selected = selected
self.filename = filename
super().__init__('ide', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
data = message['data']
if data['command'] == 'select':
self.selected = data['filename'] == self.filename
elif data['command'] == 'write' and self.selected:
clean = data['content'].replace('\n', '\\n')
self.pipe_io.send_message(clean.encode())
def handle_pipe_event(self, message_bytes):
if not self.selected:
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'select',
'filename': self.filename
}
}
})
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'write',
'content': message_bytes.decode().replace('\\n', '\n')
}
}
})
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'read'
}
}
})
class FSMPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Handles FSM steps.
When the FSM enters the next state, you will receive a line containing
its name. To trigger a state change, send the name of the transition to
the pipe.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
super().__init__(
['fsm', 'fsm_update'],
in_pipe_path, out_pipe_path, permissions
)
def handle_event(self, message):
if 'current_state' in message['data']:
self.pipe_io.send_message(message['data']['current_state'].encode())
def handle_pipe_event(self, message_bytes):
self.server_connector.send_message({
'key': '',
'trigger': message_bytes.decode()
})

View File

@ -3,17 +3,10 @@ from sys import stderr
from tornado.ioloop import IOLoop
from tfw.builtins import PipeIOEventHandler
from tfw.components.pipe_io import PipeIOHandler
from tfw.config import TFWENV
from tfw.logging import Log, Logger, LogFormatter, VerboseLogFormatter
from pipe_io_auxlib import (
SignMessagePipeIOEventHandler, VerifyMessagePipeIOEventHandler,
BotPipeIOEventHandler,
DeployPipeIOEventHandler, IdePipeIOEventHandler,
FSMPipeIOEventHandler
)
from signal_handling import setup_signal_handlers
from tfw.main import EventHandlerFactory, setup_signal_handlers
LOG = logging.getLogger(__name__)
@ -25,46 +18,12 @@ def main():
Log(TFWENV.LOGFILE, VerboseLogFormatter())
]).start()
json_pipe = PipeIOEventHandler(
'',
'/tmp/tfw_json_send',
'/tmp/tfw_json_recv'
)
eh_factory = EventHandlerFactory()
sign_pipe = SignMessagePipeIOEventHandler(
'/tmp/tfw_sign_send',
'/tmp/tfw_sign_recv',
forwarding=True
)
verify_pipe = VerifyMessagePipeIOEventHandler(
'/tmp/tfw_verify_send',
'/tmp/tfw_verify_recv'
)
bot_pipe = BotPipeIOEventHandler(
'/tmp/tfw_bot_send',
'/tmp/tfw_bot_recv',
permissions=0o666
)
deploy_pipe = DeployPipeIOEventHandler(
'/tmp/tfw_deploy_send',
'/tmp/tfw_deploy_recv',
'webservice'
)
ide_pipe = IdePipeIOEventHandler(
'/tmp/tfw_ide_send',
'/tmp/tfw_ide_recv',
'user_ops.py',
selected=True
)
fsm_pipe = FSMPipeIOEventHandler(
'/tmp/tfw_fsm_send',
'/tmp/tfw_fsm_recv'
)
json_pipe_eh = eh_factory.build(PipeIOHandler(
'/tmp/tfw_send',
'/tmp/tfw_recv'
))
setup_signal_handlers()
IOLoop.instance().start()

View File

@ -1,11 +0,0 @@
from signal import signal, SIGTERM, SIGINT
from tfw.builtins import EventHandler
def setup_signal_handlers():
def stop(*_):
EventHandler.stop_all_instances()
exit(0)
signal(SIGTERM, stop)
signal(SIGINT, stop)

View File

@ -3,8 +3,8 @@
from os.path import exists
from tfw.fsm import LinearFSM
from tfw.components import MessageSender
from tfw.builtins import TFWServerUplinkConnector
from tfw.components.frontend import MessageSender
from tfw.main import TFWUplinkConnector
class TestFSM(LinearFSM):
@ -12,7 +12,7 @@ class TestFSM(LinearFSM):
def __init__(self):
super().__init__(6)
self.uplink = TFWServerUplinkConnector()
self.uplink = TFWUplinkConnector()
self.message_sender = MessageSender(self.uplink)
self.subscribe_predicate('step_3', self.step_3_allowed)