mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 00:52:55 +00:00 
			
		
		
		
	Replace pipe-io-server source with the new pip package
This commit is contained in:
		@@ -10,7 +10,7 @@ from secrets import token_urlsafe
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from contextlib import suppress
 | 
			
		||||
 | 
			
		||||
from .pipe_io_server import PipeIOServer, terminate_process_on_failure
 | 
			
		||||
from pipe_io_server import PipeIOServer, terminate_process_on_failure
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger(__name__)
 | 
			
		||||
DEFAULT_PERMISSIONS = 0o600
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +0,0 @@
 | 
			
		||||
from .pipe_io_server import PipeIOServer
 | 
			
		||||
from .pipe_reader_server import PipeReaderServer
 | 
			
		||||
from .pipe_writer_server import PipeWriterServer
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
@@ -1,27 +0,0 @@
 | 
			
		||||
from collections import deque
 | 
			
		||||
from threading import Lock, Condition
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Deque:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self._queue = deque()
 | 
			
		||||
 | 
			
		||||
        self._mutex = Lock()
 | 
			
		||||
        self._not_empty = Condition(self._mutex)
 | 
			
		||||
 | 
			
		||||
    def pop(self):
 | 
			
		||||
        with self._mutex:
 | 
			
		||||
            while not self._queue:
 | 
			
		||||
                self._not_empty.wait()
 | 
			
		||||
            return self._queue.pop()
 | 
			
		||||
 | 
			
		||||
    def push(self, item):
 | 
			
		||||
        self._push(item, self._queue.appendleft)
 | 
			
		||||
 | 
			
		||||
    def push_front(self, item):
 | 
			
		||||
        self._push(item, self._queue.append)
 | 
			
		||||
 | 
			
		||||
    def _push(self, item, put_method):
 | 
			
		||||
        with self._mutex:
 | 
			
		||||
            put_method(item)
 | 
			
		||||
            self._not_empty.notify()
 | 
			
		||||
@@ -1,18 +0,0 @@
 | 
			
		||||
from os import mkfifo, remove, chmod
 | 
			
		||||
from os.path import exists
 | 
			
		||||
 | 
			
		||||
DEFAULT_PERMISSIONS = 0o600
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Pipe:
 | 
			
		||||
    def __init__(self, path):
 | 
			
		||||
        self.path = path
 | 
			
		||||
 | 
			
		||||
    def recreate(self, permissions):
 | 
			
		||||
        self.remove()
 | 
			
		||||
        mkfifo(self.path)
 | 
			
		||||
        chmod(self.path, permissions)  # use chmod to ignore umask
 | 
			
		||||
 | 
			
		||||
    def remove(self):
 | 
			
		||||
        if exists(self.path):
 | 
			
		||||
            remove(self.path)
 | 
			
		||||
@@ -1,25 +0,0 @@
 | 
			
		||||
from .pipe import DEFAULT_PERMISSIONS
 | 
			
		||||
from .pipe_reader_server import PipeReaderServer
 | 
			
		||||
