72 lines
1.9 KiB
Python
72 lines
1.9 KiB
Python
# pylint: disable=redefined-outer-name
|
|
import pytest
|
|
|
|
from tfw.internals.networking import Intent
|
|
from tfw.internals.crypto import KeyManager, sign_message
|
|
|
|
from .util import DummyConnector, simulate_event
|
|
from ..fsm_aware_event_handler import FSMAwareEventHandler
|
|
|
|
|
|
@pytest.fixture
|
|
def key():
|
|
yield KeyManager().auth_key
|
|
|
|
|
|
@pytest.fixture
|
|
def fsm_update_msg():
|
|
yield {
|
|
'key': 'fsm.update',
|
|
'intent': Intent.EVENT.value,
|
|
'scope': 'broadcast',
|
|
'current_state': '1',
|
|
'in_accepted_state': False,
|
|
'last_event': {
|
|
'from_state': '0',
|
|
'timestamp': '2019-09-04T16:51:15.587555',
|
|
'to_state': '1', 'trigger':
|
|
'step_1'
|
|
},
|
|
'valid_transitions': [{'trigger': 'step_2'}]
|
|
}
|
|
|
|
|
|
def test_ignores_unauthenticated(fsm_update_msg):
|
|
messages = []
|
|
eh = FSMAwareEventHandler(DummyConnector())
|
|
eh.handle_event = lambda msg, _: messages.append(msg)
|
|
|
|
simulate_event(eh, fsm_update_msg)
|
|
assert not messages
|
|
|
|
|
|
def test_ignores_other_keys(key):
|
|
messages = []
|
|
eh = FSMAwareEventHandler(DummyConnector())
|
|
eh.handle_event = lambda msg, _: messages.append(msg)
|
|
|
|
test_msg = {"key": "not.fsm.update"}
|
|
sign_message(key, test_msg)
|
|
simulate_event(eh, test_msg)
|
|
assert not messages
|
|
|
|
|
|
def test_accepts_authenticated(key, fsm_update_msg):
|
|
messages = []
|
|
eh = FSMAwareEventHandler(DummyConnector())
|
|
eh.handle_event = lambda msg, _: messages.append(msg)
|
|
|
|
sign_message(key, fsm_update_msg)
|
|
simulate_event(eh, fsm_update_msg)
|
|
assert messages[0] == fsm_update_msg
|
|
assert eh.fsm_state == '1'
|
|
assert not eh.fsm_in_accepted_state
|
|
assert len(eh.fsm_event_log) == 1
|
|
|
|
fsm_update_msg['in_accepted_state'] = True
|
|
fsm_update_msg['current_state'] = '2'
|
|
sign_message(key, fsm_update_msg)
|
|
simulate_event(eh, fsm_update_msg)
|
|
assert eh.fsm_state == '2'
|
|
assert eh.fsm_in_accepted_state
|