diff --git a/echo_server.py b/echo_server.py index 136c933..2ea4c2b 100644 --- a/echo_server.py +++ b/echo_server.py @@ -12,7 +12,7 @@ if __name__ == "__main__": pipe_io = EchoPipeIOServer('in', 'out') signal(SIGTERM, lambda a, b: pipe_io.stop()) signal(SIGINT, lambda a, b: pipe_io.stop()) + pipe_io.start() print('Running pipe IO server with named pipes:') print(f'Input: {pipe_io.in_pipe}') print(f'Output: {pipe_io.out_pipe}') - pipe_io.run() diff --git a/pipe_io_server/pipe_io_server.py b/pipe_io_server/pipe_io_server.py index 1563484..de8d088 100644 --- a/pipe_io_server/pipe_io_server.py +++ b/pipe_io_server/pipe_io_server.py @@ -1,13 +1,15 @@ from abc import ABC, abstractmethod -from threading import Event +from threading import Thread, Event from .pipe_reader_thread import PipeReaderThread from .pipe_writer_thread import PipeWriterThread from .pipe import Pipe +from .terminate_process_on_failure import terminate_process_on_failure -class PipeIOServer(ABC): +class PipeIOServer(ABC, Thread): def __init__(self, in_pipe=None, out_pipe=None): + super().__init__() self._in_pipe, self._out_pipe = in_pipe, out_pipe self._create_pipes() self._stop_event = Event() @@ -38,6 +40,7 @@ class PipeIOServer(ABC): def send(self, message): self._writer_thread.write(message) + @terminate_process_on_failure def run(self): for thread in self._io_threads: thread.start() @@ -46,6 +49,8 @@ class PipeIOServer(ABC): def stop(self): self._stop_event.set() + if self.is_alive(): + self.join() def _stop(self): for thread in self._io_threads: diff --git a/tests.py b/tests.py index 518adcf..16bd122 100644 --- a/tests.py +++ b/tests.py @@ -13,10 +13,6 @@ import pytest from echo_server import EchoPipeIOServer -BLOCK_TIMEOUT = 1 -SHUTDOWN_FAILURE_MSG = 'PipeIOServer failed to shut down!' - - @pytest.fixture def io_pipes(): with pipe_io_server() as pipe_io: @@ -27,22 +23,18 @@ def io_pipes(): @contextmanager def pipe_io_server(pipe_io_server_type=EchoPipeIOServer): pipe_io_server = build_pipe_io_server(pipe_io_server_type) - thread = Thread(target=pipe_io_server.run) - thread.start() + pipe_io_server.start() yield pipe_io_server pipe_io_server.stop() - thread.join(timeout=BLOCK_TIMEOUT) - if thread.is_alive(): - raise RuntimeError(SHUTDOWN_FAILURE_MSG) -def raise_if_thread_blocks(thread_target_function, unblock_function, timeout=BLOCK_TIMEOUT): +def raise_if_thread_blocks(thread_target_function, unblock_function): thread = Thread(target=thread_target_function) thread.start() unblock_function() - thread.join(timeout=timeout) + thread.join(timeout=1) if thread.is_alive(): - raise RuntimeError(SHUTDOWN_FAILURE_MSG) + raise RuntimeError('PipeIOServer failed to shut down!') def build_pipe_io_server(pipe_io_server_type=EchoPipeIOServer):