mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 01:22:55 +00:00 
			
		
		
		
	Update PipeIOServer dependency
This commit is contained in:
		@@ -1,2 +1,4 @@
 | 
				
			|||||||
from .pipe_io_server import PipeIOServer
 | 
					from .pipe_io_server import PipeIOServer
 | 
				
			||||||
 | 
					from .pipe_reader_server import PipeReaderServer
 | 
				
			||||||
 | 
					from .pipe_writer_server import PipeWriterServer
 | 
				
			||||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
					from .terminate_process_on_failure import terminate_process_on_failure
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,6 +1,8 @@
 | 
				
			|||||||
from os import mkfifo, remove, chmod
 | 
					from os import mkfifo, remove, chmod
 | 
				
			||||||
from os.path import exists
 | 
					from os.path import exists
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					DEFAULT_PERMISSIONS = 0o600
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Pipe:
 | 
					class Pipe:
 | 
				
			||||||
    def __init__(self, path):
 | 
					    def __init__(self, path):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,73 +1,25 @@
 | 
				
			|||||||
from abc import ABC, abstractmethod
 | 
					from .pipe import DEFAULT_PERMISSIONS
 | 
				
			||||||
from threading import Thread, Event
 | 
					from .pipe_reader_server import PipeReaderServer
 | 
				
			||||||
from typing import Callable
 | 
					from .pipe_writer_server import PipeWriterServer
 | 
				
			||||||
 | 
					 | 
				
			||||||
from .pipe_reader_thread import PipeReaderThread
 | 
					 | 
				
			||||||
from .pipe_writer_thread import PipeWriterThread
 | 
					 | 
				
			||||||
from .pipe import Pipe
 | 
					 | 
				
			||||||
