Introduce intent for TFW messages and fix FSM related bugs

This commit is contained in:
R. Richard 2019-08-23 15:27:03 +02:00 committed by therealkrispet
parent f626fef8f8
commit 2e5867cc49
14 changed files with 42 additions and 19 deletions

View File

@ -1,7 +1,7 @@
import logging import logging
from tfw.internals.crypto import KeyManager, sign_message from tfw.internals.crypto import KeyManager, sign_message
from tfw.internals.networking import Scope from tfw.internals.networking import Scope, Intent
from .fsm_updater import FSMUpdater from .fsm_updater import FSMUpdater
@ -12,6 +12,7 @@ class FSMHandler:
keys = ['fsm'] keys = ['fsm']
def __init__(self, *, fsm_type, initial_trigger): def __init__(self, *, fsm_type, initial_trigger):
self.connector = None
self.fsm = fsm_type() self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm) self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key self.auth_key = KeyManager().auth_key
@ -20,7 +21,7 @@ class FSMHandler:
self.command_handlers = { self.command_handlers = {
'frontend.ready' : self.handle_ready, 'frontend.ready' : self.handle_ready,
'fsm.step' : self.handle_step, 'fsm.step' : self.handle_step,
'fsm.announce' : self.handle_announce 'fsm.update' : self.handle_update
} }
def start(self): def start(self):
@ -32,7 +33,7 @@ class FSMHandler:
if message: if message:
fsm_update_message = self._fsm_updater.fsm_update fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, fsm_update_message) sign_message(self.auth_key, fsm_update_message)
connector.send_message(fsm_update_message, Scope.BROADCAST) connector.send_message(fsm_update_message, Scope.BROADCAST, Intent.EVENT)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
@ -41,10 +42,10 @@ class FSMHandler:
self.connector.unsubscribe('frontend.ready') self.connector.unsubscribe('frontend.ready')
return message return message
def handle_step(self, message): def handle_step(self, message): # pylint: disable=inconsistent-return-statements
if self.fsm.step(message['trigger']): if self.fsm.step(message['trigger']):
return message return message
def handle_announce(self, message): def handle_update(self, message):
# pylint: disable=no-self-use # pylint: disable=no-self-use
return message return message

View File

@ -15,8 +15,9 @@ class FSMUpdater:
{'trigger': trigger} {'trigger': trigger}
for trigger in self.fsm.get_triggers(self.fsm.state) for trigger in self.fsm.get_triggers(self.fsm.state)
] ]
if not self.fsm.event_log:
return {}
last_fsm_event = self.fsm.event_log[-1] last_fsm_event = self.fsm.event_log[-1]
last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat()
return { return {
'current_state': self.fsm.state, 'current_state': self.fsm.state,
'valid_transitions': valid_transitions, 'valid_transitions': valid_transitions,

View File

@ -1,6 +1,4 @@
import logging from tfw.internals.networking import Scope, Intent
from tfw.internals.networking import Scope
from tfw.internals.inotify import InotifyObserver from tfw.internals.inotify import InotifyObserver
from .supervisor import ProcessLogManager from .supervisor import ProcessLogManager
@ -33,4 +31,4 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager):
'key': 'process.log.new', 'key': 'process.log.new',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail), 'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail) 'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}, Scope.BROADCAST) }, Scope.BROADCAST, Intent.EVENT)

View File

@ -1,7 +1,7 @@
import logging import logging
from xmlrpc.client import Fault as SupervisorFault from xmlrpc.client import Fault as SupervisorFault
from tfw.internals.networking import Scope from tfw.internals.networking import Scope, Intent
from .supervisor import ProcessManager from .supervisor import ProcessManager
@ -26,7 +26,7 @@ class ProcessHandler(ProcessManager):
self.commands[message['key']](message['name']) self.commands[message['key']](message['name'])
except SupervisorFault as fault: except SupervisorFault as fault:
message['error'] = fault.faultString message['error'] = fault.faultString
connector.send_message(message, scope=Scope.WEBSOCKET) connector.send_message(message, scope=Scope.BROADCAST, intent=Intent.EVENT)
except KeyError: except KeyError:
if not message['key'].startswith('process.log'): if not message['key'].startswith('process.log'):
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,2 +1,2 @@
# pylint: disable=unused-import # pylint: disable=unused-import
from tfw.internals.event_handling import EventHandler, FSMAwareEventHandler, SignedEventHandler from tfw.internals.event_handling import EventHandler, ControlEventHandler, FSMAwareEventHandler, SignedEventHandler

