from tornado.ioloop import IOLoop from tfw.components import WebideEventHandler, TerminadoEventHandler, ProcessManagingEventHandler, BashMonitor from tfw.components import TerminalCommands from tfw.networking import MessageSender, TFWServerConnector from tfw.config import TFWENV from tfw.config.logs import logging from tao.config import TAOENV LOG = logging.getLogger(__name__) def cenator(history): LOG.debug('User executed command: "%s"', history[-1]) MessageSender().send('JOHN CENA', 'You\'ve executed "{}"'.format(history[-1])) class TestCommands(TerminalCommands): def command_selectdir(self, *args): TFWServerConnector().send_to_eventhandler('webide', {'data': {'command': 'selectdir', 'directory': args[0]}}) def command_trigger(self, *args): TFWServerConnector().send('selectdir_needs_no_key', {'trigger': args[0]}) def command_togglenext(self, *args): if not hasattr(self, 'togglenext_visible'): self.togglenext_visible = True TFWServerConnector().send('messagecontrol', {'data': {'command': 'showbutton', 'next_visibility': self.togglenext_visible}}) self.togglenext_visible = not self.togglenext_visible if __name__ == '__main__': # pylint: disable=invalid-name ide = WebideEventHandler(key='webide', allowed_directories=[TFWENV.WEBIDE_WD], directory=TFWENV.WEBIDE_WD, exclude=['*.pyc']) terminado = TerminadoEventHandler(key='shell', monitor=BashMonitor(TFWENV.HISTFILE)) terminado.historymonitor.subscribe_callback(cenator) commands = TestCommands(bashrc=f'/home/{TAOENV.USER}/.bashrc') terminado.historymonitor.subscribe_callback(commands.callback) processmanager = ProcessManagingEventHandler(key='processmanager', dirmonitor=ide.monitor) eventhandlers = {ide, terminado, processmanager} try: IOLoop.instance().start() finally: for eh in eventhandlers: eh.cleanup()