mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-22 12:01:32 +00:00
Implement test cases for PipeIOHandler
This commit is contained in:
parent
89b720c439
commit
8890e67b86
@ -21,7 +21,8 @@ class PipeIOHandlerBase:
|
|||||||
|
|
||||||
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
|
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
|
||||||
self.connector = None
|
self.connector = None
|
||||||
self._in_pipe = in_pipe_path
|
self.in_pipe = in_pipe_path
|
||||||
|
self.out_pipe = out_pipe_path
|
||||||
self.pipe_io = PipeIOServer(
|
self.pipe_io = PipeIOServer(
|
||||||
in_pipe_path,
|
in_pipe_path,
|
||||||
out_pipe_path,
|
out_pipe_path,
|
||||||
@ -34,7 +35,7 @@ class PipeIOHandlerBase:
|
|||||||
try:
|
try:
|
||||||
self.handle_pipe_event(message)
|
self.handle_pipe_event(message)
|
||||||
except: # pylint: disable=bare-except
|
except: # pylint: disable=bare-except
|
||||||
LOG.exception('Failed to handle message %s from pipe %s!', message, self._in_pipe)
|
LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def handle_pipe_event(self, message_bytes):
|
def handle_pipe_event(self, message_bytes):
|
||||||
|
45
tfw/components/pipe_io/test_pipe_io_handler.py
Normal file
45
tfw/components/pipe_io/test_pipe_io_handler.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
from os.path import join
|
||||||
|
from queue import Queue
|
||||||
|
from json import dumps, loads
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from .pipe_io_handler import PipeIOHandler
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def pipe_io():
|
||||||
|
with TemporaryDirectory() as tmpdir:
|
||||||
|
pipeio = PipeIOHandler(
|
||||||
|
join(tmpdir, 'in'),
|
||||||
|
join(tmpdir, 'out'),
|
||||||
|
permissions=0o600
|
||||||
|
)
|
||||||
|
pipeio.connector = MockConnector()
|
||||||
|
yield pipeio
|
||||||
|
pipeio.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
class MockConnector:
|
||||||
|
def __init__(self):
|
||||||
|
self.messages = Queue()
|
||||||
|
|
||||||
|
def send_message(self, msg):
|
||||||
|
self.messages.put(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipe_io_handler_recv(pipe_io):
|
||||||
|
test_msg = {'key': 'cica'}
|
||||||
|
with open(pipe_io.in_pipe, 'w') as ofile:
|
||||||
|
ofile.write(dumps(test_msg) + '\n')
|
||||||
|
|
||||||
|
assert pipe_io.connector.messages.get() == test_msg
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipe_io_handler_send(pipe_io):
|
||||||
|
test_msg = {'key': 'cica'}
|
||||||
|
pipe_io.handle_event(test_msg, None)
|
||||||
|
with open(pipe_io.out_pipe, 'r') as ifile:
|
||||||
|
assert loads(ifile.readline()) == test_msg
|
Loading…
Reference in New Issue
Block a user