from subprocess import PIPE, Popen from functools import partial from os import getpgid, killpg from os.path import join from signal import SIGTERM from secrets import token_urlsafe from threading import Thread from contextlib import suppress from pipe_io_server import terminate_process_on_failure from .pipe_io_handler import PipeIOHandler, DEFAULT_PERMISSIONS class CommandHandler(PipeIOHandler): def __init__(self, command, permissions=DEFAULT_PERMISSIONS): super().__init__( self._generate_tempfilename(), self._generate_tempfilename(), permissions ) self._proc_stdin = open(self.pipe_io.out_pipe, 'rb') self._proc_stdout = open(self.pipe_io.in_pipe, 'wb') self._proc = Popen( command, shell=True, executable='/bin/bash', stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE, start_new_session=True ) self._monitor_proc_thread = self._start_monitor_proc() def _generate_tempfilename(self): # pylint: disable=no-self-use random_filename = partial(token_urlsafe, 10) return join('/tmp', f'{type(self).__name__}.{random_filename()}') def _start_monitor_proc(self): thread = Thread(target=self._monitor_proc, daemon=True) thread.start() return thread @terminate_process_on_failure def _monitor_proc(self): return_code = self._proc.wait() if return_code == -int(SIGTERM): # supervisord asked the program to terminate, this is fine return if return_code != 0: _, stderr = self._proc.communicate() raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}') def cleanup(self): with suppress(ProcessLookupError): process_group_id = getpgid(self._proc.pid) killpg(process_group_id, SIGTERM) self._proc_stdin.close() self._proc_stdout.close() super().cleanup()