mirror of
https://github.com/avatao-content/test-tutorial-framework
synced 2024-11-14 15:57:17 +00:00
Make python and yaml FSMs eqvivalent so it's a nice example
This commit is contained in:
parent
6b1fa664c4
commit
98f297cff2
@ -3,6 +3,7 @@ from functools import partial
|
|||||||
|
|
||||||
from tornado.ioloop import IOLoop
|
from tornado.ioloop import IOLoop
|
||||||
|
|
||||||
|
from tfw import YamlFSM
|
||||||
from tfw.components import IdeEventHandler, TerminalEventHandler
|
from tfw.components import IdeEventHandler, TerminalEventHandler
|
||||||
from tfw.components import ProcessManagingEventHandler, BashMonitor
|
from tfw.components import ProcessManagingEventHandler, BashMonitor
|
||||||
from tfw.components import TerminalCommands, LogMonitoringEventHandler
|
from tfw.components import TerminalCommands, LogMonitoringEventHandler
|
||||||
@ -80,10 +81,9 @@ class TestCommands(TerminalCommands):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
FiveStepTestFSM = partial(TestFSM, 5)
|
|
||||||
fsm = FSMManagingEventHandler( # TFW FSM
|
fsm = FSMManagingEventHandler( # TFW FSM
|
||||||
key='fsm',
|
key='fsm',
|
||||||
fsm_type=FiveStepTestFSM
|
fsm_type=partial(YamlFSM, 'test_fsm.yml')
|
||||||
)
|
)
|
||||||
ide = IdeEventHandler( # Web IDE backend
|
ide = IdeEventHandler( # Web IDE backend
|
||||||
key='ide',
|
key='ide',
|
||||||
@ -105,7 +105,7 @@ if __name__ == '__main__':
|
|||||||
process_name='webservice',
|
process_name='webservice',
|
||||||
log_tail=2000
|
log_tail=2000
|
||||||
)
|
)
|
||||||
eventhandlers = {ide, terminal, processmanager, logmonitor, fsm}
|
eventhandlers = {fsm, ide, terminal, processmanager, logmonitor}
|
||||||
|
|
||||||
commands = TestCommands(bashrc=f'/home/{TAOENV.USER}/.bashrc')
|
commands = TestCommands(bashrc=f'/home/{TAOENV.USER}/.bashrc')
|
||||||
terminal.historymonitor.subscribe_callback(commands.callback)
|
terminal.historymonitor.subscribe_callback(commands.callback)
|
||||||
|
@ -1,3 +1,7 @@
|
|||||||
|
# This defines an FSM equvivalent to test_fsm.yml
|
||||||
|
|
||||||
|
from os.path import exists
|
||||||
|
|
||||||
from tfw import LinearFSM
|
from tfw import LinearFSM
|
||||||
from tfw.networking import MessageSender
|
from tfw.networking import MessageSender
|
||||||
|
|
||||||
@ -5,24 +9,29 @@ from tfw.networking import MessageSender
|
|||||||
class TestFSM(LinearFSM):
|
class TestFSM(LinearFSM):
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
|
|
||||||
def __init__(self, number_of_steps):
|
def __init__(self):
|
||||||
super().__init__(number_of_steps)
|
super().__init__(6)
|
||||||
self.message_sender = MessageSender()
|
self.message_sender = MessageSender()
|
||||||
|
self.subscribe_predicate('step_3', self.step_3_allowed)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def step_3_allowed():
|
||||||
|
return exists('/home/user/workdir/allow_step_3')
|
||||||
|
|
||||||
def on_enter_1(self, event_data):
|
def on_enter_1(self, event_data):
|
||||||
self.state_notify(1)
|
self.message_sender.send('FSM', 'Entered state 1!')
|
||||||
|
|
||||||
def on_enter_2(self, event_data):
|
def on_enter_2(self, event_data):
|
||||||
self.state_notify(2)
|
filename = '/home/user/workdir/cat.txt'
|
||||||
|
with open(filename, 'w') as ofile:
|
||||||
|
ofile.write('As you can see it is possible to write arbitrary python code here.')
|
||||||
|
self.message_sender.send('FSM', f'Entered state 2! Written stuff to {filename}')
|
||||||
|
|
||||||
def on_enter_3(self, event_data):
|
def on_enter_3(self, event_data):
|
||||||
self.state_notify(3)
|
self.message_sender.send('FSM', 'Entered state 3!')
|
||||||
|
|
||||||
def on_enter_4(self, event_data):
|
def on_enter_4(self, event_data):
|
||||||
self.state_notify(4)
|
self.message_sender.send('FSM', 'Entered state 4!')
|
||||||
|
|
||||||
def on_enter_5(self, event_data):
|
def on_enter_5(self, event_data):
|
||||||
self.state_notify(5)
|
self.message_sender.send('FSM', 'Entered state 5!')
|
||||||
|
|
||||||
def state_notify(self, state):
|
|
||||||
self.message_sender.send('TestFSM', f'Entered state {state}!')
|
|
||||||
|
43
solvable/src/test_fsm.yml
Normal file
43
solvable/src/test_fsm.yml
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# This defines an FSM equvivalent to test_fsm.py
|
||||||
|
|
||||||
|
states:
|
||||||
|
- name: '0'
|
||||||
|
- name: '1'
|
||||||
|
on_enter: |
|
||||||
|
python3 -c "from tfwconnector import MessageSender; MessageSender().send('FSM', 'Entered state 1!')"
|
||||||
|
- name: '2'
|
||||||
|
on_enter: |
|
||||||
|
file=/home/user/workdir/cat.txt
|
||||||
|
echo "As you can see it is possible to execute arbitrary shell commands here." >> $file
|
||||||
|
python3 -c \
|
||||||
|
"
|
||||||
|
from tfwconnector import MessageSender
|
||||||
|
MessageSender().send('FSM', 'Entered state 2! Written stuff to $file')
|
||||||
|
"
|
||||||
|
- name: '3'
|
||||||
|
on_enter: |
|
||||||
|
python3 -c "from tfwconnector import MessageSender; MessageSender().send('FSM', 'Entered state 3!')"
|
||||||
|
- name: '4'
|
||||||
|
on_enter: |
|
||||||
|
python3 -c "from tfwconnector import MessageSender; MessageSender().send('FSM', 'Entered state 4!')"
|
||||||
|
- name: '5'
|
||||||
|
on_enter: |
|
||||||
|
python3 -c "from tfwconnector import MessageSender; MessageSender().send('FSM', 'Entered state 5!')"
|
||||||
|
transitions:
|
||||||
|
- trigger: step_1
|
||||||
|
source: '0'
|
||||||
|
dest: '1'
|
||||||
|
- trigger: step_2
|
||||||
|
source: '1'
|
||||||
|
dest: '2'
|
||||||
|
- trigger: step_3
|
||||||
|
source: '2'
|
||||||
|
dest: '3'
|
||||||
|
predicates:
|
||||||
|
- '[ -f /home/user/workdir/allow_step_3 ]' # in bash -f means that the file exists
|
||||||
|
- trigger: step_4
|
||||||
|
source: '3'
|
||||||
|
dest: '4'
|
||||||
|
- trigger: step_5
|
||||||
|
source: '4'
|
||||||
|
dest: '5'
|
Loading…
Reference in New Issue
Block a user