# pylint: disable=redefined-outer-name from os import stat, urandom from os.path import exists, dirname, realpath, join from stat import S_ISFIFO from secrets import token_urlsafe import pytest from echo_server import EchoPipeIOServer @pytest.fixture def pipe_io(): here = dirname(realpath(__file__)) pipe_server = EchoPipeIOServer( join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests') ) pipe_server.run() yield pipe_server pipe_server.stop() def test_pipes_exist(pipe_io): for path in (pipe_io.in_pipe, pipe_io.out_pipe): assert exists(path) def test_pipes_isfifo(pipe_io): for path in (pipe_io.in_pipe, pipe_io.out_pipe): assert S_ISFIFO(stat(path).st_mode) @pytest.mark.parametrize( 'test_data', [ 'Cats and cheese', 'You ever wonder why we are here?', 'Lorem ipsum dolor sit amet', 'You always have a plan, Dutch!', token_urlsafe(32), token_urlsafe(32), token_urlsafe(32), token_urlsafe(32) ] ) def test_io(pipe_io, test_data): File(pipe_io.in_pipe, string=True).write(test_data) assert File(pipe_io.out_pipe, string=True).read() == test_data @pytest.mark.parametrize( 'test_data_size', [ 1024, 1024*1024, 2*1024*1024, 4*1024*1024, 8*1024*1024, 16*1024*1024, 32*1024*1024 ] ) def test_io_large_data(pipe_io, test_data_size): random_data = urandom(test_data_size) File(pipe_io.in_pipe).write(random_data) assert File(pipe_io.out_pipe).read() == random_data def test_stop_removes_pipes(pipe_io): pipe_io.stop() for path in (pipe_io.in_pipe, pipe_io.out_pipe): assert not exists(path) class File: def __init__(self, path, string=False): self.path = path self._filemode = '' if string else 'b' def write(self, what): with open(self.path, f'w{self._filemode}') as ofile: ofile.write(what) def read(self): with open(self.path, f'r{self._filemode}') as ifile: return ifile.read()