From 5b0f79dbaeba33a7b5ac988c29f26f3a1c278af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 5 Sep 2019 16:10:46 +0200 Subject: [PATCH] Move CommandHandler to a separate file --- tfw/components/pipe_io/__init__.py | 3 +- tfw/components/pipe_io/command_handler.py | 58 +++++++++++++++++++++++ tfw/components/pipe_io/pipe_io_handler.py | 56 +--------------------- 3 files changed, 61 insertions(+), 56 deletions(-) create mode 100644 tfw/components/pipe_io/command_handler.py diff --git a/tfw/components/pipe_io/__init__.py b/tfw/components/pipe_io/__init__.py index 2160960..6fc4fc0 100644 --- a/tfw/components/pipe_io/__init__.py +++ b/tfw/components/pipe_io/__init__.py @@ -1,2 +1,3 @@ -from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, CommandHandler +from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase +from .command_handler import CommandHandler from .pipe_connector import ProxyPipeConnectorHandler diff --git a/tfw/components/pipe_io/command_handler.py b/tfw/components/pipe_io/command_handler.py new file mode 100644 index 0000000..e8d1593 --- /dev/null +++ b/tfw/components/pipe_io/command_handler.py @@ -0,0 +1,58 @@ +from subprocess import PIPE, Popen +from functools import partial +from os import getpgid, killpg +from os.path import join +from signal import SIGTERM +from secrets import token_urlsafe +from threading import Thread +from contextlib import suppress + +from pipe_io_server import terminate_process_on_failure + +from .pipe_io_handler import PipeIOHandler, DEFAULT_PERMISSIONS + + +class CommandHandler(PipeIOHandler): + def __init__(self, command, permissions=DEFAULT_PERMISSIONS): + super().__init__( + self._generate_tempfilename(), + self._generate_tempfilename(), + permissions + ) + + self._proc_stdin = open(self.pipe_io.out_pipe, 'rb') + self._proc_stdout = open(self.pipe_io.in_pipe, 'wb') + self._proc = Popen( + command, shell=True, executable='/bin/bash', + stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE, + start_new_session=True + ) + self._monitor_proc_thread = self._start_monitor_proc() + + def _generate_tempfilename(self): + # pylint: disable=no-self-use + random_filename = partial(token_urlsafe, 10) + return join('/tmp', f'{type(self).__name__}.{random_filename()}') + + def _start_monitor_proc(self): + thread = Thread(target=self._monitor_proc, daemon=True) + thread.start() + return thread + + @terminate_process_on_failure + def _monitor_proc(self): + return_code = self._proc.wait() + if return_code == -int(SIGTERM): + # supervisord asked the program to terminate, this is fine + return + if return_code != 0: + _, stderr = self._proc.communicate() + raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}') + + def cleanup(self): + with suppress(ProcessLookupError): + process_group_id = getpgid(self._proc.pid) + killpg(process_group_id, SIGTERM) + self._proc_stdin.close() + self._proc_stdout.close() + super().cleanup() diff --git a/tfw/components/pipe_io/pipe_io_handler.py b/tfw/components/pipe_io/pipe_io_handler.py index 50201ac..bea846a 100644 --- a/tfw/components/pipe_io/pipe_io_handler.py +++ b/tfw/components/pipe_io/pipe_io_handler.py @@ -1,16 +1,8 @@ import logging from abc import abstractmethod from json import loads, dumps -from subprocess import PIPE, Popen -from functools import partial -from os import getpgid, killpg -from os.path import join -from signal import SIGTERM -from secrets import token_urlsafe -from threading import Thread -from contextlib import suppress -from pipe_io_server import PipeIOServer, terminate_process_on_failure +from pipe_io_server import PipeIOServer LOG = logging.getLogger(__name__) DEFAULT_PERMISSIONS = 0o600 @@ -53,49 +45,3 @@ class PipeIOHandler(PipeIOHandlerBase): def handle_pipe_event(self, message_bytes): json = loads(message_bytes) self.connector.send_message(json) - - -class CommandHandler(PipeIOHandler): - def __init__(self, command, permissions=DEFAULT_PERMISSIONS): - super().__init__( - self._generate_tempfilename(), - self._generate_tempfilename(), - permissions - ) - - self._proc_stdin = open(self.pipe_io.out_pipe, 'rb') - self._proc_stdout = open(self.pipe_io.in_pipe, 'wb') - self._proc = Popen( - command, shell=True, executable='/bin/bash', - stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE, - start_new_session=True - ) - self._monitor_proc_thread = self._start_monitor_proc() - - def _generate_tempfilename(self): - # pylint: disable=no-self-use - random_filename = partial(token_urlsafe, 10) - return join('/tmp', f'{type(self).__name__}.{random_filename()}') - - def _start_monitor_proc(self): - thread = Thread(target=self._monitor_proc, daemon=True) - thread.start() - return thread - - @terminate_process_on_failure - def _monitor_proc(self): - return_code = self._proc.wait() - if return_code == -int(SIGTERM): - # supervisord asked the program to terminate, this is fine - return - if return_code != 0: - _, stderr = self._proc.communicate() - raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}') - - def cleanup(self): - with suppress(ProcessLookupError): - process_group_id = getpgid(self._proc.pid) - killpg(process_group_id, SIGTERM) - self._proc_stdin.close() - self._proc_stdout.close() - super().cleanup()