Introduce intent for TFW messages and fix FSM related bugs

This commit is contained in:
R. Richard 2019-08-23 15:27:03 +02:00
parent f626fef8f8
commit 3a720ae540
14 changed files with 42 additions and 19 deletions

View File

@ -1,7 +1,7 @@
import logging
from tfw.internals.crypto import KeyManager, sign_message
from tfw.internals.networking import Scope
from tfw.internals.networking import Scope, Intent
from .fsm_updater import FSMUpdater
@ -12,15 +12,16 @@ class FSMHandler:
keys = ['fsm']
def __init__(self, *, fsm_type, initial_trigger):
self.connector = None
self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key
self.initial_trigger = initial_trigger
self.command_handlers = {
'frontend.ready': self.handle_ready,
'fsm.step' : self.handle_step,
'fsm.announce' : self.handle_announce
'frontend.ready' : self.handle_ready,
'fsm.step' : self.handle_step,
'fsm.update' : self.handle_update
}
def start(self):
@ -32,7 +33,7 @@ class FSMHandler:
if message:
fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, fsm_update_message)
connector.send_message(fsm_update_message, Scope.BROADCAST)
connector.send_message(fsm_update_message, Scope.BROADCAST, Intent.EVENT)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
@ -41,10 +42,10 @@ class FSMHandler:
self.connector.unsubscribe('frontend.ready')
return message
def handle_step(self, message):
def handle_step(self, message): # pylint: disable=inconsistent-return-statements
if self.fsm.step(message['trigger']):
return message
def handle_announce(self, message):
def handle_update(self, message):
# pylint: disable=no-self-use
return message

View File

@ -15,8 +15,9 @@ class FSMUpdater:
{'trigger': trigger}
for trigger in self.fsm.get_triggers(self.fsm.state)
]
if not self.fsm.event_log:
return {}
last_fsm_event = self.fsm.event_log[-1]
last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat()
return {
'current_state': self.fsm.state,
'valid_transitions': valid_transitions,

View File

@ -1,6 +1,4 @@
import logging
from tfw.internals.networking import Scope
from tfw.internals.networking import Scope, Intent
from tfw.internals.inotify import InotifyObserver
from .supervisor import ProcessLogManager
@ -33,4 +31,4 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager):
'key': 'process.log.new',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}, Scope.BROADCAST)
}, Scope.BROADCAST, Intent.EVENT)

View File

@ -1,7 +1,7 @@
import logging
from xmlrpc.client import Fault as SupervisorFault
from tfw.internals.networking import Scope
from tfw.internals.networking import Scope, Intent
from .supervisor import ProcessManager
@ -26,7 +26,7 @@ class ProcessHandler(ProcessManager):
self.commands[message['key']](message['name'])
except SupervisorFault as fault:
message['error'] = fault.faultString
connector.send_message(message, scope=Scope.WEBSOCKET)
connector.send_message(message, scope=Scope.BROADCAST, intent=Intent.EVENT)
except KeyError:
if not message['key'].startswith('process.log'):
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,2 +1,2 @@
# pylint: disable=unused-import
from tfw.internals.event_handling import EventHandler, FSMAwareEventHandler, SignedEventHandler
from tfw.internals.event_handling import EventHandler, ControlEventHandler, FSMAwareEventHandler, SignedEventHandler

View File

@ -76,7 +76,7 @@ class FSMBase(Machine, CallbackMixin):
'from_state': from_state,
'to_state': self.state,
'trigger': trigger,
'timestamp': datetime.utcnow()
'timestamp': datetime.utcnow().isoformat()
})
@property

View File

@ -12,6 +12,7 @@ from tfw.internals.networking import message_bytes
def sign_message(key, message):
message.pop('scope', None)
message.pop('intent', None)
message.pop('signature', None)
signature = message_signature(key, message)
message['signature'] = b64encode(signature).decode()
@ -23,6 +24,7 @@ def message_signature(key, message):
def verify_message(key, message):
message.pop('scope', None)
message.pop('intent', None)
message = deepcopy(message)
try:
signature_b64 = message.pop('signature')

View File

@ -1,4 +1,5 @@
from .event_handler import EventHandler
from .event_handler_factory_base import EventHandlerFactoryBase
from .control_event_handler import ControlEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler
from .signed_event_handler import SignedEventHandler

View File

@ -0,0 +1,9 @@
from tfw.internals.networking import Intent
from .event_handler import EventHandler
class ControlEventHandler(EventHandler): # pylint: disable=abstract-method
def _event_callback(self, message):
if message.get('intent') != Intent.EVENT.value:
self.handle_event(message, self.connector)

View File

@ -7,6 +7,7 @@ LOG = logging.getLogger(__name__)
class FSMAware:
keys = ['fsm.update']
def __init__(self):
self.fsm_state = None
self.fsm_in_accepted_state = False

View File

@ -2,13 +2,13 @@ import logging
from tfw.internals.crypto import KeyManager, verify_message
from .event_handler import EventHandler
from .control_event_handler import ControlEventHandler
LOG = logging.getLogger(__name__)
# pylint: disable=abstract-method
class SignedEventHandler(EventHandler):
class SignedEventHandler(ControlEventHandler):
def __init__(self, connector):
self._auth_key = KeyManager().auth_key
super().__init__(connector)

View File

@ -2,3 +2,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserial
from .zmq_connector import ZMQConnector, ZMQDownlinkConnector, ZMQUplinkConnector
from .zmq_listener import ZMQListener
from .scope import Scope
from .intent import Intent

View File

@ -0,0 +1,6 @@
from enum import Enum
class Intent(Enum):
CONTROL = 'control'
EVENT = 'event'

View File

@ -4,6 +4,7 @@ import zmq
from zmq.eventloop.zmqstream import ZMQStream
from .scope import Scope
from .intent import Intent
from .serialization import (
serialize_tfw_msg,
deserialize_tfw_msg,
@ -51,8 +52,10 @@ class ZMQUplinkConnector:
self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0)
self._zmq_push_socket.connect(connect_addr)
def send_message(self, message, scope=Scope.ZMQ):
def send_message(self, message, scope=Scope.ZMQ, intent=None):
message['scope'] = scope.value
if isinstance(intent, Intent):
message['intent'] = intent.value
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
def close(self):