2018-12-14 16:13:21 +00:00
|
|
|
# pylint: disable=redefined-outer-name
|
2018-12-14 14:40:11 +00:00
|
|
|
from os import stat
|
2018-12-14 16:13:21 +00:00
|
|
|
from os.path import exists, dirname, realpath, join
|
2018-12-14 14:40:11 +00:00
|
|
|
from stat import S_ISFIFO
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
from echo_server import EchoPipeIOServer
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def pipe_io():
    """Provide a running ``EchoPipeIOServer`` to a test and stop it afterwards.

    The server is constructed with two paths located in the same directory
    as this test module (``in_pipe_tests`` and ``out_pipe_tests``), started
    with ``run()``, handed to the test, and finally shut down with ``stop()``.
    """
    # Resolve symlinks so the paths sit next to the real test file.
    test_dir = dirname(realpath(__file__))
    in_path = join(test_dir, 'in_pipe_tests')
    out_path = join(test_dir, 'out_pipe_tests')

    server = EchoPipeIOServer(in_path, out_path)
    server.run()

    yield server

    # Teardown: executed by pytest once the requesting test has finished.
    server.stop()
|
|
|
|
|
|
|
|
|
|
|
|
def test_pipes_exist(pipe_io):
    """Both pipe paths exposed by the running server exist on disk."""
    pipe_paths = (pipe_io.in_pipe, pipe_io.out_pipe)
    for pipe_path in pipe_paths:
        assert exists(pipe_path)
|
|
|
|
|
|
|
|
|
|
|
|
def test_pipes_isfifo(pipe_io):
    """Both pipe paths exposed by the running server are FIFOs."""
    pipe_paths = (pipe_io.in_pipe, pipe_io.out_pipe)
    for pipe_path in pipe_paths:
        # ``stat`` here is ``os.stat``; ``S_ISFIFO`` inspects its mode bits.
        mode = stat(pipe_path).st_mode
        assert S_ISFIFO(mode)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    'test_data',
    [
        'cats',
        'cheese',
        'You ever wonder why we are here?',
        'Lorem ipsum dolor sit amet',
        'You always have a plan, Dutch!'
    ]
)
def test_echo_server(pipe_io, test_data):
    """Text written to the server's input pipe is read back from its output pipe."""
    source = File(pipe_io.in_pipe)
    source.write(test_data)

    sink = File(pipe_io.out_pipe)
    echoed = sink.read()
    assert echoed == test_data
|
|
|
|
|
|
|
|
|
2018-12-15 00:07:02 +00:00
|
|
|
def test_stop_removes_pipes(pipe_io):
    """After ``stop()`` neither pipe path exists any more."""
    # NOTE(review): the fixture teardown calls stop() a second time after
    # this test; presumably stop() tolerates repeated calls - confirm.
    pipe_io.stop()

    pipe_paths = (pipe_io.in_pipe, pipe_io.out_pipe)
    for pipe_path in pipe_paths:
        assert not exists(pipe_path)
|
|
|
|
|
|
|
|
|
2018-12-14 14:40:11 +00:00
|
|
|
class File:
    """Tiny helper that opens a path afresh for every read or write."""

    def __init__(self, path):
        """Remember *path*; nothing is opened until ``read``/``write``."""
        self.path = path

    def write(self, what):
        """Open the path in text write mode, write *what*, and close it."""
        with open(self.path, 'w') as sink:
            sink.write(what)

    def read(self):
        """Open the path in text read mode and return everything read."""
        with open(self.path, 'r') as source:
            content = source.read()
        return content
|