2018-07-04 16:11:42 +00:00
|
|
|
from subprocess import Popen, run
|
2018-07-11 13:30:52 +00:00
|
|
|
from functools import partial, singledispatch
|
2018-07-04 16:11:42 +00:00
|
|
|
from contextlib import suppress
|
2018-07-03 17:06:54 +00:00
|
|
|
|
2018-07-03 13:14:00 +00:00
|
|
|
import yaml
|
2018-07-11 13:30:52 +00:00
|
|
|
import jinja2
|
2018-07-03 13:14:00 +00:00
|
|
|
from transitions import State
|
|
|
|
|
|
|
|
from tfw import FSMBase
|
|
|
|
|
|
|
|
|
|
|
|
class YamlFSM(FSMBase):
    """Finite state machine whose states and transitions come from a YAML file.

    The file is loaded through ``ConfigParser`` and must provide the
    ``states`` and ``transitions`` sequences. Values stored under the callback
    keys (``on_enter``, ``on_exit``, ``prepare``, ``before``, ``after``) are
    wrapped so that invoking the callback starts them as shell commands.
    Entries of a ``predicates`` list are subscribed as predicates whose
    result is whether the shell command exits with status 0.
    """

    def __init__(self, config_file, jinja2_variables=None):
        """Parse ``config_file`` and build the machine from it.

        Args:
            config_file: path of the YAML configuration file.
            jinja2_variables: forwarded unchanged to ``ConfigParser``.
        """
        parser = ConfigParser(config_file, jinja2_variables)
        self.config = parser.config
        self.setup_states()
        # FSMBase.__init__() requires states, so it may only run at this point.
        super().__init__()
        self.setup_transitions()

    def setup_states(self):
        """Wrap the configured callbacks, then populate ``self.states``."""
        self.for_config_states_and_transitions_do(
            self.wrap_callbacks_with_subprocess_call
        )
        built_states = []
        for state_kwargs in self.config['states']:
            built_states.append(State(**state_kwargs))
        self.states = built_states

    def setup_transitions(self):
        """Subscribe the configured predicates, then add every transition."""
        self.for_config_states_and_transitions_do(
            self.subscribe_and_remove_predicates
        )
        for transition_kwargs in self.config['transitions']:
            self.add_transition(**transition_kwargs)

    def for_config_states_and_transitions_do(self, what):
        """Call ``what(entry)`` for each state entry, then each transition entry."""
        for section in ('states', 'transitions'):
            entries = self.config[section]
            for entry in entries:
                what(entry)

    @staticmethod
    def wrap_callbacks_with_subprocess_call(json_obj):
        """Replace callback values in ``json_obj`` with command launchers.

        Each value found under a callback key is swapped, in place, for a
        ``partial`` of ``run_command_async`` bound to that value.
        """
        callback_keys = ('on_enter', 'on_exit', 'prepare', 'before', 'after')
        for callback_key in callback_keys:
            if callback_key in json_obj:
                command = json_obj[callback_key]
                json_obj[callback_key] = partial(run_command_async, command)

    def subscribe_and_remove_predicates(self, json_obj):
        """Subscribe the entry's predicate commands and drop the key.

        Every item of ``json_obj['predicates']`` (if the key exists) is
        subscribed for ``json_obj['trigger']`` as a callable that reports
        whether the command exits with status 0. The ``predicates`` key is
        removed afterwards; entries without it are left untouched.
        """
        for command in json_obj.get('predicates', ()):
            trigger = json_obj['trigger']
            predicate = partial(command_statuscode_is_zero, command)
            self.subscribe_predicate(trigger, predicate)

        # setup_transitions() later expands the entry into add_transition()
        # keyword arguments, so the key must be gone by then.
        json_obj.pop('predicates', None)
|
|
|
|
|
|
|
|
|
2018-07-04 16:15:34 +00:00
|
|
|
def run_command_async(command, event):
    """Start ``command`` through the shell without waiting for it to finish.

    Args:
        command: command line handed to ``Popen`` with ``shell=True``.
        event: accepted but never used by this function.
            NOTE(review): presumably the event object the FSM passes to
            callbacks -- confirm against the caller.

    Returns:
        None. The ``Popen`` handle is discarded, so the exit status of the
        command is never collected here.
    """
    Popen(args=command, shell=True)
|
|
|
|
|
|
|
|
|
|
|
|
def command_statuscode_is_zero(command):
    """Run ``command`` through the shell and report whether it succeeded.

    Blocks until the command has finished.

    Args:
        command: command line handed to ``subprocess.run`` with ``shell=True``.

    Returns:
        True when the exit status is 0, False otherwise.
    """
    completed = run(command, shell=True)
    return completed.returncode == 0
|
2018-07-11 13:30:52 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ConfigParser:
    """Load a YAML configuration file, optionally rendering it with Jinja2.

    The parsed document is exposed as the ``config`` attribute.
    """

    def __init__(self, config_file, jinja2_variables):
        """Read and parse ``config_file``.

        Args:
            config_file: path of the YAML file; it is treated as a Jinja2
                template when ``jinja2_variables`` is not None.
            jinja2_variables: None to skip template rendering, a dict of
                template variables, or a str path to a YAML file holding
                the variables.

        Raises:
            TypeError: if ``jinja2_variables`` is neither None, a dict nor
                a str.
        """
        # Dispatch on the type of the variables argument. The undecorated
        # read_variables() is the fallback and rejects unsupported types.
        self.read_variables = singledispatch(self.read_variables)
        self.read_variables.register(dict, self._read_variables_dict)
        self.read_variables.register(str, self._read_variables_str)

        self.config = self.parse_config(config_file, jinja2_variables)

    def parse_config(self, config_file, jinja2_variables):
        """Return the parsed YAML content of ``config_file``.

        When ``jinja2_variables`` is not None the file content is first
        rendered as a Jinja2 template with those variables.
        """
        config_string = self.read_file(config_file)
        if jinja2_variables is not None:
            # An empty variables file parses to None; render without any
            # variables instead of failing on ``render(**None)``.
            variables = self.read_variables(jinja2_variables) or {}
            # Environment expects a loader instance, not the loader class.
            environment = jinja2.Environment(loader=jinja2.BaseLoader())
            template = environment.from_string(config_string)
            config_string = template.render(**variables)
        return yaml.safe_load(config_string)

    @staticmethod
    def read_file(filename):
        """Return the whole text content of ``filename``."""
        with open(filename, 'r') as ifile:
            return ifile.read()

    @staticmethod
    def read_variables(variables):
        """Fallback of the type dispatch: always raises TypeError."""
        raise TypeError(f'Invalid variables type {type(variables)}')

    @staticmethod
    def _read_variables_str(variables):
        """Load template variables from the YAML file at path ``variables``.

        Only reached through the ``str`` dispatch registered in __init__, so
        no additional type check is needed here.
        """
        with open(variables, 'r') as ifile:
            return yaml.safe_load(ifile)

    @staticmethod
    def _read_variables_dict(variables):
        """Return ``variables`` unchanged; a dict is used as-is."""
        return variables
|
|
|
|
|