from abc import abstractmethod
from json import loads, dumps
from subprocess import run, PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress

from tfw import EventHandlerBase
from tfw.config.logs import logging

from .pipe_io_server import PipeIOServer, terminate_process_on_failure

LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600


class PipeIOEventHandlerBase(EventHandlerBase):
    """Event handler that owns a pipe server and relays its messages.

    Subclasses implement ``handle_pipe_event`` to process the raw bytes
    delivered by the pipe server.
    """

    def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
        """Create the handler and start its pipe server.

        Args:
            key: Passed unchanged to ``EventHandlerBase.__init__``.
            in_pipe_path: Path handed to the pipe server as its input pipe.
            out_pipe_path: Path handed to the pipe server as its output pipe.
            permissions: Permission value forwarded to the pipe server.
        """
        super().__init__(key)
        self.pipe_io = CallbackPipeIOServer(
            in_pipe_path,
            out_pipe_path,
            self.handle_pipe_event,
            permissions
        )
        # NOTE: the server is started from the constructor, so anything
        # handle_pipe_event relies on must exist before this call.
        self.pipe_io.start()

    @abstractmethod
    def handle_pipe_event(self, message_bytes):
        """Process one message received from the pipe server."""
        raise NotImplementedError()

    def cleanup(self):
        """Stop the pipe server."""
        self.pipe_io.stop()


class CallbackPipeIOServer(PipeIOServer):
    """PipeIOServer that forwards every message to a callback."""

    def __init__(self, in_pipe_path, out_pipe_path, callback, permissions):
        """Store ``callback`` and initialise the underlying server."""
        super().__init__(in_pipe_path, out_pipe_path, permissions)
        self.callback = callback

    def handle_message(self, message):
        """Invoke the callback with ``message``; log (never raise) on failure."""
        try:
            self.callback(message)
        except:  # pylint: disable=bare-except
            # Deliberately broad: a failing callback is logged and swallowed
            # so it cannot propagate out of handle_message.
            LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)


class PipeIOEventHandler(PipeIOEventHandlerBase):
    """Relay JSON messages between events and the pipe server."""

    def handle_event(self, message):
        """Serialise ``message`` to JSON bytes and send it over the pipe."""
        json_bytes = dumps(message).encode()
        self.pipe_io.send_message(json_bytes)

    def handle_pipe_event(self, message_bytes):
        """Parse ``message_bytes`` as JSON and send it via ``server_connector``."""
        json_message = loads(message_bytes)
        self.server_connector.send(json_message)


class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
    """Pipe relay that filters messages through shell commands.

    Outgoing messages are piped through ``transform_out_cmd`` and incoming
    ones through ``transform_in_cmd``; an empty transform result drops the
    message.
    """

    # pylint: disable=too-many-arguments
    def __init__(
            self, key, in_pipe_path, out_pipe_path,
            transform_in_cmd, transform_out_cmd,
            permissions=DEFAULT_PERMISSIONS
    ):
        """Bind the transform commands, then start the pipe server.

        Args:
            key: Passed unchanged to the base class.
            in_pipe_path: Path handed to the pipe server as its input pipe.
            out_pipe_path: Path handed to the pipe server as its output pipe.
            transform_in_cmd: Shell command applied to bytes read from the pipe.
            transform_out_cmd: Shell command applied to bytes sent to the pipe.
            permissions: Permission value forwarded to the pipe server.
        """
        # Bound before super().__init__() because the base constructor starts
        # the pipe server, whose callback (handle_pipe_event) uses
        # _transform_in.
        self._transform_in = partial(self._transform_message, transform_in_cmd)
        self._transform_out = partial(self._transform_message, transform_out_cmd)
        super().__init__(key, in_pipe_path, out_pipe_path, permissions)

    @staticmethod
    def _transform_message(transform_cmd, message):
        """Run ``transform_cmd`` with ``message`` on stdin and return its stdout.

        Args:
            transform_cmd: Command string executed through the shell.
            message: Bytes fed to the command's standard input.

        Returns:
            The bytes the command wrote to standard output.

        Raises:
            ValueError: If the command exits with a non-zero return code.
        """
        # NOTE(review): shell=True means transform_cmd is interpreted by the
        # shell; it must only ever come from trusted configuration.
        proc = run(
            transform_cmd,
            input=message,
            stdout=PIPE,
            stderr=PIPE,
            shell=True
        )
        if proc.returncode == 0:
            return proc.stdout
        raise ValueError(f'Transforming message {message} failed!')

    def handle_event(self, message):
        """Serialise ``message``, transform it and send any non-empty result."""
        json_bytes = dumps(message).encode()
        transformed_bytes = self._transform_out(json_bytes)
        if transformed_bytes:
            self.pipe_io.send_message(transformed_bytes)

    def handle_pipe_event(self, message_bytes):
        """Transform ``message_bytes``; parse and forward any non-empty result."""
        transformed_bytes = self._transform_in(message_bytes)
        if transformed_bytes:
            json_message = loads(transformed_bytes)
            self.server_connector.send(json_message)


class CommandEventHandler(PipeIOEventHandler):
    """Run a shell command whose stdin/stdout are wired to the pipe server."""

    def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS):
        """Start the pipe server on two generated paths and spawn ``command``.

        Args:
            key: Passed unchanged to the base class.
            command: Command line executed with ``/bin/bash``.
            permissions: Permission value forwarded to the pipe server.
        """
        super().__init__(
            key,
            self._generate_tempfilename(),
            self._generate_tempfilename(),
            permissions
        )
        # The subprocess reads what the server writes (out_pipe) and writes
        # what the server reads (in_pipe).
        self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
        self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
        self._proc = Popen(
            command, shell=True, executable='/bin/bash',
            stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
            # Own session, so cleanup() can signal the whole process group.
            start_new_session=True
        )
        self._monitor_proc_thread = self._start_monitor_proc()

    def _generate_tempfilename(self):  # pylint: disable=no-self-use
        """Return a random path under ``/tmp`` prefixed with the class name."""
        return join('/tmp', f'{type(self).__name__}.{token_urlsafe(10)}')

    def _start_monitor_proc(self):
        """Start and return the daemon thread running ``_monitor_proc``."""
        thread = Thread(target=self._monitor_proc, daemon=True)
        thread.start()
        return thread

    @terminate_process_on_failure
    def _monitor_proc(self):
        """Wait for the subprocess to exit and fail loudly on a non-zero code.

        Raises:
            RuntimeError: If the subprocess exits with a non-zero return
                code; the message carries the captured standard error.
        """
        # communicate() drains the stderr pipe while waiting. A bare wait()
        # with stderr=PIPE can deadlock once the child fills the pipe buffer.
        _, stderr = self._proc.communicate()
        if self._proc.returncode != 0:
            raise RuntimeError(f'Subprocess failed: {stderr.decode()}')

    def cleanup(self):
        """Terminate the subprocess group and release the pipe resources."""
        try:
            # The process may already be gone; that is not an error here.
            with suppress(ProcessLookupError):
                process_group_id = getpgid(self._proc.pid)
                killpg(process_group_id, SIGTERM)
        finally:
            # Always release the file objects and stop the pipe server, even
            # if signalling the process group failed unexpectedly.
            self._proc_stdin.close()
            self._proc_stdout.close()
            super().cleanup()