from .pipe_writer_server import PipeWriterServer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeIOServer(PipeReaderServer, PipeWriterServer):
 | 
			
		||||
    # pylint: disable=abstract-method
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self,
 | 
			
		||||
            in_pipe,
 | 
			
		||||
            out_pipe,
 | 
			
		||||
            permissions=DEFAULT_PERMISSIONS,
 | 
			
		||||
            manage_pipes=True
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(
 | 
			
		||||
            in_pipe=in_pipe,
 | 
			
		||||
            out_pipe=out_pipe,
 | 
			
		||||
            permissions=permissions,
 | 
			
		||||
            manage_pipes=manage_pipes
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _io_threads(self):
 | 
			
		||||
        # pylint: disable=no-member
 | 
			
		||||
        yield from PipeReaderServer._io_threads(self)
 | 
			
		||||
        yield from PipeWriterServer._io_threads(self)
 | 
			
		||||
@@ -1,45 +0,0 @@
 | 
			
		||||
from threading import Thread, Event
 | 
			
		||||
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeIOThread(Thread):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super().__init__(daemon=True)
 | 
			
		||||
        self._stop_event = Event()
 | 
			
		||||
        self.__io_threads = []
 | 
			
		||||
 | 
			
		||||
    @terminate_process_on_failure
 | 
			
		||||
    def run(self):
 | 
			
		||||
        self.__io_threads.extend(self._io_threads())
 | 
			
		||||
        for thread in self.__io_threads:
 | 
			
		||||
            thread.start()
 | 
			
		||||
        self._stop_event.wait()
 | 
			
		||||
        self._stop_threads()
 | 
			
		||||
 | 
			
		||||
    def _io_threads(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    def _stop_threads(self):
 | 
			
		||||
        for thread in self.__io_threads:
 | 
			
		||||
            if thread.is_alive():
 | 
			
		||||
                thread.stop()
 | 
			
		||||
        self.on_stop()
 | 
			
		||||
 | 
			
		||||
    def on_stop(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self._stop_event.set()
 | 
			
		||||
        if self.is_alive():
 | 
			
		||||
            self.join()
 | 
			
		||||
 | 
			
		||||
    def wait(self):
 | 
			
		||||
        self._stop_event.wait()
 | 
			
		||||
 | 
			
		||||
    def __enter__(self):
 | 
			
		||||
        self.start()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, type_, value, tb):
 | 
			
		||||
        self.stop()
 | 
			
		||||
@@ -1,39 +0,0 @@
 | 
			
		||||
from .pipe import Pipe, DEFAULT_PERMISSIONS
 | 
			
		||||
from .pipe_io_thread import PipeIOThread
 | 
			
		||||
from .pipe_reader_thread import PipeReaderThread
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeReaderServer(PipeIOThread):
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self,
 | 
			
		||||
            in_pipe,
 | 
			
		||||
            permissions=DEFAULT_PERMISSIONS,
 | 
			
		||||
            manage_pipes=True,
 | 
			
		||||
            **kwargs
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(**kwargs)
 | 
			
		||||
        self._reader_thread = None
 | 
			
		||||
        self._manage_pipes = manage_pipes
 | 
			
		||||
        self._in_pipe = in_pipe
 | 
			
		||||
        if self._manage_pipes:
 | 
			
		||||
            Pipe(self.in_pipe).recreate(permissions)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def in_pipe(self):
 | 
			
		||||
        return self._in_pipe
 | 
			
		||||
 | 
			
		||||
    def _io_threads(self):
 | 
			
		||||
        self._reader_thread = PipeReaderThread(
 | 
			
		||||
            self.in_pipe,
 | 
			
		||||
            self._stop_event,
 | 
			
		||||
            self.handle_message
 | 
			
		||||
        )
 | 
			
		||||
        yield self._reader_thread
 | 
			
		||||
 | 
			
		||||
    def handle_message(self, message):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        super().stop()
 | 
			
		||||
        if self._manage_pipes:
 | 
			
		||||
            Pipe(self.in_pipe).remove()
 | 
			
		||||
@@ -1,44 +0,0 @@
 | 
			
		||||
from contextlib import suppress
 | 
			
		||||
from os import open as osopen
 | 
			
		||||
from os import write, close, O_WRONLY, O_NONBLOCK
 | 
			
		||||
from threading import Thread
 | 
			
		||||
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeReaderThread(Thread):
 | 
			
		||||
    eof = b''
 | 
			
		||||
    stop_sequence = b'stop_reading\n'
 | 
			
		||||
 | 
			
		||||
    def __init__(self, pipe_path, stop_event, message_handler):
 | 
			
		||||
        super().__init__(daemon=True)
 | 
			
		||||
        self._message_handler = message_handler
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._stop_event = stop_event
 | 
			
		||||
 | 
			
		||||
    @terminate_process_on_failure
 | 
			
		||||
    def run(self):
 | 
			
		||||
        with self._open() as pipe:
 | 
			
		||||
            while True:
 | 
			
		||||
                message = pipe.readline()
 | 
			
		||||
                if message == self.stop_sequence:
 | 
			
		||||
                    self._stop_event.set()
 | 
			
		||||
                    break
 | 
			
		||||
                if message == self.eof:
 | 
			
		||||
                    self._open().close()
 | 
			
		||||
                    continue
 | 
			
		||||
                self._message_handler(message[:-1])
 | 
			
		||||
 | 
			
		||||
    def _open(self):
 | 
			
		||||
        return open(self._pipe_path, 'rb')
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        while self.is_alive():
 | 
			
		||||
            self._unblock()
 | 
			
		||||
        self.join()
 | 
			
		||||
 | 
			
		||||
    def _unblock(self):
 | 
			
		||||
        with suppress(OSError):
 | 
			
		||||
            fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK)
 | 
			
		||||
            write(fd, self.stop_sequence)
 | 
			
		||||
            close(fd)
 | 
			
		||||
@@ -1,38 +0,0 @@
 | 
			
		||||
from .pipe import Pipe, DEFAULT_PERMISSIONS
 | 
			
		||||
from .pipe_io_thread import PipeIOThread
 | 
			
		||||
from .pipe_writer_thread import PipeWriterThread
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterServer(PipeIOThread):
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self,
 | 
			
		||||
            out_pipe,
 | 
			
		||||
            permissions=DEFAULT_PERMISSIONS,
 | 
			
		||||
            manage_pipes=True,
 | 
			
		||||
            **kwargs
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(**kwargs)
 | 
			
		||||
        self._writer_thread = None
 | 
			
		||||
        self._manage_pipes = manage_pipes
 | 
			
		||||
        self._out_pipe = out_pipe
 | 
			
		||||
        if self._manage_pipes:
 | 
			
		||||
            Pipe(self.out_pipe).recreate(permissions)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def out_pipe(self):
 | 
			
		||||
        return self._out_pipe
 | 
			
		||||
 | 
			
		||||
    def _io_threads(self):
 | 
			
		||||
        self._writer_thread = PipeWriterThread(
 | 
			
		||||
            self.out_pipe,
 | 
			
		||||
            self._stop_event
 | 
			
		||||
        )
 | 
			
		||||
        yield self._writer_thread
 | 
			
		||||
 | 
			
		||||
    def send_message(self, message):
 | 
			
		||||
        self._writer_thread.write(message)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        super().stop()
 | 
			
		||||
        if self._manage_pipes:
 | 
			
		||||
            Pipe(self.out_pipe).remove()
 | 
			
		||||
@@ -1,50 +0,0 @@
 | 
			
		||||
from contextlib import suppress
 | 
			
		||||
from os import O_NONBLOCK, O_RDONLY, close
 | 
			
		||||
from os import open as osopen
 | 
			
		||||
from threading import Thread
 | 
			
		||||
 | 
			
		||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
			
		||||
from .deque import Deque
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterThread(Thread):
 | 
			
		||||
    def __init__(self, pipe_path, stop_event):
 | 
			
		||||
        super().__init__(daemon=True)
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._stop_event = stop_event
 | 
			
		||||
        self._write_queue = Deque()
 | 
			
		||||
 | 
			
		||||
    def write(self, message):
 | 
			
		||||
        self._write_queue.push(message)
 | 
			
		||||
 | 
			
		||||
    @terminate_process_on_failure
 | 
			
		||||
    def run(self):
 | 
			
		||||
        with self._open() as pipe:
 | 
			
		||||
            while True:
 | 
			
		||||
                message = self._write_queue.pop()
 | 
			
		||||
                if message is None:
 | 
			
		||||
                    self._stop_event.set()
 | 
			
		||||
                    break
 | 
			
		||||
                try:
 | 
			
		||||
                    pipe.write(message + b'\n')
 | 
			
		||||
                    pipe.flush()
 | 
			
		||||
                except BrokenPipeError:
 | 
			
		||||
                    try:  # pipe was reopened, close() flushed the message
 | 
			
		||||
                        pipe.close()
 | 
			
		||||
                    except BrokenPipeError:  # close() discarded the message
 | 
			
		||||
                        self._write_queue.push_front(message)
 | 
			
		||||
                    pipe = self._open()
 | 
			
		||||
 | 
			
		||||
    def _open(self):
 | 
			
		||||
        return open(self._pipe_path, 'wb')
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        while self.is_alive():
 | 
			
		||||
            self._unblock()
 | 
			
		||||
        self.join()
 | 
			
		||||
 | 
			
		||||
    def _unblock(self):
 | 
			
		||||
        with suppress(OSError):
 | 
			
		||||
            fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK)
 | 
			
		||||
            self._write_queue.push_front(None)
 | 
			
		||||
            close(fd)
 | 
			
		||||
