Merge branch 'pipeio'

This commit is contained in:
Kristóf Tóth 2019-05-13 16:31:57 +02:00
commit f43c463692
12 changed files with 396 additions and 2 deletions

View File

@ -1,6 +1,6 @@
pipeline:
build:
image: eu.gcr.io/avatao-public/docker:el7
image: eu.gcr.io/avatao-challengestore/challenge-builder
volumes:
- /etc/docker:/etc/docker:ro
- /root/.docker:/root/.docker:ro

View File

@ -10,6 +10,7 @@ RUN curl -sL https://deb.nodesource.com/setup_8.x | sudo -E bash -
supervisor \
libzmq5 \
nginx \
jq \
gettext-base &&\
rm -rf /var/lib/apt/lists/* &&\
ln -sf /bin/bash /bin/sh

View File

@ -10,3 +10,5 @@ from .terminal_commands import TerminalCommands
from .log_monitoring_event_handler import LogMonitoringEventHandler
from .fsm_managing_event_handler import FSMManagingEventHandler
from .snapshot_provider import SnapshotProvider
from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler, PipeIOServer
from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler

View File

@ -0,0 +1,143 @@
from abc import abstractmethod
from json import loads, dumps
from subprocess import run, PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress
from tfw import EventHandlerBase
from tfw.config.logs import logging
from .pipe_io_server import PipeIOServer, terminate_process_on_failure
LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600
class PipeIOEventHandlerBase(EventHandlerBase):
def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
super().__init__(key)
self.pipe_io = CallbackPipeIOServer(
in_pipe_path,
out_pipe_path,
self.handle_pipe_event,
permissions
)
self.pipe_io.start()
@abstractmethod
def handle_pipe_event(self, message_bytes):
raise NotImplementedError()
def cleanup(self):
self.pipe_io.stop()
class CallbackPipeIOServer(PipeIOServer):
def __init__(self, in_pipe_path, out_pipe_path, callback, permissions):
super().__init__(in_pipe_path, out_pipe_path, permissions)
self.callback = callback
def handle_message(self, message):
try:
self.callback(message)
except: # pylint: disable=bare-except
LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)
class PipeIOEventHandler(PipeIOEventHandlerBase):
def handle_event(self, message):
json_bytes = dumps(message).encode()
self.pipe_io.send_message(json_bytes)
def handle_pipe_event(self, message_bytes):
json = loads(message_bytes)
self.server_connector.send(json)
class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
# pylint: disable=too-many-arguments
def __init__(
self, key, in_pipe_path, out_pipe_path,
transform_in_cmd, transform_out_cmd,
permissions=DEFAULT_PERMISSIONS
):
self._transform_in = partial(self._transform_message, transform_in_cmd)
self._transform_out = partial(self._transform_message, transform_out_cmd)
super().__init__(key, in_pipe_path, out_pipe_path, permissions)
@staticmethod
def _transform_message(transform_cmd, message):
proc = run(
transform_cmd,
input=message,
stdout=PIPE,
stderr=PIPE,
shell=True
)
if proc.returncode == 0:
return proc.stdout
raise ValueError(f'Transforming message {message} failed!')
def handle_event(self, message):
json_bytes = dumps(message).encode()
transformed_bytes = self._transform_out(json_bytes)
if transformed_bytes:
self.pipe_io.send_message(transformed_bytes)
def handle_pipe_event(self, message_bytes):
transformed_bytes = self._transform_in(message_bytes)
if transformed_bytes:
json_message = loads(transformed_bytes)
self.server_connector.send(json_message)
class CommandEventHandler(PipeIOEventHandler):
def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS):
super().__init__(
key,
self._generate_tempfilename(),
self._generate_tempfilename(),
permissions
)
self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
self._proc = Popen(
command, shell=True, executable='/bin/bash',
stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
start_new_session=True
)
self._monitor_proc_thread = self._start_monitor_proc()
def _generate_tempfilename(self):
# pylint: disable=no-self-use
random_filename = partial(token_urlsafe, 10)
return join('/tmp', f'{type(self).__name__}.{random_filename()}')
def _start_monitor_proc(self):
thread = Thread(target=self._monitor_proc, daemon=True)
thread.start()
return thread
@terminate_process_on_failure
def _monitor_proc(self):
return_code = self._proc.wait()
if return_code == -int(SIGTERM):
# supervisord asked the program to terminate, this is fine
return
if return_code != 0:
_, stderr = self._proc.communicate()
raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}')
def cleanup(self):
with suppress(ProcessLookupError):
process_group_id = getpgid(self._proc.pid)
killpg(process_group_id, SIGTERM)
self._proc_stdin.close()
self._proc_stdout.close()
super().cleanup()

View File

@ -0,0 +1,2 @@
from .pipe_io_server import PipeIOServer
from .terminate_process_on_failure import terminate_process_on_failure

View File

@ -0,0 +1,27 @@
from collections import deque
from threading import Lock, Condition
class Deque:
def __init__(self):
self._queue = deque()
self._mutex = Lock()
self._not_empty = Condition(self._mutex)
def pop(self):
with self._mutex:
while not self._queue:
self._not_empty.wait()
return self._queue.pop()
def push(self, item):
self._push(item, self._queue.appendleft)
def push_front(self, item):
self._push(item, self._queue.append)
def _push(self, item, put_method):
with self._mutex:
put_method(item)
self._not_empty.notify()

View File

@ -0,0 +1,16 @@
from os import mkfifo, remove, chmod
from os.path import exists
class Pipe:
def __init__(self, path):
self.path = path
def recreate(self, permissions):
self.remove()
mkfifo(self.path)
chmod(self.path, permissions) # use chmod to ignore umask
def remove(self):
if exists(self.path):
remove(self.path)

View File

@ -0,0 +1,73 @@
from abc import ABC, abstractmethod
from threading import Thread, Event
from typing import Callable
from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe
from .terminate_process_on_failure import terminate_process_on_failure
class PipeIOServer(ABC, Thread):
def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600):
super().__init__(daemon=True)
self._in_pipe, self._out_pipe = in_pipe, out_pipe
self._create_pipes(permissions)
self._stop_event = Event()
self._reader_thread, self._writer_thread = self._create_io_threads()
self._io_threads = (self._reader_thread, self._writer_thread)
self._on_stop = lambda: None
def _create_pipes(self, permissions):
Pipe(self.in_pipe).recreate(permissions)
Pipe(self.out_pipe).recreate(permissions)
@property
def in_pipe(self):
return self._in_pipe
@property
def out_pipe(self):
return self._out_pipe
def _create_io_threads(self):
reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message)
writer_thread = PipeWriterThread(self.out_pipe, self._stop_event)
return reader_thread, writer_thread
@abstractmethod
def handle_message(self, message):
raise NotImplementedError()
def send_message(self, message):
self._writer_thread.write(message)
@terminate_process_on_failure
def run(self):
for thread in self._io_threads:
thread.start()
self._stop_event.wait()
self._stop_threads()
def stop(self):
self._stop_event.set()
if self.is_alive():
self.join()
def _stop_threads(self):
for thread in self._io_threads:
if thread.is_alive():
thread.stop()
Pipe(self.in_pipe).remove()
Pipe(self.out_pipe).remove()
self._on_stop()
def _set_on_stop(self, value):
if not isinstance(value, Callable):
raise ValueError("Supplied object is not callable!")
self._on_stop = value
on_stop = property(fset=_set_on_stop)
def wait(self):
self._stop_event.wait()

View File

@ -0,0 +1,44 @@
from contextlib import suppress
from os import open as osopen
from os import write, close, O_WRONLY, O_NONBLOCK
from threading import Thread
from .terminate_process_on_failure import terminate_process_on_failure
class PipeReaderThread(Thread):
eof = b''
stop_sequence = b'stop_reading\n'
def __init__(self, pipe_path, stop_event, message_handler):
super().__init__(daemon=True)
self._message_handler = message_handler
self._pipe_path = pipe_path
self._stop_event = stop_event
@terminate_process_on_failure
def run(self):
with self._open() as pipe:
while True:
message = pipe.readline()
if message == self.stop_sequence:
self._stop_event.set()
break
if message == self.eof:
self._open().close()
continue
self._message_handler(message[:-1])
def _open(self):
return open(self._pipe_path, 'rb')
def stop(self):
while self.is_alive():
self._unblock()
self.join()
def _unblock(self):
with suppress(OSError):
fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK)
write(fd, self.stop_sequence)
close(fd)

View File

@ -0,0 +1,50 @@
from contextlib import suppress
from os import O_NONBLOCK, O_RDONLY, close
from os import open as osopen
from threading import Thread
from .terminate_process_on_failure import terminate_process_on_failure
from .deque import Deque
class PipeWriterThread(Thread):
def __init__(self, pipe_path, stop_event):
super().__init__(daemon=True)
self._pipe_path = pipe_path
self._stop_event = stop_event
self._write_queue = Deque()
def write(self, message):
self._write_queue.push(message)
@terminate_process_on_failure
def run(self):
with self._open() as pipe:
while True:
message = self._write_queue.pop()
if message is None:
self._stop_event.set()
break
try:
pipe.write(message + b'\n')
pipe.flush()
except BrokenPipeError:
try: # pipe was reopened, close() flushed the message
pipe.close()
except BrokenPipeError: # close() discarded the message
self._write_queue.push_front(message)
pipe = self._open()
def _open(self):
return open(self._pipe_path, 'wb')
def stop(self):
while self.is_alive():
self._unblock()
self.join()
def _unblock(self):
with suppress(OSError):
fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK)
self._write_queue.push_front(None)
close(fd)

View File

@ -0,0 +1,15 @@
from functools import wraps
from os import kill, getpid
from signal import SIGTERM
from traceback import print_exc
def terminate_process_on_failure(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except: # pylint: disable=bare-except
print_exc()
kill(getpid(), SIGTERM)
return wrapper

View File

@ -2,6 +2,8 @@
# All Rights Reserved. See LICENSE file for details.
from abc import ABC, abstractmethod
from inspect import currentframe
from typing import Iterable
from tfw.networking.event_handlers import ServerConnector
from tfw.crypto import message_checksum, KeyManager, verify_message
@ -19,7 +21,12 @@ class EventHandlerBase(ABC):
"""
def __init__(self, key):
self.server_connector = ServerConnector()
self.keys = [key]
self.keys = []
if isinstance(key, str):
self.keys.append(key)
elif isinstance(key, Iterable):
self.keys = list(key)
self.subscribe(*self.keys)
self.server_connector.register_callback(self.event_handler_callback)
@ -54,6 +61,8 @@ class EventHandlerBase(ABC):
subscribed to 'fsm' will receive 'fsm_update'
messages as well.
"""
if '' in self.keys:
return True
return message['key'] in self.keys
def dispatch_handling(self, message):
@ -105,6 +114,18 @@ class EventHandlerBase(ABC):
"""
pass
@classmethod
def get_local_instances(cls):
frame = currentframe()
if frame is None:
raise EnvironmentError('inspect.currentframe() is not supported!')
locals_values = frame.f_back.f_locals.values()
return {
instance for instance in locals_values
if isinstance(instance, cls)
}
class FSMAwareEventHandler(EventHandlerBase, ABC):
# pylint: disable=abstract-method