import logging from abc import abstractmethod from json import loads, dumps from subprocess import run, PIPE, Popen from functools import partial from os import getpgid, killpg from os.path import join from signal import SIGTERM from secrets import token_urlsafe from threading import Thread from contextlib import suppress from pipe_io_server import PipeIOServer, terminate_process_on_failure LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 class PipeIOHandlerBase: keys = [''] def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): self.connector = None self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, self.handle_pipe_event, permissions ) self.pipe_io.start() @abstractmethod def handle_pipe_event(self, message_bytes): raise NotImplementedError() def cleanup(self): self.pipe_io.stop() class CallbackPipeIOServer(PipeIOServer): def __init__(self, in_pipe_path, out_pipe_path, callback, permissions): super().__init__(in_pipe_path, out_pipe_path, permissions) self.callback = callback def handle_message(self, message): try: self.callback(message) except: # pylint: disable=bare-except LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe) class PipeIOHandler(PipeIOHandlerBase): def handle_event(self, message, _): json_bytes = dumps(message).encode() self.pipe_io.send_message(json_bytes) def handle_pipe_event(self, message_bytes): json = loads(message_bytes) self.connector.send_message(json) class TransformerPipeIOHandler(PipeIOHandlerBase): # pylint: disable=too-many-arguments def __init__( self, in_pipe_path, out_pipe_path, transform_in_cmd, transform_out_cmd, permissions=DEFAULT_PERMISSIONS ): self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd) super().__init__(in_pipe_path, out_pipe_path, permissions) @staticmethod def _transform_message(transform_cmd, message): proc = run( transform_cmd, input=message, stdout=PIPE, stderr=PIPE, shell=True ) if proc.returncode == 0: return proc.stdout raise ValueError(f'Transforming message {message} failed!') def handle_event(self, message, _): json_bytes = dumps(message).encode() transformed_bytes = self._transform_out(json_bytes) if transformed_bytes: self.pipe_io.send_message(transformed_bytes) def handle_pipe_event(self, message_bytes): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) self.connector.send_message(json_message) class CommandHandler(PipeIOHandler): def __init__(self, command, permissions=DEFAULT_PERMISSIONS): super().__init__( self._generate_tempfilename(), self._generate_tempfilename(), permissions ) self._proc_stdin = open(self.pipe_io.out_pipe, 'rb') self._proc_stdout = open(self.pipe_io.in_pipe, 'wb') self._proc = Popen( command, shell=True, executable='/bin/bash', stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE, start_new_session=True ) self._monitor_proc_thread = self._start_monitor_proc() def _generate_tempfilename(self): # pylint: disable=no-self-use random_filename = partial(token_urlsafe, 10) return join('/tmp', f'{type(self).__name__}.{random_filename()}') def _start_monitor_proc(self): thread = Thread(target=self._monitor_proc, daemon=True) thread.start() return thread @terminate_process_on_failure def _monitor_proc(self): return_code = self._proc.wait() if return_code == -int(SIGTERM): # supervisord asked the program to terminate, this is fine return if return_code != 0: _, stderr = self._proc.communicate() raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}') def cleanup(self): with suppress(ProcessLookupError): process_group_id = getpgid(self._proc.pid) killpg(process_group_id, SIGTERM) self._proc_stdin.close() self._proc_stdout.close() super().cleanup()