@@ -1,15 +0,0 @@
 | 
			
		||||
from functools import wraps
 | 
			
		||||
from os import kill, getpid
 | 
			
		||||
from signal import SIGTERM
 | 
			
		||||
from traceback import print_exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def terminate_process_on_failure(fun):
 | 
			
		||||
    @wraps(fun)
 | 
			
		||||
    def wrapper(*args, **kwargs):
 | 
			
		||||
        try:
 | 
			
		||||
            return fun(*args, **kwargs)
 | 
			
		||||
        except:  # pylint: disable=bare-except
 | 
			
		||||
            print_exc()
 | 
			
		||||
            kill(getpid(), SIGTERM)
 | 
			
		||||
    return wrapper
 | 
			
		||||
@@ -1,187 +0,0 @@
 | 
			
		||||
# pylint: disable=redefined-outer-name
 | 
			
		||||
from os import stat, urandom
 | 
			
		||||
from os.path import exists, dirname, realpath, join
 | 
			
		||||
from stat import S_ISFIFO
 | 
			
		||||
from secrets import token_urlsafe
 | 
			
		||||
from random import randint, getrandbits, uniform
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from json import dumps, loads
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from .pipe_io_server import PipeIOServer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class EchoPipeIOServer(PipeIOServer):
 | 
			
		||||
    def handle_message(self, message):
 | 
			
		||||
        self.send_message(message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def io_pipes():
 | 
			
		||||
    with EchoPipeIOServer(*get_test_init_params()) as pipe_io:
 | 
			
		||||
        with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
 | 
			
		||||
            yield io_pipes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_test_init_params():
 | 
			
		||||
    here = dirname(realpath(__file__))
 | 
			
		||||
    return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def raise_if_thread_blocks(*, target, unblock_function):
 | 
			
		||||
    thread = Thread(target=target)
 | 
			
		||||
    thread.start()
 | 
			
		||||
    unblock_function()
 | 
			
		||||
    thread.join(timeout=1)
 | 
			
		||||
    if thread.is_alive():
 | 
			
		||||
        raise RuntimeError('PipeIOServer failed to shut down!')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IOPipes:
 | 
			
		||||
    def __init__(self, in_pipe_path, out_pipe_path):
 | 
			
		||||
        self.in_pipe_path = in_pipe_path
 | 
			
		||||
        self.out_pipe_path = out_pipe_path
 | 
			
		||||
 | 
			
		||||
    def __enter__(self):
 | 
			
		||||
        # pylint: disable=attribute-defined-outside-init
 | 
			
		||||
        self.in_pipe = open(self.in_pipe_path, 'wb')
 | 
			
		||||
        self.out_pipe = open(self.out_pipe_path, 'rb')
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, type_, value, traceback):
 | 
			
		||||
        self.close()
 | 
			
		||||
 | 
			
		||||
    def close(self):
 | 
			
		||||
        self.in_pipe.close()
 | 
			
		||||
        self.out_pipe.close()
 | 
			
		||||
 | 
			
		||||
    def send_message(self, message):
 | 
			
		||||
        self.in_pipe.write(message + b'\n')
 | 
			
		||||
        self.in_pipe.flush()
 | 
			
		||||
 | 
			
		||||
    def recv(self):
 | 
			
		||||
        return self.out_pipe.readline().rstrip(b'\n')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pipes_exist(*paths):
 | 
			
		||||
    predicate = lambda path: exists(path) and S_ISFIFO(stat(path).st_mode)
 | 
			
		||||
    return all(predicate(path) for path in paths)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_manage_pipes():
 | 
			
		||||
    pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=True)
 | 
			
		||||
    assert pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
			
		||||
    pipe_io.stop()
 | 
			
		||||
    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_no_manage_pipes():
 | 
			
		||||
    pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=False)
 | 
			
		||||
    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
			
		||||
    pipe_io.stop()
 | 
			
		||||
    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_stop():
 | 
			
		||||
    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
			
		||||
    pipe_io.start()
 | 
			
		||||
    raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
			
		||||
 | 
			
		||||
    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
			
		||||
    pipe_io.start()
 | 
			
		||||
    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes:
 | 
			
		||||
        raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
			
		||||
 | 
			
		||||
    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
			
		||||
    pipe_io.start()
 | 
			
		||||
    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes:
 | 
			
		||||
        test_message = token_urlsafe(randint(128, 256))
 | 
			
		||||
        iopipes.send_message(test_message.encode())
 | 
			
		||||
        assert test_message == iopipes.recv().decode()
 | 
			
		||||
        iopipes.send_message(test_message.encode())
 | 
			
		||||
        raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'test_data', [
 | 
			
		||||
        'Cats and cheese',
 | 
			
		||||
        'You ever wonder why we are here?',
 | 
			
		||||
        'Lorem ipsum dolor sit amet',
 | 
			
		||||
        'You always have a plan, Dutch!',
 | 
			
		||||
    ]
 | 
			
		||||
)
 | 
			
		||||
