mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 01:12:55 +00:00 
			
		
		
		
	Remove unused but beautiful product of me being with my muse :(
This commit is contained in:
		@@ -1,2 +1,2 @@
 | 
			
		||||
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler
 | 
			
		||||
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, CommandHandler
 | 
			
		||||
from .pipe_connector import ProxyPipeConnectorHandler
 | 
			
		||||
 
 | 
			
		||||
@@ -59,43 +59,6 @@ class PipeIOHandler(PipeIOHandlerBase):
 | 
			
		||||
        self.connector.send_message(json)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TransformerPipeIOHandler(PipeIOHandlerBase):
 | 
			
		||||
    # pylint: disable=too-many-arguments
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self, in_pipe_path, out_pipe_path,
 | 
			
		||||
            transform_in_cmd, transform_out_cmd,
 | 
			
		||||
            permissions=DEFAULT_PERMISSIONS
 | 
			
		||||
    ):
 | 
			
		||||
        self._transform_in = partial(self._transform_message, transform_in_cmd)
 | 
			
		||||
        self._transform_out = partial(self._transform_message, transform_out_cmd)
 | 
			
		||||
        super().__init__(in_pipe_path, out_pipe_path, permissions)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _transform_message(transform_cmd, message):
 | 
			
		||||
        proc = run(
 | 
			
		||||
            transform_cmd,
 | 
			
		||||
            input=message,
 | 
			
		||||
            stdout=PIPE,
 | 
			
		||||
            stderr=PIPE,
 | 
			
		||||
            shell=True
 | 
			
		||||
        )
 | 
			
		||||
        if proc.returncode == 0:
 | 
			
		||||
            return proc.stdout
 | 
			
		||||
        raise ValueError(f'Transforming message {message} failed!')
 | 
			
		||||
 | 
			
		||||
    def handle_event(self, message, _):
 | 
			
		||||
        json_bytes = dumps(message).encode()
 | 
			
		||||
        transformed_bytes = self._transform_out(json_bytes)
 | 
			
		||||
        if transformed_bytes:
 | 
			
		||||
            self.pipe_io.send_message(transformed_bytes)
 | 
			
		||||
 | 
			
		||||
    def handle_pipe_event(self, message_bytes):
 | 
			
		||||
        transformed_bytes = self._transform_in(message_bytes)
 | 
			
		||||
        if transformed_bytes:
 | 
			
		||||
            json_message = loads(transformed_bytes)
 | 
			
		||||
            self.connector.send_message(json_message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CommandHandler(PipeIOHandler):
 | 
			
		||||
    def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
 | 
			
		||||
        super().__init__(
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user