Move CommandHandler to a separate file

This commit is contained in:
Kristóf Tóth 2019-09-05 16:10:46 +02:00
parent 8890e67b86
commit 5b0f79dbae
3 changed files with 61 additions and 56 deletions

View File

@ -1,2 +1,3 @@
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, CommandHandler from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase
from .command_handler import CommandHandler
from .pipe_connector import ProxyPipeConnectorHandler from .pipe_connector import ProxyPipeConnectorHandler

View File

@ -0,0 +1,58 @@
from subprocess import PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress
from pipe_io_server import terminate_process_on_failure
from .pipe_io_handler import PipeIOHandler, DEFAULT_PERMISSIONS
class CommandHandler(PipeIOHandler):
def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
super().__init__(
self._generate_tempfilename(),
self._generate_tempfilename(),
permissions
)
self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
self._proc = Popen(
command, shell=True, executable='/bin/bash',
stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
start_new_session=True
)
self._monitor_proc_thread = self._start_monitor_proc()
def _generate_tempfilename(self):
# pylint: disable=no-self-use
random_filename = partial(token_urlsafe, 10)
return join('/tmp', f'{type(self).__name__}.{random_filename()}')
def _start_monitor_proc(self):
thread = Thread(target=self._monitor_proc, daemon=True)
thread.start()
return thread
@terminate_process_on_failure
def _monitor_proc(self):
return_code = self._proc.wait()
if return_code == -int(SIGTERM):
# supervisord asked the program to terminate, this is fine
return
if return_code != 0:
_, stderr = self._proc.communicate()
raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}')
def cleanup(self):
with suppress(ProcessLookupError):
process_group_id = getpgid(self._proc.pid)
killpg(process_group_id, SIGTERM)
self._proc_stdin.close()
self._proc_stdout.close()
super().cleanup()

View File

@ -1,16 +1,8 @@
import logging import logging
from abc import abstractmethod from abc import abstractmethod
from json import loads, dumps from json import loads, dumps
from subprocess import PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress
from pipe_io_server import PipeIOServer, terminate_process_on_failure from pipe_io_server import PipeIOServer
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600 DEFAULT_PERMISSIONS = 0o600
@ -53,49 +45,3 @@ class PipeIOHandler(PipeIOHandlerBase):
def handle_pipe_event(self, message_bytes): def handle_pipe_event(self, message_bytes):
json = loads(message_bytes) json = loads(message_bytes)
self.connector.send_message(json) self.connector.send_message(json)
class CommandHandler(PipeIOHandler):
def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
super().__init__(
self._generate_tempfilename(),
self._generate_tempfilename(),
permissions
)
self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
self._proc = Popen(
command, shell=True, executable='/bin/bash',
stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
start_new_session=True
)
self._monitor_proc_thread = self._start_monitor_proc()
def _generate_tempfilename(self):
# pylint: disable=no-self-use
random_filename = partial(token_urlsafe, 10)
return join('/tmp', f'{type(self).__name__}.{random_filename()}')
def _start_monitor_proc(self):
thread = Thread(target=self._monitor_proc, daemon=True)
thread.start()
return thread
@terminate_process_on_failure
def _monitor_proc(self):
return_code = self._proc.wait()
if return_code == -int(SIGTERM):
# supervisord asked the program to terminate, this is fine
return
if return_code != 0:
_, stderr = self._proc.communicate()
raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}')
def cleanup(self):
with suppress(ProcessLookupError):
process_group_id = getpgid(self._proc.pid)
killpg(process_group_id, SIGTERM)
self._proc_stdin.close()
self._proc_stdout.close()
super().cleanup()