mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-14 06:27:16 +00:00
Implement unit tests for EventHandler subclasses
This commit is contained in:
parent
620212f00f
commit
10cd07973d
@ -0,0 +1,33 @@
|
||||
from tfw.internals.networking import Intent
|
||||
|
||||
from .util import DummyConnector, simulate_event
|
||||
from ..control_event_handler import ControlEventHandler
|
||||
|
||||
|
||||
def test_receives_control_intent():
|
||||
eh = ControlEventHandler(DummyConnector())
|
||||
messages = []
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
test_msg = {"key": "cica", "intent": Intent.CONTROL.value}
|
||||
simulate_event(eh, test_msg)
|
||||
assert messages[0] == test_msg
|
||||
|
||||
|
||||
def test_no_intent_defaults_to_control():
|
||||
eh = ControlEventHandler(DummyConnector())
|
||||
messages = []
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
test_msg = {"key": "cica"}
|
||||
simulate_event(eh, test_msg)
|
||||
assert messages[0] == test_msg
|
||||
|
||||
|
||||
def test_ignores_event_intent():
|
||||
eh = ControlEventHandler(DummyConnector())
|
||||
messages = []
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
simulate_event(eh, {"key": "cica", "intent": Intent.EVENT.value})
|
||||
assert not messages
|
@ -4,10 +4,10 @@ from random import randint
|
||||
|
||||
import pytest
|
||||
|
||||
from .event_handler_factory_base import EventHandlerFactoryBase
|
||||
from .event_handler import EventHandler
|
||||
from .control_event_handler import ControlEventHandler
|
||||
from .fsm_aware_event_handler import FSMAwareEventHandler
|
||||
from ..event_handler_factory_base import EventHandlerFactoryBase
|
||||
from ..event_handler import EventHandler
|
||||
from ..control_event_handler import ControlEventHandler
|
||||
from ..fsm_aware_event_handler import FSMAwareEventHandler
|
||||
|
||||
|
||||
class MockEventHandlerFactory(EventHandlerFactoryBase):
|
@ -0,0 +1,69 @@
|
||||
# pylint: disable=redefined-outer-name
|
||||
import pytest
|
||||
|
||||
from tfw.internals.networking import Intent
|
||||
from tfw.internals.crypto import KeyManager, sign_message
|
||||
|
||||
from .util import DummyConnector, simulate_event
|
||||
from ..fsm_aware_event_handler import FSMAwareEventHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def key():
|
||||
yield KeyManager().auth_key
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fsm_update_msg():
|
||||
yield {
|
||||
'key': 'fsm.update',
|
||||
'intent': Intent.EVENT.value,
|
||||
'scope': 'broadcast',
|
||||
'current_state': '1',
|
||||
'in_accepted_state': False,
|
||||
'last_event': {
|
||||
'from_state': '0',
|
||||
'timestamp': '2019-09-04T16:51:15.587555',
|
||||
'to_state': '1', 'trigger':
|
||||
'step_1'
|
||||
},
|
||||
'valid_transitions': [{'trigger': 'step_2'}]
|
||||
}
|
||||
|
||||
|
||||
def test_ignores_other_keys():
|
||||
messages = []
|
||||
eh = FSMAwareEventHandler(DummyConnector())
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
simulate_event(eh, {"key": "not.fsm.update"})
|
||||
assert not messages
|
||||
|
||||
|
||||
def test_ignores_unauthenticated(fsm_update_msg):
|
||||
messages = []
|
||||
eh = FSMAwareEventHandler(DummyConnector())
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
simulate_event(eh, fsm_update_msg)
|
||||
assert not messages
|
||||
|
||||
|
||||
def test_accepts_authenticated(key, fsm_update_msg):
|
||||
messages = []
|
||||
eh = FSMAwareEventHandler(DummyConnector())
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
sign_message(key, fsm_update_msg)
|
||||
simulate_event(eh, fsm_update_msg)
|
||||
assert messages[0] == fsm_update_msg
|
||||
assert eh.fsm_state == '1'
|
||||
assert not eh.fsm_in_accepted_state
|
||||
assert len(eh.fsm_event_log) == 1
|
||||
|
||||
fsm_update_msg['in_accepted_state'] = True
|
||||
fsm_update_msg['current_state'] = '2'
|
||||
sign_message(key, fsm_update_msg)
|
||||
simulate_event(eh, fsm_update_msg)
|
||||
assert eh.fsm_state == '2'
|
||||
assert eh.fsm_in_accepted_state
|
@ -0,0 +1,32 @@
|
||||
# pylint: disable=redefined-outer-name
|
||||
import pytest
|
||||
|
||||
from tfw.internals.crypto import sign_message, KeyManager
|
||||
|
||||
from .util import DummyConnector, simulate_event
|
||||
from ..signed_event_handler import SignedEventHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def key():
|
||||
yield KeyManager().auth_key
|
||||
|
||||
|
||||
def test_ignores_unauthenticated():
|
||||
eh = SignedEventHandler(DummyConnector())
|
||||
messages = []
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
simulate_event(eh, {"key": "cica"})
|
||||
assert not messages
|
||||
|
||||
|
||||
def test_accepts_authenticated(key):
|
||||
eh = SignedEventHandler(DummyConnector())
|
||||
messages = []
|
||||
eh.handle_event = lambda msg, _: messages.append(msg)
|
||||
|
||||
test_msg = {"key": "cica"}
|
||||
sign_message(key, test_msg)
|
||||
simulate_event(eh, test_msg)
|
||||
assert messages[0] == test_msg
|
11
tfw/internals/event_handling/test_event_handling/util.py
Normal file
11
tfw/internals/event_handling/test_event_handling/util.py
Normal file
@ -0,0 +1,11 @@
|
||||
class DummyConnector():
|
||||
def send_message(self, message, scope=None, intent=None):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def simulate_event(event_handler, message):
|
||||
# pylint: disable=protected-access
|
||||
event_handler._event_callback(message)
|
Loading…
Reference in New Issue
Block a user