Move fsm stuff to a separate directory in lib

This commit is contained in:
Kristóf Tóth
2018-07-24 11:40:33 +02:00
parent 52b2adb9c4
commit 8c6a14cef5
5 changed files with 8 additions and 4 deletions

6
lib/tfw/fsm/__init__.py Normal file
View File

@ -0,0 +1,6 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .fsm_base import FSMBase
from .linear_fsm import LinearFSM
from .yaml_fsm import YamlFSM

68
lib/tfw/fsm/fsm_base.py Normal file
View File

@ -0,0 +1,68 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from collections import defaultdict
from transitions import Machine, MachineError
from tfw.mixins import CallbackMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FSMBase(Machine, CallbackMixin):
"""
A general FSM base class you can inherit from to track user progress.
See linear_fsm.py for an example use-case.
TFW uses the transitions library for state machines, please refer to their
documentation for more information on creating your own machines:
https://github.com/pytransitions/transitions
"""
states, transitions = [], []
def __init__(self, initial=None, accepted_states=None):
self.accepted_states = accepted_states or [self.states[-1].name]
self.trigger_predicates = defaultdict(list)
self.trigger_history = []
Machine.__init__(
self,
states=self.states,
transitions=self.transitions,
initial=initial or self.states[0],
send_event=True,
ignore_invalid_triggers=True,
after_state_change='execute_callbacks'
)
def execute_callbacks(self, event_data):
self._execute_callbacks(event_data.kwargs)
def is_solved(self):
return self.state in self.accepted_states # pylint: disable=no-member
def subscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger].extend(predicates)
def unsubscribe_predicate(self, trigger, *predicates):
self.trigger_predicates[trigger] = [
predicate
for predicate in self.trigger_predicates[trigger]
not in predicates
]
def step(self, trigger):
predicate_results = (
predicate()
for predicate in self.trigger_predicates[trigger]
)
if all(predicate_results):
try:
self.trigger(trigger)
self.trigger_history.append(trigger)
return True
except (AttributeError, MachineError):
LOG.debug('FSM failed to execute nonexistent trigger: "%s"', trigger)
return False

31
lib/tfw/fsm/linear_fsm.py Normal file
View File

@ -0,0 +1,31 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from transitions import State
from .fsm_base import FSMBase
class LinearFSM(FSMBase):
# pylint: disable=anomalous-backslash-in-string
"""
This is a state machine for challenges with linear progression, consisting of
a number of steps specified in the constructor. It automatically sets up 2
actions (triggers) between states as such:
(0) -- step_1 --> (1) -- step_2 --> (2) -- step_3 --> (3) ... and so on
"""
def __init__(self, number_of_steps):
self.states = [State(name=str(index)) for index in range(number_of_steps)]
self.transitions = []
for state in self.states[:-1]:
self.transitions.append({
'trigger': f'step_{int(state.name)+1}',
'source': state.name,
'dest': str(int(state.name)+1)
})
self.transitions.append({
'trigger': 'step_next',
'source': state.name,
'dest': str(int(state.name)+1)
})
super(LinearFSM, self).__init__()

98
lib/tfw/fsm/yaml_fsm.py Normal file
View File

@ -0,0 +1,98 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from subprocess import Popen, run
from functools import partial, singledispatch
from contextlib import suppress
import yaml
import jinja2
from transitions import State
from .fsm_base import FSMBase
class YamlFSM(FSMBase):
def __init__(self, config_file, jinja2_variables=None):
self.config = ConfigParser(config_file, jinja2_variables).config
self.setup_states()
super().__init__() # FSMBase.__init__() requires states
self.setup_transitions()
def setup_states(self):
self.for_config_states_and_transitions_do(self.wrap_callbacks_with_subprocess_call)
self.states = [State(**state) for state in self.config['states']]
def setup_transitions(self):
self.for_config_states_and_transitions_do(self.subscribe_and_remove_predicates)
for transition in self.config['transitions']:
self.add_transition(**transition)
def for_config_states_and_transitions_do(self, what):
for array in ('states', 'transitions'):
for json_obj in self.config[array]:
what(json_obj)
@staticmethod
def wrap_callbacks_with_subprocess_call(json_obj):
topatch = ('on_enter', 'on_exit', 'prepare', 'before', 'after')
for key in json_obj:
if key in topatch:
json_obj[key] = partial(run_command_async, json_obj[key])
def subscribe_and_remove_predicates(self, json_obj):
if 'predicates' in json_obj:
for predicate in json_obj['predicates']:
self.subscribe_predicate(
json_obj['trigger'],
partial(
command_statuscode_is_zero,
predicate
)
)
with suppress(KeyError):
json_obj.pop('predicates')
def run_command_async(command, _):
Popen(command, shell=True)
def command_statuscode_is_zero(command):
return run(command, shell=True).returncode == 0
class ConfigParser:
def __init__(self, config_file, jinja2_variables):
self.read_variables = singledispatch(self._read_variables)
self.read_variables.register(dict, self._read_variables_dict)
self.read_variables.register(str, self._read_variables_str)
self.config = self.parse_config(config_file, jinja2_variables)
def parse_config(self, config_file, jinja2_variables):
config_string = self.read_file(config_file)
if jinja2_variables is not None:
variables = self.read_variables(jinja2_variables)
template = jinja2.Environment(loader=jinja2.BaseLoader).from_string(config_string)
config_string = template.render(**variables)
return yaml.safe_load(config_string)
@staticmethod
def read_file(filename):
with open(filename, 'r') as ifile:
return ifile.read()
@staticmethod
def _read_variables(variables):
raise TypeError(f'Invalid variables type {type(variables)}')
@staticmethod
def _read_variables_str(variables):
with open(variables, 'r') as ifile:
return yaml.safe_load(ifile)
@staticmethod
def _read_variables_dict(variables):
return variables