# Source: test-tutorial-framework/solvable/src/event_handler_main.py
# (repository viewer metadata: 96 lines, 3.0 KiB, Python)

import logging
from sys import stderr
from functools import partial
from tornado.ioloop import IOLoop
from tfw.fsm import YamlFSM
from tfw.event_handlers import FSMAwareEventHandler
from tfw.components.ide import IdeHandler
from tfw.components.terminal import TerminalHandler
from tfw.components.frontend import FrontendHandler
from tfw.components.process_management import ProcessHandler, ProcessLogHandler
from tfw.components.snapshots import SnapshotHandler
from tfw.components.fsm import FSMHandler
from tfw.main import EventHandlerFactory, setup_signal_handlers
from tfw.logging import Log, Logger, LogFormatter, VerboseLogFormatter
from tfw.config import TFWENV, TAOENV
from custom_handlers import CenatorHandler, TestCommandsHandler, messageFSMStepsHandler
# Module-level logger named after this module; it is not referenced by any
# other code visible in this file.
LOG = logging.getLogger(__name__)
def main():
    """Set up logging, build every event handler, then start the IOLoop.

    Steps, in order:

    1. Start a ``Logger`` writing to ``stderr`` and to ``TFWENV.LOGFILE``.
    2. Build the TFW builtin handlers (FSM, IDE, terminal, process
       management, process logs, snapshots, frontend) through one
       ``EventHandlerFactory``.
    3. Build the custom handlers imported from ``custom_handlers``.
    4. Call ``setup_signal_handlers()`` and start ``IOLoop.instance()``.
    """
    # The built handlers are bound to local names but never read again in
    # this function, which is why the pylint check is silenced.
    # NOTE(review): presumably the names keep the handlers referenced while
    # the loop runs -- confirm before removing any of them.
    # pylint: disable=unused-variable
    log_sinks = [
        Log(stderr, LogFormatter(20)),
        Log(TFWENV.LOGFILE, VerboseLogFormatter()),
    ]
    Logger(log_sinks).start()

    factory = EventHandlerFactory()
    build = factory.build

    # ------------------------------------------------------------------
    # TFW builtin EventHandlers (required for their respective features)
    # ------------------------------------------------------------------

    # FSM described by 'test_fsm.yml'. The second positional argument holds
    # the jinja2 variables; an empty dict enables jinja2 without variables.
    load_fsm = partial(YamlFSM, 'test_fsm.yml', {})
    fsm_eh = build(FSMHandler(fsm_type=load_fsm))

    # Backend of the web IDE.
    ide_handler = IdeHandler(
        allowed_directories=[TFWENV.IDE_WD, TFWENV.WEBSERVICE_DIR],
        directory=TFWENV.IDE_WD,
        exclude=['*.pyc'],
    )
    ide_eh = build(ide_handler)

    # Backend of the web shell.
    # NOTE(review): the keyword really is spelled 'workind_directory' here;
    # presumably it mirrors TerminalHandler's own parameter name -- verify
    # against the library before "fixing" the spelling.
    terminal_handler = TerminalHandler(
        port=TFWENV.TERMINADO_PORT,
        user=TAOENV.USER,
        workind_directory=TFWENV.TERMINADO_WD,
        histfile=TFWENV.HISTFILE,
    )
    terminal_eh = build(terminal_handler)

    # Reacts to clicks on the 'deploy' button.
    process_handler = ProcessHandler(
        supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
        log_tail=2000,
    )
    processmanager_eh = build(process_handler)

    # Streams the live logs of the 'webservice' process to the frontend.
    process_log_handler = ProcessLogHandler(
        process_name='webservice',
        supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI,
        log_tail=2000,
    )
    logmonitor_eh = build(process_log_handler)

    # Takes care of filesystem snapshots of the listed directories.
    snapshot_handler = SnapshotHandler(
        directories=[TFWENV.IDE_WD, TFWENV.WEBSERVICE_DIR],
        snapshots_dir=TFWENV.SNAPSHOTS_DIR,
    )
    snapshot_eh = build(snapshot_handler)

    # Proxies frontend API calls to the frontend.
    frontend_eh = build(FrontendHandler())

    # ------------------------------------------------------------------
    # Custom EventHandlers -- replace these with your own
    # ------------------------------------------------------------------

    # Echoes executed commands to messages.
    cenator_eh = build(CenatorHandler())

    # Echoes FSM steps. The handler object is passed as-is (not called) and
    # is wrapped in an FSMAwareEventHandler.
    message_fsm_steps_eh = build(
        messageFSMStepsHandler,
        event_handler_type=FSMAwareEventHandler,
    )

    # Catches special commands.
    commands_handler = TestCommandsHandler(
        bashrc=f'/home/{TAOENV.USER}/.bashrc'
    )
    commands_eh = build(commands_handler)

    setup_signal_handlers()
    IOLoop.instance().start()
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()