Make PipeIOServer a contextmanager

This commit is contained in:
Kristóf Tóth 2019-06-23 18:40:06 +02:00
parent a1e3fe9813
commit 1155011b48
2 changed files with 13 additions and 18 deletions

View File

@ -77,3 +77,10 @@ class PipeIOServer(Thread):
def wait(self): def wait(self):
self._stop_event.wait() self._stop_event.wait()
def __enter__(self):
self.start()
return self
def __exit__(self, type_, value, tb):
self.stop()

View File

@ -5,7 +5,6 @@ from stat import S_ISFIFO
from secrets import token_urlsafe from secrets import token_urlsafe
from random import randint, getrandbits, uniform from random import randint, getrandbits, uniform
from threading import Thread from threading import Thread
from contextlib import contextmanager
from json import dumps, loads from json import dumps, loads
import pytest import pytest
@ -16,17 +15,14 @@ from pipe_io_server import PipeIOServer
@pytest.fixture @pytest.fixture
def io_pipes(): def io_pipes():
with pipe_io_server() as pipe_io: with EchoPipeIOServer(*get_test_init_params()) as pipe_io:
with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
yield io_pipes yield io_pipes
@contextmanager def get_test_init_params():
def pipe_io_server(pipe_io_server_type=EchoPipeIOServer): here = dirname(realpath(__file__))
pipe_io_server = build_pipe_io_server(pipe_io_server_type) return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests')
pipe_io_server.start()
yield pipe_io_server
pipe_io_server.stop()
def raise_if_thread_blocks(thread_target_function, unblock_function): def raise_if_thread_blocks(thread_target_function, unblock_function):
@ -38,14 +34,6 @@ def raise_if_thread_blocks(thread_target_function, unblock_function):
raise RuntimeError('PipeIOServer failed to shut down!') raise RuntimeError('PipeIOServer failed to shut down!')
def build_pipe_io_server(pipe_io_server_type=EchoPipeIOServer):
here = dirname(realpath(__file__))
return pipe_io_server_type(
join(here, 'in_pipe_tests'),
join(here, 'out_pipe_tests')
)
class IOPipes: class IOPipes:
def __init__(self, in_pipe_path, out_pipe_path): def __init__(self, in_pipe_path, out_pipe_path):
self.in_pipe_path = in_pipe_path self.in_pipe_path = in_pipe_path
@ -79,7 +67,7 @@ def test_run_creates_pipes(io_pipes):
def test_stop(): def test_stop():
pipe_io = build_pipe_io_server() pipe_io = EchoPipeIOServer(*get_test_init_params())
def open_close_in_pipe(): def open_close_in_pipe():
pipe_io.stop() pipe_io.stop()
raise_if_thread_blocks(pipe_io.run, open_close_in_pipe) raise_if_thread_blocks(pipe_io.run, open_close_in_pipe)
@ -160,7 +148,7 @@ def test_json_io(io_pipes):
def test_assign_message_handler(): def test_assign_message_handler():
pipe_io = build_pipe_io_server(PipeIOServer) pipe_io = PipeIOServer(*get_test_init_params())
pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2) pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2)
pipe_io.start() pipe_io.start()
with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes: