import logging from abc import abstractmethod from json import loads, dumps from subprocess import run, PIPE, Popen from functools import partial from os import getpgid, killpg from os.path import join from signal import SIGTERM from secrets import token_urlsafe from threading import Thread from contextlib import suppress from tfw.event_handlers import EventHandlerBase from .pipe_io_server import PipeIOServer, terminate_process_on_failure LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 class PipeIOEventHandlerBase(EventHandlerBase): def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): super().__init__(key) self.pipe_io = CallbackPipeIOServer( in_pipe_path, out_pipe_path, self.handle_pipe_event, permissions ) self.pipe_io.start() @abstractmethod def handle_pipe_event(self, message_bytes): raise NotImplementedError() def cleanup(self): self.pipe_io.stop() class CallbackPipeIOServer(PipeIOServer): def __init__(self, in_pipe_path, out_pipe_path, callback, permissions): super().__init__(in_pipe_path, out_pipe_path, permissions) self.callback = callback def handle_message(self, message): try: self.callback(message) except: # pylint: disable=bare-except LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe) class PipeIOEventHandler(PipeIOEventHandlerBase): def handle_event(self, message): json_bytes = dumps(message).encode() self.pipe_io.send_message(json_bytes) def handle_pipe_event(self, message_bytes): json = loads(message_bytes) self.send_message(json) class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): # pylint: disable=too-many-arguments def __init__( self, key, in_pipe_path, out_pipe_path, transform_in_cmd, transform_out_cmd, permissions=DEFAULT_PERMISSIONS ): self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd) super().__init__(key, in_pipe_path, out_pipe_path, permissions) @staticmethod def _transform_message(transform_cmd, message): proc = run( transform_cmd, input=message, stdout=PIPE, stderr=PIPE, shell=True ) if proc.returncode == 0: return proc.stdout raise ValueError(f'Transforming message {message} failed!') def handle_event(self, message): json_bytes = dumps(message).encode() transformed_bytes = self._transform_out(json_bytes) if transformed_bytes: self.pipe_io.send_message(transformed_bytes) def handle_pipe_event(self, message_bytes): transformed_bytes = self._transform_in(message_bytes) if transformed_bytes: json_message = loads(transformed_bytes) self.send_message(json_message) class CommandEventHandler(PipeIOEventHandler): def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS): super().__init__( key, self._generate_tempfilename(), self._generate_tempfilename(), permissions ) self._proc_stdin = open(self.pipe_io.out_pipe, 'rb') self._proc_stdout = open(self.pipe_io.in_pipe, 'wb') self._proc = Popen( command, shell=True, executable='/bin/bash', stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE, start_new_session=True ) self._monitor_proc_thread = self._start_monitor_proc() def _generate_tempfilename(self): # pylint: disable=no-self-use random_filename = partial(token_urlsafe, 10) return join('/tmp', f'{type(self).__name__}.{random_filename()}') def _start_monitor_proc(self): thread = Thread(target=self._monitor_proc, daemon=True) thread.start() return thread @terminate_process_on_failure def _monitor_proc(self): return_code = self._proc.wait() if return_code == -int(SIGTERM): # supervisord asked the program to terminate, this is fine return if return_code != 0: _, stderr = self._proc.communicate() raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}') def cleanup(self): with suppress(ProcessLookupError): process_group_id = getpgid(self._proc.pid) killpg(process_group_id, SIGTERM) self._proc_stdin.close() self._proc_stdout.close() super().cleanup()