1
0
mirror of https://github.com/avatao-content/test-tutorial-framework synced 2024-11-15 04:27:18 +00:00
test-tutorial-framework/solvable/src/event_handler_main.py
2018-04-20 17:33:59 +02:00

72 lines
3.1 KiB
Python

from tornado.ioloop import IOLoop
from tfw.components import IdeEventHandler, TerminadoEventHandler, ProcessManagingEventHandler, BashMonitor
from tfw.components import TerminalCommands
from tfw.networking import MessageSender, TFWServerConnector
from tfw.config import TFWENV
from tfw.config.logs import logging
from tao.config import TAOENV
# Module-level logger named after this module; `logging` here is the object
# imported from tfw.config.logs above.
LOG = logging.getLogger(__name__)
def cenator(history):
    """
    Echo the most recently executed command back through MessageSender.

    The last entry of `history` is logged at debug level and then sent in a
    message whose first argument is 'JOHN CENA' (presumably the originator
    name shown to the user -- confirm against MessageSender.send).

    :param history: indexable sequence of executed commands; only the last
                    entry is read, so an empty sequence raises IndexError
    """
    last_command = history[-1]
    LOG.debug('User executed command: "%s"', last_command)
    MessageSender().send('JOHN CENA', f'You\'ve executed "{last_command}"')
class TestCommands(TerminalCommands):
    """
    Collection of `command_*` handlers that push test messages through
    TFWServerConnector.

    NOTE(review): the handlers are presumably dispatched by TerminalCommands
    with the words following the command name passed as *args -- confirm
    against the base class.
    """
    # pylint: disable=unused-argument, attribute-defined-outside-init, no-self-use

    def command_selectdir(self, *args):
        """
        Send a 'selectdir' command to the event handler keyed 'ide'.

        :param args: args[0] is forwarded as the 'directory' value;
                     calling without arguments raises IndexError
        """
        connector = TFWServerConnector()
        ide_request = {
            'key': 'ide',
            'data': {
                'command': 'selectdir',
                'directory': args[0],
            },
        }
        connector.send_to_eventhandler(ide_request)

    def command_trigger(self, *args):
        """
        Send a message with an empty key and args[0] as its 'trigger' value.

        :param args: args[0] is the trigger; calling without arguments
                     raises IndexError
        """
        connector = TFWServerConnector()
        connector.send({'key': '', 'trigger': args[0]})

    def command_togglenext(self, *args):
        """
        Send a 'showbutton' command and flip the visibility for the next call.

        The visibility flag is stored on the instance as `togglenext_visible`;
        it starts out as True the first time this command runs.
        """
        # Lazily create the flag on first use, keep its value otherwise.
        self.togglenext_visible = getattr(self, 'togglenext_visible', True)
        connector = TFWServerConnector()
        visible_now = self.togglenext_visible
        connector.send({
            'key': 'messagecontrol',
            'data': {
                'command': 'showbutton',
                'next_visibility': visible_now,
            },
        })
        self.togglenext_visible = not visible_now

    def command_seppuku_tfw(self, *args):
        """
        Restart the framework processes from the user's shell and reload the
        frontend.

        Writes a shell line to the event handler keyed 'shell' which restarts
        the `tfwserver` and `event_handler_main` supervisor programs in the
        background, clears the screen, prints a message and then sleeps
        forever. Afterwards a 'reload_frontend' command is sent to the
        'dashboard' key.
        """
        restart_and_wait = (
            'nohup sh -c "supervisorctl restart tfwserver event_handler_main" &> /dev/null & '
            'clear && echo "Committed seppuku! :)" && sleep infinity'
        )
        connector = TFWServerConnector()
        # The trailing newline is part of the text written to the shell.
        shell_write = {'command': 'write', 'shellcmd': restart_and_wait + '\n'}
        connector.send_to_eventhandler({'key': 'shell', 'data': shell_write})
        connector.send({'key': 'dashboard', 'data': {'command': 'reload_frontend'}})

    def command_changelayout(self, *args):
        """
        Send a 'layout' command to the 'dashboard' key.

        :param args: args[0] is the layout value (IndexError when missing);
                     when args[1] is present, 'hide_messages' is included and
                     is True only if args[1] is exactly one of
                     'yes', 'y', '1' or 'true'
        """
        layout_data = {'command': 'layout', 'layout': args[0]}
        if len(args) > 1:
            layout_data['hide_messages'] = args[1] in ('yes', 'y', '1', 'true')
        TFWServerConnector().send({'key': 'dashboard', 'data': layout_data})
if __name__ == '__main__':
    # pylint: disable=invalid-name

    # IDE event handler working in TFWENV.IDE_WD, excluding '*.pyc'.
    ide_handler = IdeEventHandler(
        key='ide',
        allowed_directories=[TFWENV.IDE_WD],
        directory=TFWENV.IDE_WD,
        exclude=['*.pyc'],
    )

    # Terminal event handler with a bash history monitor on TFWENV.HISTFILE.
    shell_handler = TerminadoEventHandler(
        key='shell',
        monitor=BashMonitor(TFWENV.HISTFILE),
    )

    # Register the history callbacks: cenator first, then the test commands.
    shell_handler.historymonitor.subscribe_callback(cenator)
    test_commands = TestCommands(bashrc=f'/home/{TAOENV.USER}/.bashrc')
    shell_handler.historymonitor.subscribe_callback(test_commands.callback)

    # The process manager is handed the IDE handler's monitor.
    process_handler = ProcessManagingEventHandler(
        key='processmanager',
        dirmonitor=ide_handler.monitor,
    )

    handlers = {ide_handler, shell_handler, process_handler}
    try:
        IOLoop.instance().start()
    finally:
        # Clean up every handler however the loop exits.
        for handler in handlers:
            handler.cleanup()