View File

@ -76,7 +76,7 @@ class FSMBase(Machine, CallbackMixin):
'from_state': from_state, 'from_state': from_state,
'to_state': self.state, 'to_state': self.state,
'trigger': trigger, 'trigger': trigger,
'timestamp': datetime.utcnow() 'timestamp': datetime.utcnow().isoformat()
}) })
@property @property

View File

@ -12,6 +12,7 @@ from tfw.internals.networking import message_bytes
def sign_message(key, message): def sign_message(key, message):
message.pop('scope', None) message.pop('scope', None)
message.pop('intent', None)
message.pop('signature', None) message.pop('signature', None)
signature = message_signature(key, message) signature = message_signature(key, message)
message['signature'] = b64encode(signature).decode() message['signature'] = b64encode(signature).decode()
@ -23,6 +24,7 @@ def message_signature(key, message):
def verify_message(key, message): def verify_message(key, message):
message.pop('scope', None) message.pop('scope', None)
message.pop('intent', None)
message = deepcopy(message) message = deepcopy(message)
try: try:
signature_b64 = message.pop('signature') signature_b64 = message.pop('signature')

View File

@ -1,4 +1,5 @@
from .event_handler import EventHandler from .event_handler import EventHandler
from .event_handler_factory_base import EventHandlerFactoryBase from .event_handler_factory_base import EventHandlerFactoryBase
from .control_event_handler import ControlEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler
from .signed_event_handler import SignedEventHandler from .signed_event_handler import SignedEventHandler

View File

@ -0,0 +1,9 @@
from tfw.internals.networking import Intent
from .event_handler import EventHandler
class ControlEventHandler(EventHandler): # pylint: disable=abstract-method
def _event_callback(self, message):
if message.get('intent') != Intent.EVENT.value:
self.handle_event(message, self.connector)

View File

@ -7,6 +7,7 @@ LOG = logging.getLogger(__name__)
class FSMAware: class FSMAware:
keys = ['fsm.update'] keys = ['fsm.update']
def __init__(self): def __init__(self):
self.fsm_state = None self.fsm_state = None
self.fsm_in_accepted_state = False self.fsm_in_accepted_state = False

View File

@ -2,13 +2,13 @@ import logging
from tfw.internals.crypto import KeyManager, verify_message from tfw.internals.crypto import KeyManager, verify_message
from .event_handler import EventHandler from .control_event_handler import ControlEventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# pylint: disable=abstract-method # pylint: disable=abstract-method
class SignedEventHandler(EventHandler): class SignedEventHandler(ControlEventHandler):
def __init__(self, connector): def __init__(self, connector):
self._auth_key = KeyManager().auth_key self._auth_key = KeyManager().auth_key
super().__init__(connector) super().__init__(connector)

View File

@ -2,3 +2,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserial
from .zmq_connector import ZMQConnector, ZMQDownlinkConnector, ZMQUplinkConnector from .zmq_connector import ZMQConnector, ZMQDownlinkConnector, ZMQUplinkConnector
from .zmq_listener import ZMQListener from .zmq_listener import ZMQListener
from .scope import Scope from .scope import Scope
from .intent import Intent

View File

@ -0,0 +1,6 @@
from enum import Enum
class Intent(Enum):
CONTROL = 'control'
EVENT = 'event'

View File

@ -4,6 +4,7 @@ import zmq
from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.zmqstream import ZMQStream
from .scope import Scope from .scope import Scope
from .intent import Intent
from .serialization import ( from .serialization import (
serialize_tfw_msg, serialize_tfw_msg,
deserialize_tfw_msg, deserialize_tfw_msg,
@ -51,8 +52,10 @@ class ZMQUplinkConnector:
self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0)
self._zmq_push_socket.connect(connect_addr) self._zmq_push_socket.connect(connect_addr)
def send_message(self, message, scope=Scope.ZMQ): def send_message(self, message, scope=Scope.ZMQ, intent=None):
message['scope'] = scope.value message['scope'] = scope.value
if isinstance(intent, Intent):
message['intent'] = intent.value
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message)) self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
def close(self): def close(self):