Make PipeIOServer non-blocking

This commit is contained in:
Kristóf Tóth 2019-04-10 12:40:51 +02:00
parent deafc516bd
commit 5ff2f6c27d
3 changed files with 12 additions and 15 deletions

View File

@ -12,7 +12,7 @@ if __name__ == "__main__":
pipe_io = EchoPipeIOServer('in', 'out') pipe_io = EchoPipeIOServer('in', 'out')
signal(SIGTERM, lambda a, b: pipe_io.stop()) signal(SIGTERM, lambda a, b: pipe_io.stop())
signal(SIGINT, lambda a, b: pipe_io.stop()) signal(SIGINT, lambda a, b: pipe_io.stop())
pipe_io.start()
print('Running pipe IO server with named pipes:') print('Running pipe IO server with named pipes:')
print(f'Input: {pipe_io.in_pipe}') print(f'Input: {pipe_io.in_pipe}')
print(f'Output: {pipe_io.out_pipe}') print(f'Output: {pipe_io.out_pipe}')
pipe_io.run()

View File

@ -1,13 +1,15 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from threading import Event from threading import Thread, Event
from .pipe_reader_thread import PipeReaderThread from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe from .pipe import Pipe
from .terminate_process_on_failure import terminate_process_on_failure
class PipeIOServer(ABC): class PipeIOServer(ABC, Thread):
def __init__(self, in_pipe=None, out_pipe=None): def __init__(self, in_pipe=None, out_pipe=None):
super().__init__()
self._in_pipe, self._out_pipe = in_pipe, out_pipe self._in_pipe, self._out_pipe = in_pipe, out_pipe
self._create_pipes() self._create_pipes()
self._stop_event = Event() self._stop_event = Event()
@ -38,6 +40,7 @@ class PipeIOServer(ABC):
def send(self, message): def send(self, message):
self._writer_thread.write(message) self._writer_thread.write(message)
@terminate_process_on_failure
def run(self): def run(self):
for thread in self._io_threads: for thread in self._io_threads:
thread.start() thread.start()
@ -46,6 +49,8 @@ class PipeIOServer(ABC):
def stop(self): def stop(self):
self._stop_event.set() self._stop_event.set()
if self.is_alive():
self.join()
def _stop(self): def _stop(self):
for thread in self._io_threads: for thread in self._io_threads:

View File

@ -13,10 +13,6 @@ import pytest
from echo_server import EchoPipeIOServer from echo_server import EchoPipeIOServer
BLOCK_TIMEOUT = 1
SHUTDOWN_FAILURE_MSG = 'PipeIOServer failed to shut down!'
@pytest.fixture @pytest.fixture
def io_pipes(): def io_pipes():
with pipe_io_server() as pipe_io: with pipe_io_server() as pipe_io:
@ -27,22 +23,18 @@ def io_pipes():
@contextmanager @contextmanager
def pipe_io_server(pipe_io_server_type=EchoPipeIOServer): def pipe_io_server(pipe_io_server_type=EchoPipeIOServer):
pipe_io_server = build_pipe_io_server(pipe_io_server_type) pipe_io_server = build_pipe_io_server(pipe_io_server_type)
thread = Thread(target=pipe_io_server.run) pipe_io_server.start()
thread.start()
yield pipe_io_server yield pipe_io_server
pipe_io_server.stop() pipe_io_server.stop()
thread.join(timeout=BLOCK_TIMEOUT)
if thread.is_alive():
raise RuntimeError(SHUTDOWN_FAILURE_MSG)
def raise_if_thread_blocks(thread_target_function, unblock_function, timeout=BLOCK_TIMEOUT): def raise_if_thread_blocks(thread_target_function, unblock_function):
thread = Thread(target=thread_target_function) thread = Thread(target=thread_target_function)
thread.start() thread.start()
unblock_function() unblock_function()
thread.join(timeout=timeout) thread.join(timeout=1)
if thread.is_alive(): if thread.is_alive():
raise RuntimeError(SHUTDOWN_FAILURE_MSG) raise RuntimeError('PipeIOServer failed to shut down!')
def build_pipe_io_server(pipe_io_server_type=EchoPipeIOServer): def build_pipe_io_server(pipe_io_server_type=EchoPipeIOServer):