Ensure test pipes are deleted after each test session

This commit is contained in:
Kristóf Tóth 2019-09-05 13:52:08 +02:00
parent 4aff5b1274
commit b98a1df962
1 changed files with 7 additions and 3 deletions

View File

@ -29,6 +29,12 @@ def get_test_init_params():
return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests')
def teardown_module():
for pipe in get_test_init_params():
if exists(pipe):
remove(pipe)
def raise_if_thread_blocks(*, target, unblock_function):
thread = Thread(target=target)
thread.start()
@ -213,10 +219,8 @@ def test_json_io(io_pipes):
def test_assign_message_handler():
pipe_io = PipeIOServer(*get_test_init_params())
pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2)
pipe_io.start()
with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
with pipe_io, IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
for _ in range(100):
test_data = token_urlsafe(32).encode()
io_pipes.send_message(test_data)
assert io_pipes.recv() == test_data * 2
pipe_io.stop()