Add test case for stopping when pipes are deleted from the FS

This commit is contained in:
Kristóf Tóth 2019-08-05 17:33:43 +02:00
parent 7cb6af9c4e
commit 64be21cfa6

View File

@ -1,5 +1,5 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
from os import stat, urandom from os import stat, urandom, remove
from os.path import exists, dirname, realpath, join from os.path import exists, dirname, realpath, join
from stat import S_ISFIFO from stat import S_ISFIFO
from secrets import token_urlsafe from secrets import token_urlsafe
@ -130,6 +130,15 @@ def test_start_open_eof_stop():
raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop) raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
def test_start_open_delete_pipes_stop():
pipe_io = EchoPipeIOServer(*get_test_init_params())
pipe_io.start()
with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe):
remove(pipe_io.in_pipe)
remove(pipe_io.out_pipe)
raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'test_data', [ 'test_data', [
'Cats and cheese', 'Cats and cheese',