From 2e5867cc4975490fc6a0b982a947f6d03c6bad8c Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Fri, 23 Aug 2019 15:27:03 +0200 Subject: [PATCH] Introduce intent for TFW messages and fix FSM related bugs --- tfw/components/fsm/fsm_handler.py | 15 ++++++++------- tfw/components/fsm/fsm_updater.py | 3 ++- .../process_management/log_inotify_observer.py | 6 ++---- .../process_management/process_handler.py | 4 ++-- tfw/event_handlers.py | 2 +- tfw/fsm/fsm_base.py | 2 +- tfw/internals/crypto/authentication.py | 2 ++ tfw/internals/event_handling/__init__.py | 1 + .../event_handling/control_event_handler.py | 9 +++++++++ tfw/internals/event_handling/fsm_aware.py | 1 + .../event_handling/signed_event_handler.py | 4 ++-- tfw/internals/networking/__init__.py | 1 + tfw/internals/networking/intent.py | 6 ++++++ tfw/internals/networking/zmq_connector.py | 5 ++++- 14 files changed, 42 insertions(+), 19 deletions(-) create mode 100644 tfw/internals/event_handling/control_event_handler.py create mode 100644 tfw/internals/networking/intent.py diff --git a/tfw/components/fsm/fsm_handler.py b/tfw/components/fsm/fsm_handler.py index b26db86..9ea1374 100644 --- a/tfw/components/fsm/fsm_handler.py +++ b/tfw/components/fsm/fsm_handler.py @@ -1,7 +1,7 @@ import logging from tfw.internals.crypto import KeyManager, sign_message -from tfw.internals.networking import Scope +from tfw.internals.networking import Scope, Intent from .fsm_updater import FSMUpdater @@ -12,15 +12,16 @@ class FSMHandler: keys = ['fsm'] def __init__(self, *, fsm_type, initial_trigger): + self.connector = None self.fsm = fsm_type() self._fsm_updater = FSMUpdater(self.fsm) self.auth_key = KeyManager().auth_key self.initial_trigger = initial_trigger self.command_handlers = { - 'frontend.ready': self.handle_ready, - 'fsm.step' : self.handle_step, - 'fsm.announce' : self.handle_announce + 'frontend.ready' : self.handle_ready, + 'fsm.step' : self.handle_step, + 'fsm.update' : self.handle_update } def start(self): @@ -32,7 +33,7 @@ class FSMHandler: if message: fsm_update_message = self._fsm_updater.fsm_update sign_message(self.auth_key, fsm_update_message) - connector.send_message(fsm_update_message, Scope.BROADCAST) + connector.send_message(fsm_update_message, Scope.BROADCAST, Intent.EVENT) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) @@ -41,10 +42,10 @@ class FSMHandler: self.connector.unsubscribe('frontend.ready') return message - def handle_step(self, message): + def handle_step(self, message): # pylint: disable=inconsistent-return-statements if self.fsm.step(message['trigger']): return message - def handle_announce(self, message): + def handle_update(self, message): # pylint: disable=no-self-use return message diff --git a/tfw/components/fsm/fsm_updater.py b/tfw/components/fsm/fsm_updater.py index 276b65d..1c671ad 100644 --- a/tfw/components/fsm/fsm_updater.py +++ b/tfw/components/fsm/fsm_updater.py @@ -15,8 +15,9 @@ class FSMUpdater: {'trigger': trigger} for trigger in self.fsm.get_triggers(self.fsm.state) ] + if not self.fsm.event_log: + return {} last_fsm_event = self.fsm.event_log[-1] - last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat() return { 'current_state': self.fsm.state, 'valid_transitions': valid_transitions, diff --git a/tfw/components/process_management/log_inotify_observer.py b/tfw/components/process_management/log_inotify_observer.py index 1c80dd1..1a5887a 100644 --- a/tfw/components/process_management/log_inotify_observer.py +++ b/tfw/components/process_management/log_inotify_observer.py @@ -1,6 +1,4 @@ -import logging - -from tfw.internals.networking import Scope +from tfw.internals.networking import Scope, Intent from tfw.internals.inotify import InotifyObserver from .supervisor import ProcessLogManager @@ -33,4 +31,4 @@ class LogInotifyObserver(InotifyObserver, ProcessLogManager): 'key': 'process.log.new', 'stdout': self.read_stdout(self.process_name, tail=self.log_tail), 'stderr': self.read_stderr(self.process_name, tail=self.log_tail) - }, Scope.BROADCAST) + }, Scope.BROADCAST, Intent.EVENT) diff --git a/tfw/components/process_management/process_handler.py b/tfw/components/process_management/process_handler.py index e24a83f..04c121b 100644 --- a/tfw/components/process_management/process_handler.py +++ b/tfw/components/process_management/process_handler.py @@ -1,7 +1,7 @@ import logging from xmlrpc.client import Fault as SupervisorFault -from tfw.internals.networking import Scope +from tfw.internals.networking import Scope, Intent from .supervisor import ProcessManager @@ -26,7 +26,7 @@ class ProcessHandler(ProcessManager): self.commands[message['key']](message['name']) except SupervisorFault as fault: message['error'] = fault.faultString - connector.send_message(message, scope=Scope.WEBSOCKET) + connector.send_message(message, scope=Scope.BROADCAST, intent=Intent.EVENT) except KeyError: if not message['key'].startswith('process.log'): LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) diff --git a/tfw/event_handlers.py b/tfw/event_handlers.py index 0ea1050..3cd92f5 100644 --- a/tfw/event_handlers.py +++ b/tfw/event_handlers.py @@ -1,2 +1,2 @@ # pylint: disable=unused-import -from tfw.internals.event_handling import EventHandler, FSMAwareEventHandler, SignedEventHandler +from tfw.internals.event_handling import EventHandler, ControlEventHandler, FSMAwareEventHandler, SignedEventHandler diff --git a/tfw/fsm/fsm_base.py b/tfw/fsm/fsm_base.py index 0fac40b..c35d094 100644 --- a/tfw/fsm/fsm_base.py +++ b/tfw/fsm/fsm_base.py @@ -76,7 +76,7 @@ class FSMBase(Machine, CallbackMixin): 'from_state': from_state, 'to_state': self.state, 'trigger': trigger, - 'timestamp': datetime.utcnow() + 'timestamp': datetime.utcnow().isoformat() }) @property diff --git a/tfw/internals/crypto/authentication.py b/tfw/internals/crypto/authentication.py index 26e9e11..0ca19a3 100644 --- a/tfw/internals/crypto/authentication.py +++ b/tfw/internals/crypto/authentication.py @@ -12,6 +12,7 @@ from tfw.internals.networking import message_bytes def sign_message(key, message): message.pop('scope', None) + message.pop('intent', None) message.pop('signature', None) signature = message_signature(key, message) message['signature'] = b64encode(signature).decode() @@ -23,6 +24,7 @@ def message_signature(key, message): def verify_message(key, message): message.pop('scope', None) + message.pop('intent', None) message = deepcopy(message) try: signature_b64 = message.pop('signature') diff --git a/tfw/internals/event_handling/__init__.py b/tfw/internals/event_handling/__init__.py index 8238d21..0803cc5 100644 --- a/tfw/internals/event_handling/__init__.py +++ b/tfw/internals/event_handling/__init__.py @@ -1,4 +1,5 @@ from .event_handler import EventHandler from .event_handler_factory_base import EventHandlerFactoryBase +from .control_event_handler import ControlEventHandler from .fsm_aware_event_handler import FSMAwareEventHandler from .signed_event_handler import SignedEventHandler diff --git a/tfw/internals/event_handling/control_event_handler.py b/tfw/internals/event_handling/control_event_handler.py new file mode 100644 index 0000000..dda4dae --- /dev/null +++ b/tfw/internals/event_handling/control_event_handler.py @@ -0,0 +1,9 @@ +from tfw.internals.networking import Intent + +from .event_handler import EventHandler + + +class ControlEventHandler(EventHandler): # pylint: disable=abstract-method + def _event_callback(self, message): + if message.get('intent') != Intent.EVENT.value: + self.handle_event(message, self.connector) diff --git a/tfw/internals/event_handling/fsm_aware.py b/tfw/internals/event_handling/fsm_aware.py index b888220..e5de2dc 100644 --- a/tfw/internals/event_handling/fsm_aware.py +++ b/tfw/internals/event_handling/fsm_aware.py @@ -7,6 +7,7 @@ LOG = logging.getLogger(__name__) class FSMAware: keys = ['fsm.update'] + def __init__(self): self.fsm_state = None self.fsm_in_accepted_state = False diff --git a/tfw/internals/event_handling/signed_event_handler.py b/tfw/internals/event_handling/signed_event_handler.py index 1ee72b8..581035f 100644 --- a/tfw/internals/event_handling/signed_event_handler.py +++ b/tfw/internals/event_handling/signed_event_handler.py @@ -2,13 +2,13 @@ import logging from tfw.internals.crypto import KeyManager, verify_message -from .event_handler import EventHandler +from .control_event_handler import ControlEventHandler LOG = logging.getLogger(__name__) # pylint: disable=abstract-method -class SignedEventHandler(EventHandler): +class SignedEventHandler(ControlEventHandler): def __init__(self, connector): self._auth_key = KeyManager().auth_key super().__init__(connector) diff --git a/tfw/internals/networking/__init__.py b/tfw/internals/networking/__init__.py index dca7b24..520cbec 100644 --- a/tfw/internals/networking/__init__.py +++ b/tfw/internals/networking/__init__.py @@ -2,3 +2,4 @@ from .serialization import serialize_tfw_msg, deserialize_tfw_msg, with_deserial from .zmq_connector import ZMQConnector, ZMQDownlinkConnector, ZMQUplinkConnector from .zmq_listener import ZMQListener from .scope import Scope +from .intent import Intent diff --git a/tfw/internals/networking/intent.py b/tfw/internals/networking/intent.py new file mode 100644 index 0000000..a13c7bb --- /dev/null +++ b/tfw/internals/networking/intent.py @@ -0,0 +1,6 @@ +from enum import Enum + + +class Intent(Enum): + CONTROL = 'control' + EVENT = 'event' diff --git a/tfw/internals/networking/zmq_connector.py b/tfw/internals/networking/zmq_connector.py index 99920c7..10c5114 100644 --- a/tfw/internals/networking/zmq_connector.py +++ b/tfw/internals/networking/zmq_connector.py @@ -4,6 +4,7 @@ import zmq from zmq.eventloop.zmqstream import ZMQStream from .scope import Scope +from .intent import Intent from .serialization import ( serialize_tfw_msg, deserialize_tfw_msg, @@ -51,8 +52,10 @@ class ZMQUplinkConnector: self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_push_socket.connect(connect_addr) - def send_message(self, message, scope=Scope.ZMQ): + def send_message(self, message, scope=Scope.ZMQ, intent=None): message['scope'] = scope.value + if isinstance(intent, Intent): + message['intent'] = intent.value self._zmq_push_socket.send_multipart(serialize_tfw_msg(message)) def close(self):