from .terminate_process_on_failure import terminate_process_on_failure
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class PipeIOServer(ABC, Thread):
 | 
					class PipeIOServer(PipeReaderServer, PipeWriterServer):
 | 
				
			||||||
    def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600):
 | 
					    # pylint: disable=abstract-method
 | 
				
			||||||
        super().__init__(daemon=True)
 | 
					    def __init__(
 | 
				
			||||||
        self._in_pipe, self._out_pipe = in_pipe, out_pipe
 | 
					            self,
 | 
				
			||||||
        self._create_pipes(permissions)
 | 
					            in_pipe,
 | 
				
			||||||
        self._stop_event = Event()
 | 
					            out_pipe,
 | 
				
			||||||
        self._reader_thread, self._writer_thread = self._create_io_threads()
 | 
					            permissions=DEFAULT_PERMISSIONS,
 | 
				
			||||||
        self._io_threads = (self._reader_thread, self._writer_thread)
 | 
					            manage_pipes=True
 | 
				
			||||||
        self._on_stop = lambda: None
 | 
					    ):
 | 
				
			||||||
 | 
					        super().__init__(
 | 
				
			||||||
 | 
					            in_pipe=in_pipe,
 | 
				
			||||||
 | 
					            out_pipe=out_pipe,
 | 
				
			||||||
 | 
					            permissions=permissions,
 | 
				
			||||||
 | 
					            manage_pipes=manage_pipes
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _create_pipes(self, permissions):
 | 
					    def _io_threads(self):
 | 
				
			||||||
        Pipe(self.in_pipe).recreate(permissions)
 | 
					        # pylint: disable=no-member
 | 
				
			||||||
        Pipe(self.out_pipe).recreate(permissions)
 | 
					        yield from PipeReaderServer._io_threads(self)
 | 
				
			||||||
 | 
					        yield from PipeWriterServer._io_threads(self)
 | 
				
			||||||
    @property
 | 
					 | 
				
			||||||
    def in_pipe(self):
 | 
					 | 
				
			||||||
        return self._in_pipe
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @property
 | 
					 | 
				
			||||||
    def out_pipe(self):
 | 
					 | 
				
			||||||
        return self._out_pipe
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _create_io_threads(self):
 | 
					 | 
				
			||||||
        reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message)
 | 
					 | 
				
			||||||
        writer_thread = PipeWriterThread(self.out_pipe, self._stop_event)
 | 
					 | 
				
			||||||
        return reader_thread, writer_thread
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @abstractmethod
 | 
					 | 
				
			||||||
    def handle_message(self, message):
 | 
					 | 
				
			||||||
        raise NotImplementedError()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def send_message(self, message):
 | 
					 | 
				
			||||||
        self._writer_thread.write(message)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @terminate_process_on_failure
 | 
					 | 
				
			||||||
    def run(self):
 | 
					 | 
				
			||||||
        for thread in self._io_threads:
 | 
					 | 
				
			||||||
            thread.start()
 | 
					 | 
				
			||||||
        self._stop_event.wait()
 | 
					 | 
				
			||||||
        self._stop_threads()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def stop(self):
 | 
					 | 
				
			||||||
        self._stop_event.set()
 | 
					 | 
				
			||||||
        if self.is_alive():
 | 
					 | 
				
			||||||
            self.join()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _stop_threads(self):
 | 
					 | 
				
			||||||
        for thread in self._io_threads:
 | 
					 | 
				
			||||||
            if thread.is_alive():
 | 
					 | 
				
			||||||
                thread.stop()
 | 
					 | 
				
			||||||
        Pipe(self.in_pipe).remove()
 | 
					 | 
				
			||||||
        Pipe(self.out_pipe).remove()
 | 
					 | 
				
			||||||
        self._on_stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _set_on_stop(self, value):
 | 
					 | 
				
			||||||
        if not isinstance(value, Callable):
 | 
					 | 
				
			||||||
            raise ValueError("Supplied object is not callable!")
 | 
					 | 
				
			||||||
        self._on_stop = value
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    on_stop = property(fset=_set_on_stop)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def wait(self):
 | 
					 | 
				
			||||||
        self._stop_event.wait()
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										45
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_io_thread.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,45 @@
 | 
				
			|||||||
 | 
					from threading import Thread, Event
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .terminate_process_on_failure import terminate_process_on_failure
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class PipeIOThread(Thread):
 | 
				
			||||||
 | 
					    def __init__(self):
 | 
				
			||||||
 | 
					        super().__init__(daemon=True)
 | 
				
			||||||
 | 
					        self._stop_event = Event()
 | 
				
			||||||
 | 
					        self.__io_threads = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @terminate_process_on_failure
 | 
				
			||||||
 | 
					    def run(self):
 | 
				
			||||||
 | 
					        self.__io_threads.extend(self._io_threads())
 | 
				
			||||||
 | 
					        for thread in self.__io_threads:
 | 
				
			||||||
 | 
					            thread.start()
 | 
				
			||||||
 | 
					        self._stop_event.wait()
 | 
				
			||||||
 | 
					        self._stop_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _io_threads(self):
 | 
				
			||||||
 | 
					        raise NotImplementedError()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _stop_threads(self):
 | 
				
			||||||
 | 
					        for thread in self.__io_threads:
 | 
				
			||||||
 | 
					            if thread.is_alive():
 | 
				
			||||||
 | 
					                thread.stop()
 | 
				
			||||||
 | 
					        self.on_stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def on_stop(self):
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def stop(self):
 | 
				
			||||||
 | 
					        self._stop_event.set()
 | 
				
			||||||
 | 
					        if self.is_alive():
 | 
				
			||||||
 | 
					            self.join()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def wait(self):
 | 
				
			||||||
 | 
					        self._stop_event.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __enter__(self):
 | 
				
			||||||
 | 
					        self.start()
 | 
				
			||||||
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __exit__(self, type_, value, tb):
 | 
				
			||||||
 | 
					        self.stop()
 | 
				
			||||||
							
								
								
									
										39
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_reader_server.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,39 @@
 | 
				
			|||||||
 | 
					from .pipe import Pipe, DEFAULT_PERMISSIONS
 | 
				
			||||||
 | 
					from .pipe_io_thread import PipeIOThread
 | 
				
			||||||
 | 
					from .pipe_reader_thread import PipeReaderThread
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class PipeReaderServer(PipeIOThread):
 | 
				
			||||||
 | 
					    def __init__(
 | 
				
			||||||
 | 
					            self,
 | 
				
			||||||
 | 
					            in_pipe,
 | 
				
			||||||
 | 
					            permissions=DEFAULT_PERMISSIONS,
 | 
				
			||||||
 | 
					            manage_pipes=True,
 | 
				
			||||||
 | 
					            **kwargs
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        super().__init__(**kwargs)
 | 
				
			||||||
 | 
					        self._reader_thread = None
 | 
				
			||||||
 | 
					        self._manage_pipes = manage_pipes
 | 
				
			||||||
 | 
					        self._in_pipe = in_pipe
 | 
				
			||||||
 | 
					        if self._manage_pipes:
 | 
				
			||||||
 | 
					            Pipe(self.in_pipe).recreate(permissions)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def in_pipe(self):
 | 
				
			||||||
 | 
					        return self._in_pipe
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _io_threads(self):
 | 
				
			||||||
 | 
					        self._reader_thread = PipeReaderThread(
 | 
				
			||||||
 | 
					            self.in_pipe,
 | 
				
			||||||
 | 
					            self._stop_event,
 | 
				
			||||||
 | 
					            self.handle_message
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        yield self._reader_thread
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def handle_message(self, message):
 | 
				
			||||||
 | 
					        raise NotImplementedError()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def stop(self):
 | 
				
			||||||
 | 
					        super().stop()
 | 
				
			||||||
 | 
					        if self._manage_pipes:
 | 
				
			||||||
 | 
					            Pipe(self.in_pipe).remove()
 | 
				
			||||||
							
								
								
									
										38
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								tfw/components/pipe_io/pipe_io_server/pipe_writer_server.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,38 @@
 | 
				
			|||||||
 | 
					from .pipe import Pipe, DEFAULT_PERMISSIONS
 | 
				
			||||||
 | 
					from .pipe_io_thread import PipeIOThread
 | 
				
			||||||
 | 
					from .pipe_writer_thread import PipeWriterThread
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class PipeWriterServer(PipeIOThread):
 | 
				
			||||||
 | 
					    def __init__(
 | 
				
			||||||
 | 
					            self,
 | 
				
			||||||
 | 
					            out_pipe,
 | 
				
			||||||
 | 
					            permissions=DEFAULT_PERMISSIONS,
 | 
				
			||||||
 | 
					            manage_pipes=True,
 | 
				
			||||||
 | 
					            **kwargs
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        super().__init__(**kwargs)
 | 
				
			||||||
 | 
					        self._writer_thread = None
 | 
				
			||||||
 | 
					        self._manage_pipes = manage_pipes
 | 
				
			||||||
 | 
					        self._out_pipe = out_pipe
 | 
				
			||||||
 | 
					        if self._manage_pipes:
 | 
				
			||||||
 | 
					            Pipe(self.out_pipe).recreate(permissions)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def out_pipe(self):
 | 
				
			||||||
 | 
					        return self._out_pipe
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _io_threads(self):
 | 
				
			||||||
 | 
					        self._writer_thread = PipeWriterThread(
 | 
				
			||||||
 | 
					            self.out_pipe,
 | 
				
			||||||
 | 
					            self._stop_event
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        yield self._writer_thread
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def send_message(self, message):
 | 
				
			||||||
 | 
					        self._writer_thread.write(message)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def stop(self):
 | 
				
			||||||
 | 
					        super().stop()
 | 
				
			||||||
 | 
					        if self._manage_pipes:
 | 
				
			||||||
 | 
					            Pipe(self.out_pipe).remove()
 | 
				
			||||||
							
								
								
									
										187
									
								
								tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										187
									
								
								tfw/components/pipe_io/pipe_io_server/test_pipe_io_server.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,187 @@
 | 
				
			|||||||
 | 
					# pylint: disable=redefined-outer-name
 | 
				
			||||||
 | 
					from os import stat, urandom
 | 
				
			||||||
 | 
					from os.path import exists, dirname, realpath, join
 | 
				
			||||||
 | 
					from stat import S_ISFIFO
 | 
				
			||||||
 | 
					from secrets import token_urlsafe
 | 
				
			||||||
 | 
					from random import randint, getrandbits, uniform
 | 
				
			||||||
 | 
					from threading import Thread
 | 
				
			||||||
 | 
					from json import dumps, loads
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import pytest
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .pipe_io_server import PipeIOServer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class EchoPipeIOServer(PipeIOServer):
 | 
				
			||||||
 | 
					    def handle_message(self, message):
 | 
				
			||||||
 | 
					        self.send_message(message)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@pytest.fixture
 | 
				
			||||||
 | 
					def io_pipes():
 | 
				
			||||||
 | 
					    with EchoPipeIOServer(*get_test_init_params()) as pipe_io:
 | 
				
			||||||
 | 
					        with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
 | 
				
			||||||
 | 
					            yield io_pipes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_test_init_params():
 | 
				
			||||||
 | 
					    here = dirname(realpath(__file__))
 | 
				
			||||||
 | 
					    return join(here, 'in_pipe_tests'), join(here, 'out_pipe_tests')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def raise_if_thread_blocks(*, target, unblock_function):
 | 
				
			||||||
 | 
					    thread = Thread(target=target)
 | 
				
			||||||
 | 
					    thread.start()
 | 
				
			||||||
 | 
					    unblock_function()
 | 
				
			||||||
 | 
					    thread.join(timeout=1)
 | 
				
			||||||
 | 
					    if thread.is_alive():
 | 
				
			||||||
 | 
					        raise RuntimeError('PipeIOServer failed to shut down!')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class IOPipes:
 | 
				
			||||||
 | 
					    def __init__(self, in_pipe_path, out_pipe_path):
 | 
				
			||||||
 | 
					        self.in_pipe_path = in_pipe_path
 | 
				
			||||||
 | 
					        self.out_pipe_path = out_pipe_path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __enter__(self):
 | 
				
			||||||
 | 
					        # pylint: disable=attribute-defined-outside-init
 | 
				
			||||||
 | 
					        self.in_pipe = open(self.in_pipe_path, 'wb')
 | 
				
			||||||
 | 
					        self.out_pipe = open(self.out_pipe_path, 'rb')
 | 
				
			||||||
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __exit__(self, type_, value, traceback):
 | 
				
			||||||
 | 
					        self.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def close(self):
 | 
				
			||||||
 | 
					        self.in_pipe.close()
 | 
				
			||||||
 | 
					        self.out_pipe.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def send_message(self, message):
 | 
				
			||||||
 | 
					        self.in_pipe.write(message + b'\n')
 | 
				
			||||||
 | 
					        self.in_pipe.flush()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def recv(self):
 | 
				
			||||||
 | 
					        return self.out_pipe.readline().rstrip(b'\n')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def pipes_exist(*paths):
 | 
				
			||||||
 | 
					    predicate = lambda path: exists(path) and S_ISFIFO(stat(path).st_mode)
 | 
				
			||||||
 | 
					    return all(predicate(path) for path in paths)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_manage_pipes():
 | 
				
			||||||
 | 
					    pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=True)
 | 
				
			||||||
 | 
					    assert pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
				
			||||||
 | 
					    pipe_io.stop()
 | 
				
			||||||
 | 
					    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_no_manage_pipes():
 | 
				
			||||||
 | 
					    pipe_io = PipeIOServer(*get_test_init_params(), manage_pipes=False)
 | 
				
			||||||
 | 
					    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
				
			||||||
 | 
					    pipe_io.stop()
 | 
				
			||||||
 | 
					    assert not pipes_exist(pipe_io.in_pipe, pipe_io.out_pipe)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_stop():
 | 
				
			||||||
 | 
					    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
				
			||||||
 | 
					    pipe_io.start()
 | 
				
			||||||
 | 
					    raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
				
			||||||
 | 
					    pipe_io.start()
 | 
				
			||||||
 | 
					    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes:
 | 
				
			||||||
 | 
					        raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pipe_io = EchoPipeIOServer(*get_test_init_params())
 | 
				
			||||||
 | 
					    pipe_io.start()
 | 
				
			||||||
 | 
					    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as iopipes:
 | 
				
			||||||
 | 
					        test_message = token_urlsafe(randint(128, 256))
 | 
				
			||||||
 | 
					        iopipes.send_message(test_message.encode())
 | 
				
			||||||
 | 
					        assert test_message == iopipes.recv().decode()
 | 
				
			||||||
 | 
					        iopipes.send_message(test_message.encode())
 | 
				
			||||||
 | 
					        raise_if_thread_blocks(target=pipe_io.wait, unblock_function=pipe_io.stop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@pytest.mark.parametrize(
 | 
				
			||||||
 | 
					    'test_data', [
 | 
				
			||||||
 | 
					        'Cats and cheese',
 | 
				
			||||||
 | 
					        'You ever wonder why we are here?',
 | 
				
			||||||
 | 
					        'Lorem ipsum dolor sit amet',
 | 
				
			||||||
 | 
					        'You always have a plan, Dutch!',
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					def test_io(io_pipes, test_data):
 | 
				
			||||||
 | 
					    io_pipes.send_message(test_data.encode())
 | 
				
			||||||
 | 
					    assert io_pipes.recv().decode() == test_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_io_random(io_pipes):
 | 
				
			||||||
 | 
					    test_data = token_urlsafe(512)
 | 
				
			||||||
 | 
					    for _ in range(100):
 | 
				
			||||||
 | 
					        io_pipes.send_message(test_data.encode())
 | 
				
			||||||
 | 
					        assert io_pipes.recv().decode() == test_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@pytest.mark.parametrize(
 | 
				
			||||||
 | 
					    'test_data_size', [
 | 
				
			||||||
 | 
					        1024,
 | 
				
			||||||
 | 
					        1024*1024,
 | 
				
			||||||
 | 
					        2*1024*1024,
 | 
				
			||||||
 | 
					        4*1024*1024,
 | 
				
			||||||
 | 
					        8*1024*1024,
 | 
				
			||||||
 | 
					        16*1024*1024,
 | 
				
			||||||
 | 
					        32*1024*1024
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					def test_io_large_data(io_pipes, test_data_size):
 | 
				
			||||||
 | 
					    test_data = urandom(test_data_size).replace(b'\n', b'')
 | 
				
			||||||
 | 
					    io_pipes.send_message(test_data)
 | 
				
			||||||
 | 
					    received_data = io_pipes.recv()
 | 
				
			||||||
 | 
					    assert received_data == test_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_io_stress(io_pipes):
 | 
				
			||||||
 | 
					    for _ in range(2222):
 | 
				
			||||||
 | 
					        test_data = urandom(randint(1, 1024)).replace(b'\n', b'')
 | 
				
			||||||
 | 
					        io_pipes.send_message(test_data)
 | 
				
			||||||
 | 
					        assert io_pipes.recv() == test_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_io_newlines(io_pipes):
 | 
				
			||||||
 | 
					    times = randint(1, 512)
 | 
				
			||||||
 | 
					    io_pipes.send_message(b'\n' * times)
 | 
				
			||||||
 | 
					    for _ in range(times + 1):  # IOPipes.send appends +1
 | 
				
			||||||
 | 
					        assert io_pipes.recv() == b''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_json_io(io_pipes):
 | 
				
			||||||
 | 
					    for _ in range(10):
 | 
				
			||||||
 | 
					        test_data = {
 | 
				
			||||||
 | 
					            f'{token_urlsafe(8)}': randint(1, 2 ** 20),
 | 
				
			||||||
 | 
					            f'{token_urlsafe(9)}': [randint(1, 2 **10) for i in range(10)],
 | 
				
			||||||
 | 
					            f'{token_urlsafe(4)}': f'{token_urlsafe(8)}\\\n{token_urlsafe(8)}\n{randint(1, 2 ** 10)}',
 | 
				
			||||||
 | 
					            f'{token_urlsafe(11)}': {
 | 
				
			||||||
 | 
					                f'{token_urlsafe(8)}': '',
 | 
				
			||||||
 | 
					                f'{token_urlsafe(3)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}\n\\n{token_urlsafe(8)}',
 | 
				
			||||||
 | 
					                f'{token_urlsafe(44)}': f'{token_urlsafe(8)}\n{token_urlsafe(8)}  {token_urlsafe(8)}',
 | 
				
			||||||
 | 
					                f'{token_urlsafe(6)}\n{token_urlsafe(4)}': bool(getrandbits(1)),
 | 
				
			||||||
 | 
					                f'{token_urlsafe(8)}': None,
 | 
				
			||||||
 | 
					                f'{token_urlsafe(21)}  {token_urlsafe(4)}': None,
 | 
				
			||||||
 | 
					                f'{token_urlsafe(3)}': uniform(randint(1, 100), randint(1, 100)),
 | 
				
			||||||
 | 
					                f'{token_urlsafe(8)}': [token_urlsafe(4) for i in range(10)],
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        io_pipes.send_message(dumps(test_data).encode())
 | 
				
			||||||
 | 
					        assert loads(io_pipes.recv()) == test_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_assign_message_handler():
 | 
				
			||||||
 | 
					    pipe_io = PipeIOServer(*get_test_init_params())
 | 
				
			||||||
 | 
					    pipe_io.handle_message = lambda msg: pipe_io.send_message(msg * 2)
 | 
				
			||||||
 | 
					    pipe_io.start()
 | 
				
			||||||
 | 
					    with IOPipes(pipe_io.in_pipe, pipe_io.out_pipe) as io_pipes:
 | 
				
			||||||
 | 
					        for _ in range(100):
 | 
				
			||||||
 | 
					            test_data = token_urlsafe(32).encode()
 | 
				
			||||||
 | 
					            io_pipes.send_message(test_data)
 | 
				
			||||||
 | 
					            assert io_pipes.recv() == test_data * 2
 | 
				
			||||||
 | 
					    pipe_io.stop()
 | 
				
			||||||
		Reference in New Issue
	
	Block a user