def test_io(io_pipes, test_data):
 | 
			
		||||
    io_pipes.send_message(test_data.encode())
 | 
			
		||||
    assert io_pipes.recv().decode() == test_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_io_random(io_pipes):
 | 
			
		||||
    test_data = token_urlsafe(512)
 | 
			
		||||
    for _ in range(100):
 | 
			
		||||
        io_pipes.send_message(test_data.encode())
 | 
			
		||||
        assert io_pipes.recv().decode() == test_data
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'test_data_size', [
 | 
			
		||||
        1024,
 | 
			
		||||
        1024*1024,
 | 
			
		||||
        2*1024*1024,
 | 
			
		||||
        4*1024*1024,
 | 
			
		||||
        8*1024*1024,
 | 
			
		||||
        16*1024*1024,
 | 
			
		||||
        32*1024*1024
 | 
			
		||||
    ]
 | 
			
		||||
)
 | 
			
		||||
def test_io_large_data(io_pipes, test_data_size):
 | 
			
		||||
    test_data = urandom(test_data_size).replace(b'\n', b'')
 | 
			
		||||
    io_pipes.send_message(test_data)
 | 
			
		||||
    received_data = io_pipes.recv()
 | 
			
		||||
    assert received_data == test_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_io_stress(io_pipes):
 | 
			
		||||
    for _ in range(2222):
 | 
			
		||||
        test_data = urandom(randint(1, 1024)).replace(b'\n', b'')
 | 
			
		||||
        io_pipes.send_message(test_data)
 | 
			
		||||
        assert io_pipes.recv() == test_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_io_newlines(io_pipes):
 | 
			
		||||
    times = randint(1, 512)
 | 
			
		||||
    io_pipes.send_message(b'\n' * times)
 | 
			
		||||
    for _ in range(times + 1):  # IOPipes.send appends +1
 | 
			
		||||
        assert io_pipes.recv() == b''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_json_io(io_pipes):
 | 
			
		||||
    for _ in range(10):
 | 
			
		||||
        test_data = {
 | 
			
		||||
            f'{token_urlsafe(8)}': randint(1, 2 ** 20),
 | 
			
		||||
            f'{token_urlsafe(9)}': [randint(1, 2 **10) for i in range(10)],
 | 
			
		||||
            f'{token_urlsafe(4)}': f'{token_urlsafe(8)}\\\n{token_urlsafe(8)}\n{randint(1, 2 ** 10)}',
 | 
			
		||||
            f'{token_urlsafe(11)}': {
 | 
			
		||||
                f'{token_urlsafe(8)}': '',
 | 
			
		||||
                f'{token_urlsafe(3)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}\n\\n{token_urlsafe(8)}',
 | 
			
		||||
                f'{token_urlsafe(44)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}  {token_urlsafe(8)}',
 | 
			
		||||
                f'{token_urlsafe(6)}\n{token_urlsafe(4)}': bool(getrandbits(1)),
 | 
			
		||||
                f'{token_urlsafe(8)}': None,
 | 
			
		||||
                f'{token_urlsafe(21)}  {token_urlsafe(4)}': None,
 | 
			
		||||
                f'{token_urlsafe(3)}': uniform(randint(1, 100), randint(1, 100)),
 | 
			
		||||
                f'{token_urlsafe(8)}': [token_urlsafe(4) for i in range(10)],
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        io_pipes.send_message(dumps(test_data).encode())
 | 
			
		||||
        assert loads(io_pipes.recv()) == test_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_assign_message_handler():
 | 
			
		||||
    pipe_io = PipeIOServer(*get_test_init_params())
 | 
			
		||||
    pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2)
 | 
			
		||||
    pipe_io.start()
 | 
			
		||||
    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
 | 
			
		||||
        for _ in range(100):
 | 
			
		||||
            test_data = token_urlsafe(32).encode()
 | 
			
		||||
            io_pipes.send_message(test_data)
 | 
			
		||||
            assert io_pipes.recv() == test_data * 2
 | 
			
		||||
    pipe_io.stop()
 | 
			
		||||
		Reference in New Issue
	
	Block a user