Merge branch 'chausie'

This commit is contained in:
Kristóf Tóth 2019-09-12 13:42:07 -04:00
commit 983f02f362
158 changed files with 3390 additions and 2336 deletions

View File

@ -10,4 +10,4 @@ pipeline:
- docker push eu.gcr.io/avatao-challengestore/tutorial-framework:${DRONE_TAG} - docker push eu.gcr.io/avatao-challengestore/tutorial-framework:${DRONE_TAG}
when: when:
event: 'tag' event: 'tag'
branch: refs/tags/ocicat-20* branch: refs/tags/chausie-20*

View File

@ -1,22 +0,0 @@
#!/usr/bin/env bash
set -eu
set -o pipefail
set -o errtrace
shopt -s expand_aliases
[ "$(uname)" == "Darwin" ] && alias readlink="greadlink" || :
GREEN='\033[0;32m'
NC='\033[0m'
here="$(dirname "$(readlink -f "$0")")"
cd "${here}/../.git/hooks"
rm -f pre-push pre-commit || :
prepush_script="../../.git-hooks/pre-push.sh"
precommit_script="../../.git-hooks/pre-commit.sh"
[ -f "${prepush_script}" ] && ln -s "${prepush_script}" pre-push
[ -f "${precommit_script}" ] && ln -s "${precommit_script}" pre-commit
echo -e "\n${GREEN}Done! Hooks applied, you can start committing and pushing!${NC}\n"

View File

@ -1,18 +0,0 @@
#!/usr/bin/env bash
set -eu
set -o pipefail
set -o errtrace
shopt -s expand_aliases
RED='\033[0;31m'
GREEN='\033[0;32m'
NC='\033[0m'
echo -e "Running pylint...\n"
if pylint lib; then
echo -e "\n${GREEN}Pylint found no errors!${NC}\n"
else
echo -e "\n${RED}Pylint failed with errors${NC}\n"
exit 1
fi

View File

@ -6,6 +6,7 @@ disable = missing-docstring, too-few-public-methods, invalid-name
[SIMILARITIES] [SIMILARITIES]
min-similarity-lines=8
ignore-comments=yes ignore-comments=yes
ignore-docstrings=yes ignore-docstrings=yes
ignore-imports=yes ignore-imports=yes

View File

@ -1,30 +1,31 @@
FROM avatao/frontend-tutorial-framework:chausie-20190912 as frontend
FROM avatao/debian:buster FROM avatao/debian:buster
RUN curl -sL https://deb.nodesource.com/setup_8.x | sudo -E bash - &&\ RUN apt-get update &&\
curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - &&\
echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list &&\
apt-get update &&\
apt-get install -y --no-install-recommends \ apt-get install -y --no-install-recommends \
nodejs \
yarn \
supervisor \ supervisor \
libzmq5 \ libzmq5 \
nginx \ nginx \
jq \ jq \
gettext-base &&\ gettext-base &&\
rm -rf /var/lib/apt/lists/* &&\ rm -rf /var/lib/apt/lists/* &&\
ln -sf /bin/bash /bin/sh ln -sf /bin/bash /bin/sh
COPY requirements.txt /tmp COPY requirements.txt /tmp
RUN pip3 install -r /tmp/requirements.txt RUN pip3 install -r /tmp/requirements.txt
RUN curl -Ls https://github.com/krallin/tini/releases/download/v0.18.0/tini-amd64 --output /bin/init &&\
echo "12d20136605531b09a2c2dac02ccee85e1b874eb322ef6baf7561cd93f93c855 /bin/init" |\
sha256sum --check --status &&\
chmod 755 /bin/init
ENV TFW_PUBLIC_PORT=8888 \ ENV TFW_PUBLIC_PORT=8888 \
TFW_WEB_PORT=4242 \ TFW_WEB_PORT=4242 \
TFW_LOGIN_APP_PORT=6666 \ TFW_LOGIN_APP_PORT=6666 \
TFW_TERMINADO_PORT=7878 \ TFW_TERMINADO_PORT=7878 \
TFW_SUPERVISOR_HTTP_PORT=9001 \ TFW_SUPERVISOR_HTTP_PORT=9001 \
TFW_PUBLISHER_PORT=7654 \ TFW_PUB_PORT=7654 \
TFW_RECEIVER_PORT=8765 TFW_PULL_PORT=8765
EXPOSE ${TFW_PUBLIC_PORT} EXPOSE ${TFW_PUBLIC_PORT}
@ -39,29 +40,26 @@ ENV PYTHONPATH="/usr/local/lib" \
TFW_FRONTEND_DIR="/srv/frontend" \ TFW_FRONTEND_DIR="/srv/frontend" \
TFW_DIR="/.tfw" \ TFW_DIR="/.tfw" \
TFW_SERVER_DIR="/.tfw/tfw_server" \ TFW_SERVER_DIR="/.tfw/tfw_server" \
TFW_SNAPSHOTS_DIR="/.tfw/snapshots" \
TFW_AUTH_KEY="/tmp/tfw-auth.key" \ TFW_AUTH_KEY="/tmp/tfw-auth.key" \
TFW_LOGS_DIR="/var/log/tfw" \
TFW_PIPES_DIR="/run/tfw" \
TFW_SNAPSHOTS_DIR="/tmp/tfw-snapshots" \
TFW_HISTFILE="/home/${AVATAO_USER}/.bash_history" \ TFW_HISTFILE="/home/${AVATAO_USER}/.bash_history" \
PROMPT_COMMAND="history -a" PROMPT_COMMAND="history -a"
COPY bashrc /tmp COPY bashrc supervisor/tfw_init.sh /tmp/
RUN echo "export HISTFILE=${TFW_HISTFILE}" >> /tmp/bashrc &&\
cat /tmp/bashrc >> /home/${AVATAO_USER}/.bashrc
COPY supervisor/supervisord.conf ${TFW_SUPERVISORD_CONF} COPY supervisor/supervisord.conf ${TFW_SUPERVISORD_CONF}
COPY supervisor/components/ ${TFW_SUPERVISORD_COMPONENTS} COPY supervisor/components/ ${TFW_SUPERVISORD_COMPONENTS}
COPY nginx/nginx.conf ${TFW_NGINX_CONF} COPY nginx/nginx.conf ${TFW_NGINX_CONF}
COPY nginx/default.conf ${TFW_NGINX_DEFAULT} COPY nginx/default.conf ${TFW_NGINX_DEFAULT}
COPY nginx/components/ ${TFW_NGINX_COMPONENTS} COPY nginx/components/ ${TFW_NGINX_COMPONENTS}
COPY lib LICENSE ${TFW_LIB_DIR}/ COPY tfw ${TFW_LIB_DIR}/tfw
COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/ COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/
COPY --from=frontend /srv/dist ${TFW_FRONTEND_DIR}
RUN for dir in "${TFW_LIB_DIR}"/{tfw,tao,envvars} "/etc/nginx" "/etc/supervisor"; do \ VOLUME ["${TFW_LOGS_DIR}", "${TFW_PIPES_DIR}"]
chown -R root:root "$dir" && chmod -R 700 "$dir"; \
done
ONBUILD ARG BUILD_CONTEXT="solvable" ONBUILD ARG BUILD_CONTEXT="solvable"
ONBUILD ARG NOFRONTEND=""
ONBUILD COPY ${BUILD_CONTEXT}/nginx/ ${TFW_NGINX_COMPONENTS} ONBUILD COPY ${BUILD_CONTEXT}/nginx/ ${TFW_NGINX_COMPONENTS}
ONBUILD COPY ${BUILD_CONTEXT}/supervisor/ ${TFW_SUPERVISORD_COMPONENTS} ONBUILD COPY ${BUILD_CONTEXT}/supervisor/ ${TFW_SUPERVISORD_COMPONENTS}
@ -69,11 +67,7 @@ ONBUILD COPY ${BUILD_CONTEXT}/supervisor/ ${TFW_SUPERVISORD_COMPONENTS}
ONBUILD RUN for f in "${TFW_NGINX_DEFAULT}" ${TFW_NGINX_COMPONENTS}/*.conf; do \ ONBUILD RUN for f in "${TFW_NGINX_DEFAULT}" ${TFW_NGINX_COMPONENTS}/*.conf; do \
envsubst "$(printenv | cut -d= -f1 | grep TFW_ | sed -e 's/^/$/g')" < $f > $f~ && mv $f~ $f ;\ envsubst "$(printenv | cut -d= -f1 | grep TFW_ | sed -e 's/^/$/g')" < $f > $f~ && mv $f~ $f ;\
done done
ONBUILD VOLUME ["/etc/nginx", "/var/lib/nginx", "/var/log/nginx", "${TFW_LIB_DIR}/envvars", "${TFW_LIB_DIR}/tfw"] ONBUILD VOLUME ["/etc/nginx", "/var/lib/nginx", "/var/log/nginx", "${TFW_LIB_DIR}/tfw"]
ONBUILD COPY ${BUILD_CONTEXT}/frontend /data/
ONBUILD RUN test -z "${NOFRONTEND}" && cd /data && yarn install --frozen-lockfile || :
ONBUILD RUN test -z "${NOFRONTEND}" && cd /data && yarn build --no-progress || :
ONBUILD RUN test -z "${NOFRONTEND}" && mv /data/dist ${TFW_FRONTEND_DIR} && rm -rf /data || :
ENTRYPOINT ["/bin/init", "--"]
CMD exec supervisord --nodaemon --configuration ${TFW_SUPERVISORD_CONF} CMD exec supervisord --nodaemon --configuration ${TFW_SUPERVISORD_CONF}

12
LICENSE
View File

@ -1,12 +0,0 @@
AVATAO CONFIDENTIAL
Copyright (C) 2018 Avatao.com Innovative Learning Kft.
All Rights Reserved.
All source code, configuration files and documentation contained herein
is, and remains the exclusive property of Avatao.com Innovative Learning Kft.
The intellectual and technical concepts contained herein are proprietary
to Avatao.com Innovative Learning Kft. and are protected by trade secret
or copyright law. Dissemination of this information or reproduction of
this material is strictly forbidden unless prior written permission is
obtained from Avatao.com Innovative Learning Kft.

View File

@ -1 +1 @@
ocicat chausie

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,4 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .envvars import TAOENV

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from envvars import LazyEnvironment
TAOENV = LazyEnvironment('AVATAO_', 'taoenvtuple').environment

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,16 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .directory_monitoring_event_handler import DirectoryMonitoringEventHandler
from .process_managing_event_handler import ProcessManagingEventHandler
from .terminal_event_handler import TerminalEventHandler
from .ide_event_handler import IdeEventHandler
from .history_monitor import HistoryMonitor, BashMonitor, GDBMonitor
from .terminal_commands import TerminalCommands
from .log_monitoring_event_handler import LogMonitoringEventHandler
from .fsm_managing_event_handler import FSMManagingEventHandler
from .snapshot_provider import SnapshotProvider
from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler, PipeIOServer
from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler
from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler
from .commands_equal import CommandsEqual

View File

@ -1,81 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import wraps
from watchdog.events import FileSystemEventHandler as FileSystemWatchdogEventHandler
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class DirectoryMonitor(ObserverMixin):
def __init__(self, ide_key, directories):
self.eventhandler = IdeReloadWatchdogEventHandler(ide_key)
for directory in directories:
self.observer.schedule(self.eventhandler, directory, recursive=True)
self.pause, self.resume = self.eventhandler.pause, self.eventhandler.resume
@property
def ignore(self):
return self.eventhandler.ignore
@ignore.setter
def ignore(self, value):
self.eventhandler.ignore = value if value >= 0 else 0
@property
def pauser(self):
return DirectoryMonitor.Pauser(self)
class Pauser:
def __init__(self, directory_monitor):
self.directorymonitor = directory_monitor
def __enter__(self):
self.directorymonitor.pause()
def __exit__(self, exc_type, exc_val, exc_tb):
self.directorymonitor.resume()
class IdeReloadWatchdogEventHandler(FileSystemWatchdogEventHandler):
def __init__(self, ide_key):
super().__init__()
self.ide_key = ide_key
self.uplink = ServerUplinkConnector()
self._paused = False
self.ignore = 0
def pause(self):
self._paused = True
def resume(self):
self._paused = False
@RateLimiter(rate_per_second=2)
def on_modified(self, event):
if self._paused:
return
if self.ignore > 0:
self.ignore = self.ignore - 1
return
LOG.debug(event)
self.uplink.send({
'key': self.ide_key,
'data': {'command': 'reload'}
})
def with_monitor_paused(fun):
@wraps(fun)
def wrapper(self, *args, **kwargs):
if self.monitor:
with self.monitor.pauser:
return fun(self, *args, **kwargs)
return fun(self, *args, **kwargs)
return wrapper

View File

@ -1,70 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from os.path import isdir, exists
from tfw.event_handler_base import EventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class DirectoryMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin):
def __init__(self, key, directory):
super().__init__(key)
self._directory = directory
MonitorManagerMixin.__init__(
self,
DirectoryMonitor,
key,
self._directory
)
self.commands = {
'pause': self.pause,
'resume': self.resume,
'ignore': self.ignore,
'selectdir': self.selectdir
}
@property
def directory(self):
return self._directory
@directory.setter
def directory(self, directory):
if not exists(directory) or not isdir(directory):
raise EnvironmentError('No such directory!')
self._directory = directory
def handle_event(self, message):
try:
message['data'] = self.commands[message['data']['command']](message['data'])
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def pause(self, data):
self.monitor.pause()
return data
def resume(self, data):
self.monitor.resume()
return data
def ignore(self, data):
self.monitor.ignore += data['ignore']
return data
def selectdir(self, data):
try:
self.directory = data['directory']
self.reload_monitor()
return data
except EnvironmentError:
LOG.error('Failed to switch directory!')
def cleanup(self):
self.monitor.stop()

View File

@ -1,98 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase
from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FSMManagingEventHandler(EventHandlerBase):
"""
EventHandler responsible for managing the state machine of
the framework (TFW FSM).
tfw.networking.TFWServer instances automatically send 'trigger'
commands to the event handler listening on the 'fsm' key,
which should be an instance of this event handler.
This event handler accepts messages that have a
data['command'] key specifying a command to be executed.
An 'fsm_update' message is broadcasted after every successful
command.
"""
def __init__(self, key, fsm_type, require_signature=False):
super().__init__(key)
self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key
self._require_signature = require_signature
self.command_handlers = {
'trigger': self.handle_trigger,
'update': self.handle_update
}
def handle_event(self, message):
try:
message = self.command_handlers[message['data']['command']](message)
if message:
fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, message)
sign_message(self.auth_key, fsm_update_message)
self.server_connector.broadcast(fsm_update_message)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_trigger(self, message):
"""
Attempts to step the FSM with the supplied trigger.
:param message: TFW message with a data field containing
the action to try triggering in data['value']
"""
trigger = message['data']['value']
if self._require_signature:
if not verify_message(self.auth_key, message):
LOG.error('Ignoring unsigned trigger command: %s', message)
return None
if self.fsm.step(trigger):
return message
return None
def handle_update(self, message):
"""
Does nothing, but triggers an 'fsm_update' message.
"""
# pylint: disable=no-self-use
return message
class FSMUpdater:
def __init__(self, fsm):
self.fsm = fsm
@property
def fsm_update(self):
return {
'key': 'fsm_update',
'data': self.fsm_update_data
}
@property
def fsm_update_data(self):
valid_transitions = [
{'trigger': trigger}
for trigger in self.fsm.get_triggers(self.fsm.state)
]
last_fsm_event = self.fsm.event_log[-1]
last_fsm_event['timestamp'] = last_fsm_event['timestamp'].isoformat()
return {
'current_state': self.fsm.state,
'valid_transitions': valid_transitions,
'in_accepted_state': self.fsm.in_accepted_state,
'last_event': last_fsm_event
}

View File

@ -1,255 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from os.path import isfile, join, relpath, exists, isdir, realpath
from glob import glob
from fnmatch import fnmatchcase
from typing import Iterable
from tfw.event_handler_base import EventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
self._exclude, self.exclude = None, exclude
self._allowed_directories, self.allowed_directories = None, allowed_directories
self._workdir, self.workdir = None, working_directory
self._filename, self.filename = None, selected_file or self.files[0]
@property
def exclude(self):
return self._exclude
@exclude.setter
def exclude(self, exclude):
if exclude is None:
return
if not isinstance(exclude, Iterable):
raise TypeError('Exclude must be Iterable!')
self._exclude = exclude
@property
def workdir(self):
return self._workdir
@workdir.setter
def workdir(self, directory):
if not exists(directory) or not isdir(directory):
raise EnvironmentError(f'"{directory}" is not a directory!')
if not self._is_in_allowed_dir(directory):
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
self._workdir = directory
@property
def allowed_directories(self):
return self._allowed_directories
@allowed_directories.setter
def allowed_directories(self, directories):
self._allowed_directories = directories
@property
def filename(self):
return self._filename
@filename.setter
def filename(self, filename):
if filename not in self.files:
raise EnvironmentError('No such file in workdir!')
self._filename = filename
@property
def files(self):
return [
self._relpath(file)
for file in glob(join(self._workdir, '**/*'), recursive=True)
if isfile(file)
and self._is_in_allowed_dir(file)
and not self._is_blacklisted(file)
]
@property
def file_contents(self):
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
return ifile.read()
@file_contents.setter
def file_contents(self, value):
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
ofile.write(value)
def _is_in_allowed_dir(self, path):
return any(
realpath(path).startswith(allowed_dir)
for allowed_dir in self.allowed_directories
)
def _is_blacklisted(self, file):
return any(
fnmatchcase(file, blacklisted)
for blacklisted in self.exclude
)
def _filepath(self, filename):
return join(self._workdir, filename)
def _relpath(self, filename):
return relpath(self._filepath(filename), start=self._workdir)
class IdeEventHandler(EventHandlerBase, MonitorManagerMixin):
# pylint: disable=too-many-arguments,anomalous-backslash-in-string
"""
Event handler implementing the backend of our browser based IDE.
By default all files in the directory specified in __init__ are displayed
on the fontend. Note that this is a stateful component.
When any file in the selected directory changes they are automatically refreshed
on the frontend (this is done by listening to inotify events).
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None):
"""
:param key: the key this instance should listen to
:param directory: working directory which the EventHandler should serve files from
:param allowed_directories: list of directories that can be switched to using selectdir
:param selected_file: file that is selected by default
:param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
"""
super().__init__(key)
try:
self.filemanager = FileManager(
allowed_directories=allowed_directories,
working_directory=directory,
selected_file=selected_file,
exclude=exclude
)
except IndexError:
raise EnvironmentError(
f'No file(s) in IdeEventHandler working_directory "{directory}"!'
)
MonitorManagerMixin.__init__(
self,
DirectoryMonitor,
self.key,
self.filemanager.allowed_directories
)
self.commands = {
'read': self.read,
'write': self.write,
'select': self.select,
'selectdir': self.select_dir,
'exclude': self.exclude
}
def read(self, data):
"""
Read the currently selected file.
:return dict: TFW message data containing key 'content'
(contents of the selected file)
"""
try:
data['content'] = self.filemanager.file_contents
except PermissionError:
data['content'] = 'You have no permission to open that file :('
except FileNotFoundError:
data['content'] = 'This file was removed :('
except Exception: # pylint: disable=broad-except
data['content'] = 'Failed to read file :('
return data
def write(self, data):
"""
Overwrites a file with the desired string.
:param data: TFW message data containing key 'content'
(new file content)
"""
self.monitor.ignore = self.monitor.ignore + 1
try:
self.filemanager.file_contents = data['content']
except Exception: # pylint: disable=broad-except
LOG.exception('Error writing file!')
del data['content']
return data
def select(self, data):
"""
Selects a file from the current directory.
:param data: TFW message data containing 'filename'
(name of file to select relative to the current directory)
"""
try:
self.filemanager.filename = data['filename']
except EnvironmentError:
LOG.exception('Failed to select file "%s"', data['filename'])
return data
def select_dir(self, data):
"""
Select a new working directory to display files from.
:param data: TFW message data containing 'directory'
(absolute path of diretory to select.
must be a path whitelisted in
self.allowed_directories)
"""
try:
self.filemanager.workdir = data['directory']
self.reload_monitor()
try:
self.filemanager.filename = self.filemanager.files[0]
self.read(data)
except IndexError:
data['content'] = 'No files in this directory :('
except EnvironmentError as err:
LOG.error('Failed to select directory "%s". Reason: %s', data['directory'], str(err))
return data
def exclude(self, data):
"""
Overwrite list of excluded files
:param data: TFW message data containing 'exclude'
(list of unix-style filename patterns to be excluded,
e.g.: ["\*.pyc", "\*.o")
"""
try:
self.filemanager.exclude = list(data['exclude'])
except TypeError:
LOG.error('Exclude must be Iterable!')
return data
def attach_fileinfo(self, data):
"""
Basic information included in every response to the frontend.
"""
data['filename'] = self.filemanager.filename
data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir
def handle_event(self, message):
try:
data = message['data']
message['data'] = self.commands[data['command']](data)
self.attach_fileinfo(data)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def cleanup(self):
self.monitor.stop()

View File

@ -1,57 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import logging
from os.path import dirname
from watchdog.events import PatternMatchingEventHandler as PatternMatchingWatchdogEventHandler
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector
from tfw.decorators.rate_limiter import RateLimiter
from tfw.mixins.observer_mixin import ObserverMixin
from tfw.mixins.supervisor_mixin import SupervisorLogMixin
class LogMonitor(ObserverMixin):
def __init__(self, process_name, log_tail=0):
self.prevent_log_recursion()
event_handler = SendLogWatchdogEventHandler(
process_name,
log_tail=log_tail
)
self.observer.schedule(
event_handler,
event_handler.path
)
@staticmethod
def prevent_log_recursion():
# This is done to prevent inotify event logs triggering themselves (infinite log recursion)
logging.getLogger('watchdog.observers.inotify_buffer').propagate = False
class SendLogWatchdogEventHandler(PatternMatchingWatchdogEventHandler, SupervisorLogMixin):
def __init__(self, process_name, log_tail=0):
self.process_name = process_name
self.procinfo = self.supervisor.getProcessInfo(self.process_name)
super().__init__([
self.procinfo['stdout_logfile'],
self.procinfo['stderr_logfile']
])
self.uplink = ServerUplinkConnector()
self.log_tail = log_tail
@property
def path(self):
return dirname(self.procinfo['stdout_logfile'])
@RateLimiter(rate_per_second=5)
def on_modified(self, event):
self.uplink.send({
'key': 'processlog',
'data': {
'command': 'new_log',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}
})

View File

@ -1,67 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.log_monitor import LogMonitor
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class LogMonitoringEventHandler(EventHandlerBase, MonitorManagerMixin):
"""
Monitors the output of a supervisor process (stdout, stderr) and
sends the results to the frontend.
Accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, key, process_name, log_tail=0):
super().__init__(key)
self.process_name = process_name
self.log_tail = log_tail
MonitorManagerMixin.__init__(
self,
LogMonitor,
self.process_name,
self.log_tail
)
self.command_handlers = {
'process_name': self.handle_process_name,
'log_tail': self.handle_log_tail
}
def handle_event(self, message):
try:
data = message['data']
self.command_handlers[data['command']](data)
self.reload_monitor()
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_process_name(self, data):
"""
Changes the monitored process.
:param data: TFW message data containing 'value'
(name of the process to monitor)
"""
self.set_monitor_args(data['value'], self.log_tail)
def handle_log_tail(self, data):
"""
Sets tail length of the log the monitor will send
to the frontend (the monitor will send back the last
'value' characters of the log).
:param data: TFW message data containing 'value'
(new tail length)
"""
self.set_monitor_args(self.process_name, data['value'])
def cleanup(self):
self.monitor.stop()

View File

@ -1,143 +0,0 @@
from abc import abstractmethod
from json import loads, dumps
from subprocess import run, PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress
from tfw.event_handler_base import EventHandlerBase
from tfw.config.logs import logging
from .pipe_io_server import PipeIOServer, terminate_process_on_failure
LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600
class PipeIOEventHandlerBase(EventHandlerBase):
def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
super().__init__(key)
self.pipe_io = CallbackPipeIOServer(
in_pipe_path,
out_pipe_path,
self.handle_pipe_event,
permissions
)
self.pipe_io.start()
@abstractmethod
def handle_pipe_event(self, message_bytes):
raise NotImplementedError()
def cleanup(self):
self.pipe_io.stop()
class CallbackPipeIOServer(PipeIOServer):
def __init__(self, in_pipe_path, out_pipe_path, callback, permissions):
super().__init__(in_pipe_path, out_pipe_path, permissions)
self.callback = callback
def handle_message(self, message):
try:
self.callback(message)
except: # pylint: disable=bare-except
LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)
class PipeIOEventHandler(PipeIOEventHandlerBase):
def handle_event(self, message):
json_bytes = dumps(message).encode()
self.pipe_io.send_message(json_bytes)
def handle_pipe_event(self, message_bytes):
json = loads(message_bytes)
self.server_connector.send(json)
class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
# pylint: disable=too-many-arguments
def __init__(
self, key, in_pipe_path, out_pipe_path,
transform_in_cmd, transform_out_cmd,
permissions=DEFAULT_PERMISSIONS
):
self._transform_in = partial(self._transform_message, transform_in_cmd)
self._transform_out = partial(self._transform_message, transform_out_cmd)
super().__init__(key, in_pipe_path, out_pipe_path, permissions)
@staticmethod
def _transform_message(transform_cmd, message):
proc = run(
transform_cmd,
input=message,
stdout=PIPE,
stderr=PIPE,
shell=True
)
if proc.returncode == 0:
return proc.stdout
raise ValueError(f'Transforming message {message} failed!')
def handle_event(self, message):
json_bytes = dumps(message).encode()
transformed_bytes = self._transform_out(json_bytes)
if transformed_bytes:
self.pipe_io.send_message(transformed_bytes)
def handle_pipe_event(self, message_bytes):
transformed_bytes = self._transform_in(message_bytes)
if transformed_bytes:
json_message = loads(transformed_bytes)
self.server_connector.send(json_message)
class CommandEventHandler(PipeIOEventHandler):
def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS):
super().__init__(
key,
self._generate_tempfilename(),
self._generate_tempfilename(),
permissions
)
self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
self._proc = Popen(
command, shell=True, executable='/bin/bash',
stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
start_new_session=True
)
self._monitor_proc_thread = self._start_monitor_proc()
def _generate_tempfilename(self):
# pylint: disable=no-self-use
random_filename = partial(token_urlsafe, 10)
return join('/tmp', f'{type(self).__name__}.{random_filename()}')
def _start_monitor_proc(self):
thread = Thread(target=self._monitor_proc, daemon=True)
thread.start()
return thread
@terminate_process_on_failure
def _monitor_proc(self):
return_code = self._proc.wait()
if return_code == -int(SIGTERM):
# supervisord asked the program to terminate, this is fine
return
if return_code != 0:
_, stderr = self._proc.communicate()
raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}')
def cleanup(self):
with suppress(ProcessLookupError):
process_group_id = getpgid(self._proc.pid)
killpg(process_group_id, SIGTERM)
self._proc_stdin.close()
self._proc_stdout.close()
super().cleanup()

View File

@ -1,2 +0,0 @@
from .pipe_io_server import PipeIOServer
from .terminate_process_on_failure import terminate_process_on_failure

View File

@ -1,27 +0,0 @@
from collections import deque
from threading import Lock, Condition
class Deque:
def __init__(self):
self._queue = deque()
self._mutex = Lock()
self._not_empty = Condition(self._mutex)
def pop(self):
with self._mutex:
while not self._queue:
self._not_empty.wait()
return self._queue.pop()
def push(self, item):
self._push(item, self._queue.appendleft)
def push_front(self, item):
self._push(item, self._queue.append)
def _push(self, item, put_method):
with self._mutex:
put_method(item)
self._not_empty.notify()

View File

@ -1,16 +0,0 @@
from os import mkfifo, remove, chmod
from os.path import exists
class Pipe:
def __init__(self, path):
self.path = path
def recreate(self, permissions):
self.remove()
mkfifo(self.path)
chmod(self.path, permissions) # use chmod to ignore umask
def remove(self):
if exists(self.path):
remove(self.path)

View File

@ -1,73 +0,0 @@
from abc import ABC, abstractmethod
from threading import Thread, Event
from typing import Callable
from .pipe_reader_thread import PipeReaderThread
from .pipe_writer_thread import PipeWriterThread
from .pipe import Pipe
from .terminate_process_on_failure import terminate_process_on_failure
class PipeIOServer(ABC, Thread):
def __init__(self, in_pipe=None, out_pipe=None, permissions=0o600):
super().__init__(daemon=True)
self._in_pipe, self._out_pipe = in_pipe, out_pipe
self._create_pipes(permissions)
self._stop_event = Event()
self._reader_thread, self._writer_thread = self._create_io_threads()
self._io_threads = (self._reader_thread, self._writer_thread)
self._on_stop = lambda: None
def _create_pipes(self, permissions):
Pipe(self.in_pipe).recreate(permissions)
Pipe(self.out_pipe).recreate(permissions)
@property
def in_pipe(self):
return self._in_pipe
@property
def out_pipe(self):
return self._out_pipe
def _create_io_threads(self):
reader_thread = PipeReaderThread(self.in_pipe, self._stop_event, self.handle_message)
writer_thread = PipeWriterThread(self.out_pipe, self._stop_event)
return reader_thread, writer_thread
@abstractmethod
def handle_message(self, message):
raise NotImplementedError()
def send_message(self, message):
self._writer_thread.write(message)
@terminate_process_on_failure
def run(self):
for thread in self._io_threads:
thread.start()
self._stop_event.wait()
self._stop_threads()
def stop(self):
self._stop_event.set()
if self.is_alive():
self.join()
def _stop_threads(self):
for thread in self._io_threads:
if thread.is_alive():
thread.stop()
Pipe(self.in_pipe).remove()
Pipe(self.out_pipe).remove()
self._on_stop()
def _set_on_stop(self, value):
if not isinstance(value, Callable):
raise ValueError("Supplied object is not callable!")
self._on_stop = value
on_stop = property(fset=_set_on_stop)
def wait(self):
self._stop_event.wait()

View File

@ -1,44 +0,0 @@
from contextlib import suppress
from os import open as osopen
from os import write, close, O_WRONLY, O_NONBLOCK
from threading import Thread
from .terminate_process_on_failure import terminate_process_on_failure
class PipeReaderThread(Thread):
eof = b''
stop_sequence = b'stop_reading\n'
def __init__(self, pipe_path, stop_event, message_handler):
super().__init__(daemon=True)
self._message_handler = message_handler
self._pipe_path = pipe_path
self._stop_event = stop_event
@terminate_process_on_failure
def run(self):
with self._open() as pipe:
while True:
message = pipe.readline()
if message == self.stop_sequence:
self._stop_event.set()
break
if message == self.eof:
self._open().close()
continue
self._message_handler(message[:-1])
def _open(self):
return open(self._pipe_path, 'rb')
def stop(self):
while self.is_alive():
self._unblock()
self.join()
def _unblock(self):
with suppress(OSError):
fd = osopen(self._pipe_path, O_WRONLY | O_NONBLOCK)
write(fd, self.stop_sequence)
close(fd)

View File

@ -1,50 +0,0 @@
from contextlib import suppress
from os import O_NONBLOCK, O_RDONLY, close
from os import open as osopen
from threading import Thread
from .terminate_process_on_failure import terminate_process_on_failure
from .deque import Deque
class PipeWriterThread(Thread):
def __init__(self, pipe_path, stop_event):
super().__init__(daemon=True)
self._pipe_path = pipe_path
self._stop_event = stop_event
self._write_queue = Deque()
def write(self, message):
self._write_queue.push(message)
@terminate_process_on_failure
def run(self):
with self._open() as pipe:
while True:
message = self._write_queue.pop()
if message is None:
self._stop_event.set()
break
try:
pipe.write(message + b'\n')
pipe.flush()
except BrokenPipeError:
try: # pipe was reopened, close() flushed the message
pipe.close()
except BrokenPipeError: # close() discarded the message
self._write_queue.push_front(message)
pipe = self._open()
def _open(self):
return open(self._pipe_path, 'wb')
def stop(self):
while self.is_alive():
self._unblock()
self.join()
def _unblock(self):
with suppress(OSError):
fd = osopen(self._pipe_path, O_RDONLY | O_NONBLOCK)
self._write_queue.push_front(None)
close(fd)

View File

@ -1,15 +0,0 @@
from functools import wraps
from os import kill, getpid
from signal import SIGTERM
from traceback import print_exc
def terminate_process_on_failure(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except: # pylint: disable=bare-except
print_exc()
kill(getpid(), SIGTERM)
return wrapper

View File

@ -1,64 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from xmlrpc.client import Fault as SupervisorFault
from tfw.event_handler_base import EventHandlerBase
from tfw.mixins.supervisor_mixin import SupervisorMixin, SupervisorLogMixin
from tfw.components.directory_monitor import with_monitor_paused
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ProcessManager(SupervisorMixin, SupervisorLogMixin):
def __init__(self):
self.commands = {
'start': self.start_process,
'stop': self.stop_process,
'restart': self.restart_process
}
def __call__(self, command, process_name):
return self.commands[command](process_name)
class ProcessManagingEventHandler(EventHandlerBase):
"""
Event handler that can manage processes managed by supervisor.
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
Every message must contain a data['process_name'] field with the name of the
process to manage. This is the name specified in supervisor config files like so:
[program:someprogram]
Commands available: start, stop, restart, readlog
(the names are as self-documenting as it gets)
"""
def __init__(self, key, dirmonitor=None, log_tail=0):
super().__init__(key)
self.monitor = dirmonitor
self.processmanager = ProcessManager()
self.log_tail = log_tail
@with_monitor_paused
def handle_event(self, message):
try:
data = message['data']
try:
self.processmanager(data['command'], data['process_name'])
except SupervisorFault as fault:
message['data']['error'] = fault.faultString
finally:
message['data']['stdout'] = self.processmanager.read_stdout(
data['process_name'],
self.log_tail
)
message['data']['stderr'] = self.processmanager.read_stderr(
data['process_name'],
self.log_tail
)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,87 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.event_handler_base import EventHandlerBase
from tfw.components.terminado_mini_server import TerminadoMiniServer
from tfw.config import TFWENV
from tfw.config.logs import logging
from tao.config import TAOENV
LOG = logging.getLogger(__name__)
class TerminalEventHandler(EventHandlerBase):
"""
Event handler responsible for managing terminal sessions for frontend xterm
sessions to connect to. You need to instanciate this in order for frontend
terminals to work.
This EventHandler accepts messages that have a data['command'] key specifying
a command to be executed.
The API of each command is documented in their respective handler.
"""
def __init__(self, key, monitor):
"""
:param key: key this EventHandler listens to
:param monitor: tfw.components.HistoryMonitor instance to read command history from
"""
super().__init__(key)
self._historymonitor = monitor
bash_as_user_cmd = ['sudo', '-u', TAOENV.USER, 'bash']
self.terminado_server = TerminadoMiniServer(
'/terminal',
TFWENV.TERMINADO_PORT,
TFWENV.TERMINADO_WD,
bash_as_user_cmd
)
self.commands = {
'write': self.write,
'read': self.read
}
if self._historymonitor:
self._historymonitor.watch()
self.terminado_server.listen()
@property
def historymonitor(self):
return self._historymonitor
def handle_event(self, message):
LOG.debug('TerminadoEventHandler received event: %s', message)
try:
data = message['data']
message['data'] = self.commands[data['command']](data)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def write(self, data):
"""
Writes a string to the terminal session (on the pty level).
Useful for pre-typing and executing commands for the user.
:param data: TFW message data containing 'value'
(command to be written to the pty)
"""
self.terminado_server.pty.write(data['value'])
return data
def read(self, data):
"""
Reads the history of commands executed.
:param data: TFW message data containing 'count'
(the number of history elements to return)
:return dict: message with list of commands in data['history']
"""
data['count'] = int(data.get('count', 1))
if self.historymonitor:
data['history'] = self.historymonitor.history[-data['count']:]
return data
def cleanup(self):
if self.historymonitor:
self.historymonitor.stop()

View File

@ -1,4 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .envvars import TFWENV

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from envvars import LazyEnvironment
TFWENV = LazyEnvironment('TFW_', 'tfwenvtuple').environment

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import logging
logging.basicConfig(level=logging.DEBUG)

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,101 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import wraps, partial
from time import time, sleep
from tfw.decorators.lazy_property import lazy_property
class RateLimiter:
"""
Decorator class for rate limiting, blocking.
When applied to a function this decorator will apply rate limiting
if the function is invoked more frequently than rate_per_seconds.
By default rate limiting means sleeping until the next invocation time
as per __init__ parameter rate_per_seconds.
Note that this decorator BLOCKS THE THREAD it is being executed on,
so it is only acceptable for stuff running on a separate thread.
If this is no good for you please refer to AsyncRateLimiter in this module,
which is designed not to block and use the IOLoop it is being called from.
"""
def __init__(self, rate_per_second):
"""
:param rate_per_second: max frequency the decorated method should be
invoked with
"""
self.min_interval = 1 / float(rate_per_second)
self.fun = None
self.last_call = time()
def action(self, seconds_to_next_call):
if seconds_to_next_call:
sleep(seconds_to_next_call)
self.fun()
def __call__(self, fun):
@wraps(fun)
def wrapper(*args, **kwargs):
self.fun = partial(fun, *args, **kwargs)
limit_seconds = self._limit_rate()
self.action(limit_seconds)
return wrapper
def _limit_rate(self):
seconds_since_last_call = time() - self.last_call
seconds_to_next_call = self.min_interval - seconds_since_last_call
if seconds_to_next_call > 0:
return seconds_to_next_call
self.last_call = time()
return 0
class AsyncRateLimiter(RateLimiter):
"""
Decorator class for rate limiting, non-blocking.
The semantics of the rate limiting:
- unlike RateLimiter this decorator never blocks, instead it adds an async
callback version of the decorated function to the IOLoop
(to be executed after the rate limiting has expired).
- the timing works similarly to RateLimiter
"""
def __init__(self, rate_per_second, ioloop_factory):
"""
:param rate_per_second: max frequency the decorated method should be
invoked with
:param ioloop_factory: callable that should return an instance of the
IOLoop of the application
"""
self._ioloop_factory = ioloop_factory
self._ioloop = None
self._last_callback = None
self._make_action_thread_safe()
super().__init__(rate_per_second=rate_per_second)
def _make_action_thread_safe(self):
self.action = partial(self.ioloop.add_callback, self.action)
@lazy_property
def ioloop(self):
return self._ioloop_factory()
def action(self, seconds_to_next_call):
# pylint: disable=method-hidden
if self._last_callback:
self.ioloop.remove_timeout(self._last_callback)
self._last_callback = self.ioloop.call_later(
seconds_to_next_call,
self.fun_with_debounce
)
def fun_with_debounce(self):
self.last_call = time()
self.fun()

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .event_handler_base import EventHandlerBase
from .boradcasting_event_handler import BroadcastingEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler

View File

@ -1,30 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from abc import ABC
from tfw.event_handler_base.event_handler_base import EventHandlerBase
from tfw.crypto import message_checksum
class BroadcastingEventHandler(EventHandlerBase, ABC):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which broadcast responses
and intelligently ignore their own broadcasted messages they receive.
"""
def __init__(self, key):
super().__init__(key)
self.own_message_hashes = []
def event_handler_callback(self, message):
message_hash = message_checksum(message)
if message_hash in self.own_message_hashes:
self.own_message_hashes.remove(message_hash)
return
response = self.dispatch_handling(message)
if response:
self.own_message_hashes.append(message_checksum(response))
self.server_connector.broadcast(response)

View File

@ -1,126 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from abc import ABC, abstractmethod
from inspect import currentframe
from typing import Iterable
from tfw.networking.event_handlers.server_connector import ServerConnector
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class EventHandlerBase(ABC):
"""
Abstract base class for all Python based EventHandlers. Useful implementation template
for other languages.
Derived classes must implement the handle_event() method
"""
def __init__(self, key):
self.server_connector = ServerConnector()
self.keys = []
if isinstance(key, str):
self.keys.append(key)
elif isinstance(key, Iterable):
self.keys = list(key)
self.subscribe(*self.keys)
self.server_connector.register_callback(self.event_handler_callback)
@property
def key(self):
"""
Returns the oldest key this EventHandler was subscribed to.
"""
return self.keys[0]
def event_handler_callback(self, message):
"""
Callback that is invoked when receiving a message.
Dispatches messages to handler methods and sends
a response back in case the handler returned something.
This is subscribed in __init__().
"""
if not self.check_key(message):
return
response = self.dispatch_handling(message)
if response:
self.server_connector.send(response)
def check_key(self, message):
"""
Checks whether the message is intended for this
EventHandler.
This is necessary because ZMQ handles PUB - SUB
connetions with pattern matching (e.g. someone
subscribed to 'fsm' will receive 'fsm_update'
messages as well.
"""
if '' in self.keys:
return True
return message['key'] in self.keys
def dispatch_handling(self, message):
"""
Used to dispatch messages to their specific handlers.
:param message: the message received
:returns: the message to send back
"""
return self.handle_event(message)
@abstractmethod
def handle_event(self, message):
"""
Abstract method that implements the handling of messages.
:param message: the message received
:returns: the message to send back
"""
raise NotImplementedError
def subscribe(self, *keys):
"""
Subscribe this EventHandler to receive events for given keys.
Note that you can subscribe to the same key several times in which
case you will need to unsubscribe multiple times in order to stop
receiving events.
:param keys: list of keys to subscribe to
"""
for key in keys:
self.server_connector.subscribe(key)
self.keys.append(key)
def unsubscribe(self, *keys):
"""
Unsubscribe this eventhandler from the given keys.
:param keys: list of keys to unsubscribe from
"""
for key in keys:
self.server_connector.unsubscribe(key)
self.keys.remove(key)
def cleanup(self):
"""
Perform cleanup actions such as releasing database
connections and stuff like that.
"""
pass
@classmethod
def get_local_instances(cls):
frame = currentframe()
if frame is None:
raise EnvironmentError('inspect.currentframe() is not supported!')
locals_values = frame.f_back.f_locals.values()
return {
instance for instance in locals_values
if isinstance(instance, cls)
}

View File

@ -1,24 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from abc import ABC
from tfw.event_handler_base.event_handler_base import EventHandlerBase
from tfw.networking.fsm_aware import FSMAware
class FSMAwareEventHandler(EventHandlerBase, FSMAware, ABC):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which automatically
keep track of the state of the TFW FSM.
"""
def __init__(self, key):
EventHandlerBase.__init__(self, key)
FSMAware.__init__(self)
self.subscribe('fsm_update')
def dispatch_handling(self, message):
if self.update_fsm_data(message):
return None
return super().dispatch_handling(message)

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .fsm_base import FSMBase
from .linear_fsm import LinearFSM
from .yaml_fsm import YamlFSM

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,30 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class MonitorManagerMixin:
def __init__(self, monitor_type, *monitor_args):
self._monitor_type = monitor_type
self._monitor = None
self.monitor_args = monitor_args
self.reload_monitor()
@property
def monitor(self):
return self._monitor
def set_monitor_args(self, *monitor_args):
self.monitor_args = monitor_args
def reload_monitor(self):
if self._monitor:
try:
self._monitor.stop()
except KeyError:
LOG.debug('Working directory was removed ignoring...')
self._monitor = self._monitor_type(*self.monitor_args)
self._monitor.watch() # This runs on a separate thread

View File

@ -1,20 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from watchdog.observers import Observer
from tfw.decorators.lazy_property import lazy_property
class ObserverMixin:
@lazy_property
def observer(self):
# pylint: disable=no-self-use
return Observer()
def watch(self):
self.observer.start()
def stop(self):
self.observer.stop()
self.observer.join()

View File

@ -1,6 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from .message_sender import MessageSender
from .event_handlers.server_connector import ServerUplinkConnector as TFWServerConnector
from .server.tfw_server import TFWServer

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,79 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import partial
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking.zmq_connector_base import ZMQConnectorBase
from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ServerDownlinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(ServerDownlinkConnector, self).__init__(zmq_context)
self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB)
self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}')
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE)
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE)
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_sub_stream.on_recv(callback)
class ServerUplinkConnector(ZMQConnectorBase):
"""
Class capable of sending messages to the TFW server and event handlers.
"""
def __init__(self, zmq_context=None):
super(ServerUplinkConnector, self).__init__(zmq_context)
self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH)
self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}')
def send_to_eventhandler(self, message):
"""
Send a message to an event handler through the TFW server.
This envelopes the desired message in the 'data' field of the message to
TFWServer, which will mirror it to event handlers.
:param message: JSON message you want to send
"""
self.send({
'key': 'mirror',
'data': message
})
def send(self, message):
"""
Send a message to the frontend through the TFW server.
:param message: JSON message you want to send
"""
self._zmq_push_socket.send_multipart(serialize_tfw_msg(message))
def broadcast(self, message):
"""
Broadast a message through the TFW server.
This envelopes the desired message in the 'data' field of the message to
TFWServer, which will broadast it.
:param message: JSON message you want to send
"""
self.send({
'key': 'broadcast',
'data': message
})
class ServerConnector(ServerUplinkConnector, ServerDownlinkConnector):
pass

View File

@ -1,46 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.crypto import KeyManager, verify_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FSMAware:
"""
Base class for stuff that has to be aware of the framework FSM.
This is done by processing 'fsm_update' messages.
"""
def __init__(self):
self.fsm_state = None
self.fsm_in_accepted_state = False
self.fsm_event_log = []
self._auth_key = KeyManager().auth_key
def update_fsm_data(self, message):
if message['key'] == 'fsm_update' and verify_message(self._auth_key, message):
self._handle_fsm_update(message)
return True
return False
def _handle_fsm_update(self, message):
try:
update_data = message['data']
new_state = update_data['current_state']
if self.fsm_state != new_state:
self.handle_fsm_step(**update_data)
self.fsm_state = new_state
self.fsm_in_accepted_state = update_data['in_accepted_state']
self.fsm_event_log.append(update_data)
except KeyError:
LOG.error('Invalid fsm_update message received!')
def handle_fsm_step(self, **kwargs):
"""
Called in case the TFW FSM has stepped.
:param kwargs: fsm_update 'data' field
"""
pass

View File

@ -1,54 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector
class MessageSender:
"""
Provides mechanisms to send messages to our frontend messaging component.
"""
def __init__(self):
self.server_connector = ServerUplinkConnector()
self.key = 'message'
self.queue_key = 'queueMessages'
def send(self, originator, message):
"""
Sends a message.
:param originator: name of sender to be displayed on the frontend
:param message: message to send
"""
data = {
'originator': originator,
'message': message
}
self.server_connector.send({
'key': self.key,
'data': data
})
def queue_messages(self, originator, messages):
"""
Queues a list of messages to be displayed in a chatbot-like manner.
:param originator: name of sender to be displayed on the frontend
:param messages: list of messages to queue
"""
data = {
'messages': [
{'message': message, 'originator': originator}
for message in messages
]
}
self.server_connector.send({
'key': self.queue_key,
'data': data
})
@staticmethod
def generate_messages_from_queue(queue_message):
for message in queue_message['data']['messages']:
yield {
'key': 'message',
'data': message
}

View File

@ -1,2 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

View File

@ -1,40 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tfw.networking.zmq_connector_base import ZMQConnectorBase
from tfw.networking.serialization import serialize_tfw_msg, with_deserialize_tfw_msg
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class EventHandlerDownlinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(EventHandlerDownlinkConnector, self).__init__(zmq_context)
self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL)
self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket)
address = f'tcp://*:{TFWENV.RECEIVER_PORT}'
self._zmq_pull_socket.bind(address)
LOG.debug('Pull socket bound to %s', address)
class EventHandlerUplinkConnector(ZMQConnectorBase):
def __init__(self, zmq_context=None):
super(EventHandlerUplinkConnector, self).__init__(zmq_context)
self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB)
address = f'tcp://*:{TFWENV.PUBLISHER_PORT}'
self._zmq_pub_socket.bind(address)
LOG.debug('Pub socket bound to %s', address)
class EventHandlerConnector(EventHandlerDownlinkConnector, EventHandlerUplinkConnector):
def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback)
self._zmq_pull_stream.on_recv(callback)
def send_message(self, message: dict):
self._zmq_pub_socket.send_multipart(serialize_tfw_msg(message))

View File

@ -1,114 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from abc import ABC, abstractmethod
from contextlib import suppress
from tornado.web import Application
from tfw.networking.server.zmq_websocket_proxy import ZMQWebSocketProxy
from tfw.networking.event_handlers.server_connector import ServerUplinkConnector
from tfw.networking.server.event_handler_connector import EventHandlerConnector
from tfw.networking.message_sender import MessageSender
from tfw.networking.fsm_aware import FSMAware
from tfw.crypto import KeyManager, verify_message, sign_message
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class TFWServer(FSMAware):
"""
This class handles the proxying of messages between the frontend and event handers.
It proxies messages from the "/ws" route to all event handlers subscribed to a ZMQ
SUB socket.
"""
def __init__(self):
super().__init__()
self._event_handler_connector = EventHandlerConnector()
self._uplink_connector = ServerUplinkConnector()
self._auth_key = KeyManager().auth_key
self.application = Application([(
r'/ws', ZMQWebSocketProxy, {
'event_handler_connector': self._event_handler_connector,
'proxy_filters_and_callbacks': {
'message_handlers': [
self.handle_trigger,
self.handle_recover,
self.handle_fsm_update
],
'frontend_message_handlers': [self.save_frontend_messages]
}
}
)])
self._frontend_messages = FrontendMessageStorage()
def handle_trigger(self, message):
if 'trigger' in message:
LOG.debug('Executing handler for trigger "%s"', message.get('trigger', ''))
fsm_eh_command = {
'key': 'fsm',
'data': {
'command': 'trigger',
'value': message['trigger']
}
}
if verify_message(self._auth_key, message):
sign_message(self._auth_key, fsm_eh_command)
self._uplink_connector.send_to_eventhandler(fsm_eh_command)
def handle_recover(self, message):
if message['key'] == 'recover':
self._frontend_messages.replay_messages(self._uplink_connector)
self._frontend_messages.clear()
def handle_fsm_update(self, message):
self.update_fsm_data(message)
def save_frontend_messages(self, message):
self._frontend_messages.save_message(message)
def listen(self, port):
self.application.listen(port)
class MessageStorage(ABC):
def __init__(self):
self.saved_messages = []
def save_message(self, message):
with suppress(KeyError, AttributeError):
if self.filter_message(message):
self.saved_messages.extend(self.transform_message(message))
@abstractmethod
def filter_message(self, message):
raise NotImplementedError
def transform_message(self, message): # pylint: disable=no-self-use
yield message
def clear(self):
self.saved_messages.clear()
class FrontendMessageStorage(MessageStorage):
def filter_message(self, message):
key = message['key']
command = message.get('data', {}).get('command')
return (
key in ('message', 'dashboard', 'queueMessages')
or key == 'ide' and command in ('select', 'read')
)
def transform_message(self, message):
if message['key'] == 'queueMessages':
yield from MessageSender.generate_messages_from_queue(message)
else:
yield message
def replay_messages(self, connector):
for message in self.saved_messages:
connector.send(message)

View File

@ -1,155 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import json
from tornado.websocket import WebSocketHandler
from tfw.mixins.callback_mixin import CallbackMixin
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class ZMQWebSocketProxy(WebSocketHandler):
# pylint: disable=abstract-method
instances = set()
sequence_number = 0
def initialize(self, **kwargs): # pylint: disable=arguments-differ
self._event_handler_connector = kwargs['event_handler_connector']
self._proxy_filters_and_callbacks = kwargs.get('proxy_filters_and_callbacks', {})
self.proxy_eventhandler_to_websocket = TFWProxy(
self.send_eventhandler_message,
self.send_websocket_message
)
self.proxy_websocket_to_eventhandler = TFWProxy(
self.send_websocket_message,
self.send_eventhandler_message
)
self.subscribe_proxy_callbacks()
def subscribe_proxy_callbacks(self):
eventhandler_message_handlers = self._proxy_filters_and_callbacks.get('eventhandler_message_handlers', [])
frontend_message_handlers = self._proxy_filters_and_callbacks.get('frontend_message_handlers', [])
message_handlers = self._proxy_filters_and_callbacks.get('message_handlers', [])
proxy_filters = self._proxy_filters_and_callbacks.get('proxy_filters', [])
self.proxy_websocket_to_eventhandler.subscribe_proxy_callbacks_and_filters(
eventhandler_message_handlers + message_handlers,
proxy_filters
)
self.proxy_eventhandler_to_websocket.subscribe_proxy_callbacks_and_filters(
frontend_message_handlers + message_handlers,
proxy_filters
)
def prepare(self):
ZMQWebSocketProxy.instances.add(self)
def on_close(self):
ZMQWebSocketProxy.instances.remove(self)
def open(self, *args, **kwargs):
LOG.debug('WebSocket connection initiated')
self._event_handler_connector.register_callback(self.eventhander_callback)
def eventhander_callback(self, message):
"""
Invoked on ZMQ messages from event handlers.
"""
self.sequence_message(message)
LOG.debug('Received on pull socket: %s', message)
self.proxy_eventhandler_to_websocket(message)
@classmethod
def sequence_message(cls, message):
cls.sequence_number += 1
message['seq'] = cls.sequence_number
def on_message(self, message):
"""
Invoked on WS messages from frontend.
"""
message = json.loads(message)
self.sequence_message(message)
LOG.debug('Received on WebSocket: %s', message)
self.proxy_websocket_to_eventhandler(message)
def send_eventhandler_message(self, message):
self._event_handler_connector.send_message(message)
@staticmethod
def send_websocket_message(message):
for instance in ZMQWebSocketProxy.instances:
instance.write_message(message)
# much secure, very cors, wow
def check_origin(self, origin):
return True
class TFWProxy:
# pylint: disable=protected-access
def __init__(self, to_source, to_destination):
self.to_source = to_source
self.to_destination = to_destination
self.proxy_filters = CallbackMixin()
self.proxy_callbacks = CallbackMixin()
self.proxy_filters.subscribe_callback(self.validate_message)
self.keyhandlers = {
'mirror': self.mirror,
'broadcast': self.broadcast
}
@staticmethod
def validate_message(message):
if 'key' not in message:
raise ValueError('Invalid TFW message format!')
def __call__(self, message):
if not self.filter_and_execute_callbacks(message):
return
if message['key'] not in self.keyhandlers:
self.to_destination(message)
else:
handler = self.keyhandlers[message['key']]
try:
handler(message)
except KeyError:
LOG.error('Invalid "%s" message format! Ignoring.', handler.__name__)
def filter_and_execute_callbacks(self, message):
try:
self.proxy_filters._execute_callbacks(message)
self.proxy_callbacks._execute_callbacks(message)
return True
except ValueError:
LOG.exception('Invalid TFW message received!')
return False
def mirror(self, message):
message = message['data']
if not self.filter_and_execute_callbacks(message):
return
LOG.debug('Mirroring message: %s', message)
self.to_source(message)
def broadcast(self, message):
message = message['data']
if not self.filter_and_execute_callbacks(message):
return
LOG.debug('Broadcasting message: %s', message)
self.to_source(message)
self.to_destination(message)
def subscribe_proxy_callbacks_and_filters(self, proxy_callbacks, proxy_filters):
self.proxy_callbacks.subscribe_callbacks(*proxy_callbacks)
self.proxy_filters.subscribe_callbacks(*proxy_filters)

View File

@ -1,9 +0,0 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import zmq
class ZMQConnectorBase:
def __init__(self, zmq_context=None):
self._zmq_context = zmq_context or zmq.Context.instance()

4
pytest.ini Normal file
View File

@ -0,0 +1,4 @@
[pytest]
filterwarnings =
ignore::DeprecationWarning:zmq
ignore::DeprecationWarning:watchdog

View File

@ -1,9 +1,15 @@
tornado==5.1 tornado>=6.0.0,<7.0.0
pyzmq==17.1.2 pyzmq>=17.0.0,<18.0.0
transitions==0.6.6 transitions>=0.0.0,<1.0.0
terminado==0.8.1 terminado>=0.0.0,<1.0.0
watchdog==0.8.3 watchdog>=0.0.0,<1.0.0
PyYAML==3.13 PyYAML>=5.0.0,<6.0.0
Jinja2==2.10 Jinja2>=2.0.0,<3.0.0
cryptography==2.3.1 cryptography>=2.0.0,<3.0.0
python-dateutil==2.7.3 python-dateutil>=2.0.0,<3.0.0
SQLAlchemy>=1.0.0,<2.0.0
python-dateutil>=2.0.0,<3.0.0
pytest>=5.0.0,<6.0.0
pylint>=2.0.0,<3.0.0
rope>=0.0.0,<1.0.0
pipe-io-server>=1.0.0,<2.0.0

View File

@ -1,6 +1,6 @@
from os.path import dirname, realpath, join from os.path import dirname, realpath, join
from setuptools import setup, find_packages from setuptools import setup
here = dirname(realpath(__file__)) here = dirname(realpath(__file__))
@ -10,20 +10,20 @@ with open(join(here, 'requirements.txt'), 'r') as ifile:
requirements = ifile.read().splitlines() requirements = ifile.read().splitlines()
setup( setup(
name = 'tfw', name='tfw',
version = version, version=version,
description = 'Avatao tutorial-framework', description='Avatao tutorial-framework',
url = 'https://github.com/avatao-content/baseimage-tutorial-framework', url='https://github.com/avatao-content/baseimage-tutorial-framework',
author = 'Avatao.com Innovative Learning Kft.', author='Avatao.com Innovative Learning Kft.',
author_email = 'support@avatao.com', author_email='support@avatao.com',
license = 'custom', license='custom',
packages = find_packages('lib'), packages=['tfw'],
package_dir = {'': 'lib'}, package_dir={'tfw': 'tfw'},
install_requires = requirements, install_requires=requirements,
extras_require = { extras_require={
'docs': [ 'docs': [
'sphinx >= 1.7.0', 'sphinx >= 1.7.0',
], ],
}, },
zip_safe = False, zip_safe=False,
) )

View File

@ -0,0 +1,6 @@
[program:tfw_init]
user=root
directory=/tmp
command=bash tfw_init.sh
autorestart=false
startsecs=0

View File

@ -1,4 +1,4 @@
[program:tfwserver] [program:tfw_server]
user=root user=root
directory=%(ENV_TFW_SERVER_DIR)s directory=%(ENV_TFW_SERVER_DIR)s
command=python3 tfw_server.py command=python3 -u tfw_server.py

15
supervisor/tfw_init.sh Normal file
View File

@ -0,0 +1,15 @@
#!/bin/bash
set -euo pipefail
echo "export HISTFILE=\"${TFW_HISTFILE}\"" >> /tmp/bashrc &&
cat /tmp/bashrc >> "/home/${AVATAO_USER}/.bashrc"
if [[ -z "${HOTRELOAD-}" ]]; then
for dir in "${TFW_LIB_DIR}/tfw" "/etc/nginx" "/etc/supervisor"; do
chown -R root:root "${dir}" && chmod -R 700 "${dir}";
done
fi
chmod 777 "${TFW_PIPES_DIR}"
rm -f bashrc requirements.txt tfw_init.sh

View File

@ -1,9 +1,11 @@
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tfw.networking import TFWServer from tfw.main import TFWServer, setup_logger, setup_signal_handlers
from tfw.config import TFWENV
if __name__ == '__main__': if __name__ == '__main__':
TFWServer().listen(TFWENV.WEB_PORT) setup_logger(__file__)
TFWServer().listen()
setup_signal_handlers()
IOLoop.instance().start() IOLoop.instance().start()

0
tfw/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,6 @@
from .console_logs_handler import ConsoleLogsHandler
from .frontend_proxy_handler import FrontendProxyHandler
from .frontend_ready_handler import FrontendReadyHandler
from .message_queue_handler import MessageQueueHandler
from .message_sender import MessageSender
from .frontend_config_handler import FrontendConfigHandler

View File

@ -0,0 +1,19 @@
import logging
LOG = logging.getLogger(__name__)
class ConsoleLogsHandler:
keys = ['process.log.new']
def __init__(self, *, stream):
self.stream = stream
def handle_event(self, message, connector):
try:
connector.send_message({
'key': 'console.write',
'content': message[self.stream]
})
except KeyError:
LOG.error('Invalid %s message received: %s', self.keys, message)

View File

@ -0,0 +1 @@
from .frontend_config_handler import FrontendConfigHandler

View File

@ -0,0 +1,36 @@
from yaml import safe_load
from tfw.internals.networking import Scope
class FrontendConfigHandler:
keys = ['frontend.ready']
def __init__(self, config_path):
self._config_path = config_path
def handle_event(self, _, connector):
# pylint: disable=no-self-use
for message in self._config_messages:
connector.send_message(message)
connector.send_message({'key': 'frontend.ready'}, scope=Scope.WEBSOCKET)
@property
def _config_messages(self):
config = safe_load(self._config_str)
return [
self._create_config_message(k, v)
for k, v in config.items()
]
@property
def _config_str(self):
with open(self._config_path, 'r') as ifile:
return ifile.read()
@staticmethod
def _create_config_message(key, value):
return {
'key': f'frontend.{key}',
**value
}

View File

@ -0,0 +1,40 @@
from textwrap import dedent
from .frontend_config_handler import FrontendConfigHandler
class MockFrontendConfigHandler(FrontendConfigHandler):
@property
def _config_str(self):
return dedent('''
cat:
someConfigKey: lel
otherStuff: 42
foo:
hey: yaaay
''')
class MockConnector:
def __init__(self):
self.messages = []
def send_message(self, message, **_):
self.messages.append(message)
def test_frontend_config_handler():
connector = MockConnector()
handler = MockFrontendConfigHandler('')
handler.handle_event({'key': 'frontend.ready'}, connector)
assert connector.messages[0] == {
'key': 'frontend.cat',
'someConfigKey': 'lel',
'otherStuff': 42
}
assert connector.messages[1] == {
'key': 'frontend.foo',
'hey': 'yaaay'
}
assert connector.messages[-1] == {'key': 'frontend.ready'}

View File

@ -0,0 +1,34 @@
from tfw.internals.networking import Scope
from .message_storage import FrontendMessageStorage
class FrontendProxyHandler:
keys = ['console', 'dashboard', 'frontend', 'message', 'ide.read', 'deploy.finish']
type_id = 'ControlEventHandler'
def __init__(self):
self.connector = None
self._frontend_message_storage = FrontendMessageStorage()
def send_message(self, message):
self.connector.send_message(message, scope=Scope.WEBSOCKET)
def handle_event(self, message, _):
self._frontend_message_storage.save_message(message)
if message['key'] == 'frontend.ready':
self.recover_frontend()
return
if self._filter_message(message):
self.send_message(message)
@staticmethod
def _filter_message(message):
return not message['key'].startswith((
'ide',
'message.queue'
))
def recover_frontend(self):
for message in self._frontend_message_storage.messages:
self.send_message(message)

View File

@ -0,0 +1,39 @@
import logging
from tfw.internals.crypto import KeyManager, sign_message
LOG = logging.getLogger(__name__)
class FrontendReadyHandler:
keys = ['frontend.ready', 'fsm.update']
def __init__(self, initial_trigger):
self.connector = None
self._auth_key = KeyManager().auth_key
self.initial_trigger = initial_trigger
self.commands = {
'frontend.ready': self.handle_ready,
'fsm.update': self.handle_update
}
def handle_event(self, message, _):
try:
self.commands[message['key']]()
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_ready(self):
trigger = {
'key': 'fsm.trigger',
'transition': self.initial_trigger
}
sign_message(self._auth_key, trigger)
self.connector.send_message(trigger)
def handle_update(self):
self.stop()
def stop(self):
pass

View File

@ -0,0 +1 @@
from .message_queue_handler import MessageQueueHandler

View File

@ -0,0 +1,43 @@
from time import sleep
from queue import Queue
from threading import Thread
class MessageQueueHandler:
keys = ['message.queue']
type_id = 'ControlEventHandler'
def __init__(self, wpm):
self.connector = None
self.wpm = wpm
self._queue = Queue()
self._thread = Thread(target=self._dispatch_messages)
def _dispatch_messages(self):
for message in iter(self._queue.get, None):
wpm = message['wpm'] if 'wpm' in message else self.wpm
cps = 5 * wpm / 60
sleep(len(message['message']) / cps)
self.connector.send_message(message)
def handle_event(self, message, _):
for unpacked in self._generate_messages_from_queue(message):
self._queue.put(unpacked)
@staticmethod
def _generate_messages_from_queue(queue_message):
last = queue_message['messages'][-1]
for message in queue_message['messages']:
yield {
'key': 'message.send',
'typing': message is not last,
**message
}
def start(self):
self._thread.start()
def cleanup(self):
self._queue.queue.clear()
self._queue.put(None)
self._thread.join()

View File

@ -0,0 +1,67 @@
# pylint: disable=redefined-outer-name
from math import inf
from time import sleep
from os import urandom
from random import randint
import pytest
from .message_queue_handler import MessageQueueHandler
class MockConnector:
def __init__(self):
self.callback = None
self.messages = []
def raise_event(self, message):
self.callback(message, self)
sleep(0.01)
def send_message(self, message):
self.messages.append(message)
@pytest.fixture
def handler():
connector = MockConnector()
handler = MessageQueueHandler(inf)
handler.connector = connector
connector.callback = handler.handle_event
handler.start()
yield handler
handler.cleanup()
@pytest.fixture
def queue():
yield {
'key': 'message.queue',
'messages': [
{'originator': urandom(4).hex(), 'message': urandom(16).hex()}
for _ in range(randint(5, 10))
]
}
def test_message_order(handler, queue):
handler.connector.raise_event(queue)
old_list = queue['messages']
new_list = handler.connector.messages
length = len(old_list)
assert len(new_list) == length
for i in range(length):
unpacked = new_list[i]
assert unpacked['key'] == 'message.send'
assert unpacked['originator'] == old_list[i]['originator']
assert unpacked['typing'] == (i < length-1)
def test_wpm(handler, queue):
handler.wpm = 10000
handler.connector.raise_event(queue)
assert not handler.connector.messages
handler.wpm = 100000000
handler.connector.raise_event(queue)
sleep(0.25)
assert len(handler.connector.messages) == 2*len(queue['messages'])

View File

@ -0,0 +1,30 @@
class MessageSender:
def __init__(self, uplink):
self.uplink = uplink
def send(self, message, originator=None):
message = {
'key': 'message.send',
'message': message
}
if originator:
message['originator'] = originator
self.uplink.send_message(message)
def queue_messages(self, messages, originator=None):
message_queue = {
'key': 'message.queue',
'messages': []
}
for message in messages:
next_message = {'message': message}
if originator:
next_message['originator'] = originator
message_queue['messages'].append(next_message)
self.uplink.send_message(message_queue)
def set_originator(self, originator):
self.uplink.send_message({
'key': 'message.config',
'originator': originator
})

View File

@ -0,0 +1,49 @@
from abc import ABC, abstractmethod
from contextlib import suppress
class MessageStorage(ABC):
def __init__(self):
self._messages = []
def save_message(self, message):
with suppress(KeyError, AttributeError):
if self._filter_message(message):
self._messages.extend(self._transform_message(message))
@abstractmethod
def _filter_message(self, message):
raise NotImplementedError
def _transform_message(self, message): # pylint: disable=no-self-use
yield message
def clear(self):
self._messages.clear()
@property
def messages(self):
yield from self._messages
class FrontendMessageStorage(MessageStorage):
def _filter_message(self, message):
return message['key'].startswith((
'console.write',
'message.send',
'ide.read'
))
def _transform_message(self, message):
transformations = {
'ide.read': self._delete_ide_content
}
if message['key'] in transformations:
yield from transformations[message['key']](message)
else:
yield message
@staticmethod
def _delete_ide_content(message):
del message['content']
yield message

View File

@ -0,0 +1 @@
from .fsm_handler import FSMHandler

View File

@ -0,0 +1,42 @@
import logging
from tfw.internals.crypto import KeyManager, sign_message
from tfw.internals.networking import Scope, Intent
from .fsm_updater import FSMUpdater
LOG = logging.getLogger(__name__)
class FSMHandler:
keys = ['fsm']
type_id = 'ControlEventHandler'
def __init__(self, *, fsm_type):
self.connector = None
self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key
self.command_handlers = {
'fsm.trigger': self.handle_trigger,
'fsm.update' : self.handle_update
}
def handle_event(self, message, _):
try:
message = self.command_handlers[message['key']](message)
if message:
fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, fsm_update_message)
self.connector.send_message(fsm_update_message, Scope.BROADCAST, Intent.EVENT)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_trigger(self, message): # pylint: disable=inconsistent-return-statements
if self.fsm.step(message['transition']):
return message
def handle_update(self, message):
# pylint: disable=no-self-use
return message

View File

@ -0,0 +1,26 @@
class FSMUpdater:
def __init__(self, fsm):
self.fsm = fsm
@property
def fsm_update(self):
return {
'key': 'fsm.update',
**self.fsm_update_data
}
@property
def fsm_update_data(self):
valid_transitions = [
{'trigger': trigger}
for trigger in self.fsm.get_triggers(self.fsm.state)
]
if not self.fsm.event_log:
return None
last_fsm_event = self.fsm.event_log[-1]
return {
'current_state': self.fsm.state,
'valid_transitions': valid_transitions,
'in_accepted_state': self.fsm.in_accepted_state,
'last_event': last_fsm_event
}

View File

@ -0,0 +1,2 @@
from .ide_handler import IdeHandler
from .deploy_handler import DeployHandler

View File

@ -0,0 +1,34 @@
import logging
LOG = logging.getLogger(__name__)
class DeployHandler:
keys = ['deploy.start', 'process.restart']
def __init__(self, process_name='webservice'):
self.connector = None
self.process_name = process_name
self.commands = {
'deploy.start': self.handle_deploy,
'process.restart': self.handle_process
}
def handle_event(self, message, _):
try:
self.commands[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_deploy(self, _):
self.connector.send_message({
'key': 'process.restart',
'name': self.process_name
})
def handle_process(self, message):
response = {'key': 'deploy.finish'}
if 'error' in message:
response['error'] = message['error']
self.connector.send_message(response)

View File

@ -0,0 +1 @@
from .file_manager import FileManager

View File

@ -0,0 +1,66 @@
import logging
from functools import wraps
from glob import glob
from fnmatch import fnmatchcase
from os.path import dirname, isdir, isfile, realpath, isabs
LOG = logging.getLogger(__name__)
def _with_is_allowed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self.is_allowed(args[0]):
return func(self, *args, **kwargs)
raise ValueError('Forbidden path.')
return wrapper
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, patterns):
self.patterns = patterns
@property
def files(self):
return list(set(
path
for pattern in self.patterns
for path in glob(pattern, recursive=True)
if isfile(path) and self.is_allowed(path)
))
@property
def parents(self):
return list(set(
self._find_directory(pattern)
for pattern in self.patterns
))
@staticmethod
def _find_directory(pattern):
while pattern and not isdir(pattern):
pattern = dirname(pattern)
return pattern
def is_allowed(self, filepath):
return any(
fnmatchcase(realpath(filepath), pattern)
for pattern in self.patterns
)
def find_file(self, filename):
if not isabs(filename):
for filepath in self.files:
if filepath.endswith(filename):
return filepath
return filename
@_with_is_allowed
def read_file(self, filepath): # pylint: disable=no-self-use
with open(filepath, 'rb', buffering=0) as ifile:
return ifile.read().decode(errors='surrogateescape')
@_with_is_allowed
def write_file(self, filepath, contents): # pylint: disable=no-self-use
with open(filepath, 'wb', buffering=0) as ofile:
ofile.write(contents.encode())

View File

@ -0,0 +1,94 @@
# pylint: disable=redefined-outer-name
from dataclasses import dataclass
from secrets import token_urlsafe
from os import mkdir, symlink
from os.path import join, realpath, basename
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from .file_manager import FileManager
@dataclass
class ManagerContext:
workdir: str
subdir: str
subfile: str
manager: FileManager
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def create_random_link(self, source, dirname, extension):
linkname = self.join(f'{dirname}/{generate_name()}{extension}')
symlink(source, linkname)
return linkname
def join(self, path):
return join(self.workdir, path)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
with TemporaryDirectory() as workdir:
workdir = realpath(workdir) # macOS uses a symlinked TMPDIR
subdir = join(workdir, generate_name())
subfile = join(subdir, generate_name() + '.txt')
mkdir(subdir)
Path(subfile).touch()
manager = FileManager([join(workdir, '**/*.txt')])
yield ManagerContext(workdir, subdir, subfile, manager)
def test_matching_files(context):
newdir = context.create_random_folder(context.subdir)
newfile = context.create_random_file(newdir, '.txt')
newlink = context.create_random_link(newfile, newdir, '.txt')
assert set(context.manager.files) == {context.subfile, newfile, newlink}
def test_unmatching_files(context):
newtxt = context.create_random_file(context.workdir, '.txt')
newbin = context.create_random_file(context.subdir, '.bin')
context.create_random_link(newtxt, context.subdir, '.txt')
context.create_random_link(newbin, context.subdir, '.txt')
assert context.manager.files == [context.subfile]
def test_parents(context):
newdir = context.create_random_folder(context.workdir)
context.manager.patterns += [f'{newdir}/[!/@]*/**/?.c']
assert set(context.manager.parents) == {context.workdir, newdir}
def test_read_write_file(context):
for _ in range(128):
content = token_urlsafe(32)
context.manager.write_file(context.subfile, content)
assert context.manager.read_file(context.subfile) == content
with open(context.subfile, 'r') as ifile:
assert ifile.read() == content
def test_regular_ide_actions(context):
newfile1 = context.create_random_file(context.subdir, '.txt')
newfile2 = context.create_random_file(context.subdir, '.txt')
for _ in range(4):
context.manager.filename = newfile1
content1, content2 = token_urlsafe(32), token_urlsafe(32)
context.manager.write_file(newfile1, content1)
context.manager.write_file(newfile2, content2)
assert context.manager.read_file(newfile1) == content1
assert context.manager.read_file(newfile2) == content2
def test_find_file(context):
for _ in range(5):
file_abs = context.create_random_file(context.subdir, '.txt')
file_base = basename(file_abs)
assert context.manager.find_file(file_base) == file_abs

View File

@ -0,0 +1,101 @@
import logging
from tfw.internals.networking import Scope
from tfw.internals.inotify import InotifyObserver
from .file_manager import FileManager
LOG = logging.getLogger(__name__)
BUILD_ARTIFACTS = (
"*.a",
"*.class",
"*.dll",
"*.dylib",
"*.elf",
"*.exe",
"*.jar",
"*.ko",
"*.la",
"*.lib",
"*.lo",
"*.o",
"*.obj",
"*.out",
"*.py[cod]",
"*.so",
"*.so.*",
"*.tar.gz",
"*.zip",
"*__pycache__*"
)
class IdeHandler:
keys = ['ide']
type_id = 'ControlEventHandler'
def __init__(self, *, patterns, initial_file=None):
self.connector = None
self.filemanager = FileManager(patterns)
self._initial_file = initial_file or ''
self.monitor = InotifyObserver(
path=self.filemanager.parents,
exclude=BUILD_ARTIFACTS
)
self.monitor.on_modified = self._reload_frontend
self.monitor.start()
self.commands = {
'ide.read' : self.read,
'ide.write' : self.write
}
def _reload_frontend(self, event): # pylint: disable=unused-argument
self.send_message({'key': 'ide.reload'})
@property
def initial_file(self):
if not self.filemanager.is_allowed(self._initial_file):
self._initial_file = self.filemanager.files[0]
return self._initial_file
def send_message(self, message):
self.connector.send_message(message, scope=Scope.WEBSOCKET)
def handle_event(self, message, _):
try:
self.commands[message['key']](message)
message['files'] = self.filemanager.files
self.send_message(message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def read(self, message):
if 'patterns' in message:
self.filemanager.patterns = message['patterns']
try:
message['filename'] = self.filemanager.find_file(
message.get('filename') or self.initial_file
)
message['content'] = self.filemanager.read_file(message['filename'])
except (PermissionError, ValueError):
message['content'] = 'You have no permission to open that file :('
except FileNotFoundError:
message['content'] = 'This file does not exist :('
except Exception: # pylint: disable=broad-except
message['content'] = 'Failed to read file :('
LOG.exception('Error reading file!')
def write(self, message):
try:
self.filemanager.write_file(message['filename'], message['content'])
except KeyError:
LOG.error('You must provide a filename to write!')
except Exception: # pylint: disable=broad-except
LOG.exception('Error writing file!')
del message['content']
def cleanup(self):
self.monitor.stop()

View File

@ -0,0 +1,3 @@
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase
from .command_handler import CommandHandler
from .pipe_connector import ProxyPipeConnectorHandler

View File

@ -0,0 +1,58 @@
from subprocess import PIPE, Popen
from functools import partial
from os import getpgid, killpg
from os.path import join
from signal import SIGTERM
from secrets import token_urlsafe
from threading import Thread
from contextlib import suppress
from pipe_io_server import terminate_process_on_failure
from .pipe_io_handler import PipeIOHandler, DEFAULT_PERMISSIONS
class CommandHandler(PipeIOHandler):
def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
super().__init__(
self._generate_tempfilename(),
self._generate_tempfilename(),
permissions
)
self._proc_stdin = open(self.pipe_io.out_pipe, 'rb')
self._proc_stdout = open(self.pipe_io.in_pipe, 'wb')
self._proc = Popen(
command, shell=True, executable='/bin/bash',
stdin=self._proc_stdin, stdout=self._proc_stdout, stderr=PIPE,
start_new_session=True
)
self._monitor_proc_thread = self._start_monitor_proc()
def _generate_tempfilename(self):
# pylint: disable=no-self-use
random_filename = partial(token_urlsafe, 10)
return join('/tmp', f'{type(self).__name__}.{random_filename()}')
def _start_monitor_proc(self):
thread = Thread(target=self._monitor_proc, daemon=True)
thread.start()
return thread
@terminate_process_on_failure
def _monitor_proc(self):
return_code = self._proc.wait()
if return_code == -int(SIGTERM):
# supervisord asked the program to terminate, this is fine
return
if return_code != 0:
_, stderr = self._proc.communicate()
raise RuntimeError(f'Subprocess failed ({return_code})! Stderr:\n{stderr.decode()}')
def cleanup(self):
with suppress(ProcessLookupError):
process_group_id = getpgid(self._proc.pid)
killpg(process_group_id, SIGTERM)
self._proc_stdin.close()
self._proc_stdout.close()
super().cleanup()

View File

@ -0,0 +1 @@
from .proxy_pipe_connector_handler import ProxyPipeConnectorHandler

View File

@ -0,0 +1,90 @@
import logging
from stat import S_ISFIFO
from os import access, listdir, mkdir, stat, R_OK
from os.path import exists, join
from pipe_io_server import PipeWriterServer
from tfw.internals.inotify import InotifyObserver, InotifyFileCreatedEvent, InotifyFileDeletedEvent
LOG = logging.getLogger(__name__)
class PipeConnector:
reader_pattern, writer_pattern = 'send', 'recv'
def __init__(self, path):
self.recv_pipes, self.send_pipes = {}, {}
self.observer = self._build_observer(path)
self.observer.on_any_event = self._on_any_event
self.observer.start()
self._connect_existing_pipes(path)
def _build_observer(self, path): # pylint: disable=no-self-use
if not exists(path):
mkdir(path)
if not access(path, R_OK):
raise ValueError('Path does not exist or is not accessible.')
return InotifyObserver(path, patterns=[f'*{self.reader_pattern}*', f'*{self.writer_pattern}*'])
def _connect_existing_pipes(self, path):
for node in listdir(path):
node_path = join(path, node)
if self._is_pipe(node_path):
self._create_pipe(node_path)
LOG.debug('Connected to existing pipe "%s"', node_path)
def _on_any_event(self, event):
path = event.src_path
if self._is_pipe(path) and isinstance(event, InotifyFileCreatedEvent):
self._create_pipe(path)
LOG.debug('Connected to new pipe "%s"', path)
elif isinstance(event, InotifyFileDeletedEvent):
self._delete_pipe(path)
LOG.debug('Disconnected from deleted pipe "%s"', path)
@staticmethod
def _is_pipe(path):
return exists(path) and S_ISFIFO(stat(path).st_mode)
def _create_pipe(self, path):
if self._find_pipe(path):
return
server = None
if self.writer_pattern in path:
pipes, server = self.recv_pipes, self.build_writer(path)
elif self.reader_pattern in path:
pipes, server = self.send_pipes, self.build_reader(path)
if server:
server.start()
pipes[path] = server
def _find_pipe(self, path):
pipes = None
if path in self.recv_pipes.keys():
pipes = self.recv_pipes
if path in self.send_pipes.keys():
pipes = self.send_pipes
return pipes
def build_reader(self, path): # pylint: disable=no-self-use
raise NotImplementedError()
def build_writer(self, path): # pylint: disable=no-self-use
return PipeWriterServer(path, manage_pipes=False)
def _delete_pipe(self, path):
pipes = self._find_pipe(path)
if pipes:
pipes[path].stop()
del pipes[path]
def broadcast(self, message):
for server in self.recv_pipes.values():
server.send_message(message)
def stop(self):
for pipe in self.recv_pipes.values():
pipe.stop()
for pipe in self.send_pipes.values():
pipe.stop()

View File

@ -0,0 +1,46 @@
import logging
from threading import Lock
from json import dumps, loads, JSONDecodeError
from pipe_io_server import PipeReaderServer
from .pipe_connector import PipeConnector
LOG = logging.getLogger(__name__)
class ProxyPipeConnectorHandler:
keys = ['']
def __init__(self, path):
self.connector, self.pipes = None, None
self.path = path
def start(self):
self.pipes = ProxyPipeConnector(self.path, self.connector)
def handle_event(self, message, _):
self.pipes.broadcast(dumps(message).encode())
def cleanup(self):
self.pipes.stop()
class ProxyPipeConnector(PipeConnector):
def __init__(self, path, connector):
self.connector = connector
self.mutex = Lock()
super().__init__(path)
def build_reader(self, path):
reader = PipeReaderServer(path, manage_pipes=False)
reader.handle_message = self._handle_message
return reader
def _handle_message(self, message):
try:
json_object = loads(message)
with self.mutex:
self.connector.send_message(json_object)
except JSONDecodeError:
LOG.error('Received invalid JSON message: %s', message)

View File

@ -0,0 +1,157 @@
# pylint: disable=redefined-outer-name
from enum import Enum
from dataclasses import dataclass
from json import dumps
from secrets import token_urlsafe
from os import urandom, mkfifo, mkdir, remove
from os.path import join
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from tfw.internals.inotify import InotifyFileCreatedEvent, InotifyFileDeletedEvent
from .proxy_pipe_connector_handler import ProxyPipeConnector
class Action(Enum):
SEND = 'send'
RECV = 'recv'
@dataclass
class PipeContext:
workdir: str
pipes: ProxyPipeConnector
def emit_pipe_creation_event(self, action, inode_creator):
filename = self.join(f'{self.generate_name()}_{action.value}')
inode_creator(filename)
self.pipes.observer.on_any_event(InotifyFileCreatedEvent(filename))
return filename
def join(self, path):
return join(self.workdir, path)
@staticmethod
def generate_name():
return urandom(4).hex()
class MockPipeConnector(ProxyPipeConnector):
def __init__(self, path, connector):
self.reader_events, self.writer_events = [], []
super().__init__(path, connector)
def _build_observer(self, path):
return MockObserver()
def build_reader(self, path):
self.reader_events.append(path)
reader = MockPipeServer()
reader.handle_message = self._handle_message
return reader
def build_writer(self, path):
self.writer_events.append(path)
return MockPipeServer()
class MockObserver:
def start(self):
pass
def on_any_event(self, event):
pass
class MockPipeServer:
def __init__(self):
self.messages = []
def handle_message(self, message):
pass
def send_message(self, message):
self.messages.append(message)
def start(self):
pass
def stop(self):
pass
class MockConnector: # pylint: disable=too-few-public-methods
def __init__(self):
self.messages = []
def send_message(self, message):
self.messages.append(message)
@pytest.fixture
def workdir():
with TemporaryDirectory() as workdir:
yield workdir
@pytest.fixture
def context(workdir):
mkfifo(join(workdir, Action.SEND.value))
mkfifo(join(workdir, Action.RECV.value))
yield PipeContext(workdir, MockPipeConnector(workdir, MockConnector()))
def test_existing_pipe_connection(context):
assert join(context.workdir, Action.SEND.value) in context.pipes.send_pipes.keys()
assert join(context.workdir, Action.RECV.value) in context.pipes.recv_pipes.keys()
def test_pipe_creation_deletion(context):
cases = [
(Action.RECV, context.pipes.recv_pipes, context.pipes.writer_events),
(Action.SEND, context.pipes.send_pipes, context.pipes.reader_events)
]
for action, pipes, events in cases:
path = context.emit_pipe_creation_event(action, mkfifo)
assert events[-1] == path
assert path in pipes.keys()
remove(path)
context.pipes.observer.on_any_event(InotifyFileDeletedEvent(path))
assert path not in pipes.keys()
def test_handle_message(context):
path = context.emit_pipe_creation_event(Action.SEND, mkfifo)
payload = {'key': token_urlsafe(16)}
context.pipes.send_pipes[path].handle_message(dumps(payload))
assert context.pipes.connector.messages[-1] == payload
context.pipes.send_pipes[path].handle_message(token_urlsafe(32))
assert len(context.pipes.connector.messages) == 1
def test_broadcast(context):
paths = [
context.emit_pipe_creation_event(Action.RECV, mkfifo)
for _ in range(32)
]
payload = {'key': token_urlsafe(16)}
context.pipes.broadcast(payload)
for path in paths:
assert context.pipes.recv_pipes[path].messages[-1] == payload
def test_inode_types(context):
touch = lambda path: Path(path).touch()
cases = [
(Action.RECV, context.pipes.recv_pipes, mkdir),
(Action.SEND, context.pipes.send_pipes, mkdir),
(Action.RECV, context.pipes.recv_pipes, touch),
(Action.SEND, context.pipes.send_pipes, touch)
]
for action, pipes, creator in cases:
path = context.emit_pipe_creation_event(action, creator)
assert path not in pipes.keys()

View File

@ -0,0 +1,47 @@
import logging
from abc import abstractmethod
from json import loads, dumps
from pipe_io_server import PipeIOServer
LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600
class PipeIOHandlerBase:
keys = ['']
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.connector = None
self.in_pipe = in_pipe_path
self.out_pipe = out_pipe_path
self.pipe_io = PipeIOServer(
in_pipe_path,
out_pipe_path,
permissions
)
self.pipe_io.handle_message = self._server_handle_message
self.pipe_io.start()
def _server_handle_message(self, message):
try:
self.handle_pipe_event(message)
except: # pylint: disable=bare-except
LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)
@abstractmethod
def handle_pipe_event(self, message_bytes):
raise NotImplementedError()
def cleanup(self):
self.pipe_io.stop()
class PipeIOHandler(PipeIOHandlerBase):
def handle_event(self, message, _):
json_bytes = dumps(message).encode()
self.pipe_io.send_message(json_bytes)
def handle_pipe_event(self, message_bytes):
json = loads(message_bytes)
self.connector.send_message(json)

View File

@ -0,0 +1,45 @@
# pylint: disable=redefined-outer-name
from tempfile import TemporaryDirectory
from os.path import join
from queue import Queue
from json import dumps, loads
import pytest
from .pipe_io_handler import PipeIOHandler
@pytest.fixture
def pipe_io():
with TemporaryDirectory() as tmpdir:
pipeio = PipeIOHandler(
join(tmpdir, 'in'),
join(tmpdir, 'out'),
permissions=0o600
)
pipeio.connector = MockConnector()
yield pipeio
pipeio.cleanup()
class MockConnector:
def __init__(self):
self.messages = Queue()
def send_message(self, msg):
self.messages.put(msg)
def test_pipe_io_handler_recv(pipe_io):
test_msg = {'key': 'cica'}
with open(pipe_io.in_pipe, 'w') as ofile:
ofile.write(dumps(test_msg) + '\n')
assert pipe_io.connector.messages.get() == test_msg
def test_pipe_io_handler_send(pipe_io):
test_msg = {'key': 'cica'}
pipe_io.handle_event(test_msg, None)
with open(pipe_io.out_pipe, 'r') as ifile:
assert loads(ifile.readline()) == test_msg

View File

@ -0,0 +1,2 @@
from .process_handler import ProcessHandler
from .process_log_handler import ProcessLogHandler

View File

@ -0,0 +1,34 @@
from tfw.internals.networking import Scope, Intent
from tfw.internals.inotify import InotifyObserver
from .supervisor import ProcessLogManager
class LogInotifyObserver(InotifyObserver, ProcessLogManager):
def __init__(self, connector, process_name, supervisor_uri, log_tail=0):
self._connector = connector
self._process_name = process_name
self.log_tail = log_tail
self._procinfo = None
ProcessLogManager.__init__(self, supervisor_uri)
InotifyObserver.__init__(self, self._get_logfiles())
def _get_logfiles(self):
self._procinfo = self.supervisor.getProcessInfo(self._process_name)
return self._procinfo['stdout_logfile'], self._procinfo['stderr_logfile']
@property
def process_name(self):
return self._process_name
@process_name.setter
def process_name(self, process_name):
self._process_name = process_name
self.paths = self._get_logfiles()
def on_modified(self, event):
self._connector.send_message({
'key': 'process.log.new',
'stdout': self.read_stdout(self.process_name, tail=self.log_tail),
'stderr': self.read_stderr(self.process_name, tail=self.log_tail)
}, Scope.BROADCAST, Intent.EVENT)

View File

@ -0,0 +1,33 @@
import logging
from xmlrpc.client import Fault as SupervisorFault
from tfw.internals.networking import Scope, Intent
from .supervisor import ProcessManager
LOG = logging.getLogger(__name__)
class ProcessHandler(ProcessManager):
keys = ['process']
type_id = 'ControlEventHandler'
def __init__(self, *, supervisor_uri):
ProcessManager.__init__(self, supervisor_uri)
self.commands = {
'process.start': self.start_process,
'process.stop': self.stop_process,
'process.restart': self.restart_process
}
def handle_event(self, message, connector):
try:
try:
self.commands[message['key']](message['name'])
except SupervisorFault as fault:
message['error'] = fault.faultString
connector.send_message(message, scope=Scope.BROADCAST, intent=Intent.EVENT)
except KeyError:
if not message['key'].startswith('process.log'):
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -0,0 +1,44 @@
import logging
from .log_inotify_observer import LogInotifyObserver
LOG = logging.getLogger(__name__)
class ProcessLogHandler:
keys = ['process.log']
type_id = 'ControlEventHandler'
def __init__(self, *, process_name, supervisor_uri, log_tail=0):
self.connector, self._monitor = None, None
self.process_name = process_name
self._supervisor_uri = supervisor_uri
self._initial_log_tail = log_tail
self.command_handlers = {
'process.log.set': self.handle_set
}
def start(self):
self._monitor = LogInotifyObserver(
connector=self.connector,
process_name=self.process_name,
supervisor_uri=self._supervisor_uri,
log_tail=self._initial_log_tail
)
self._monitor.start()
def handle_event(self, message, _):
try:
self.command_handlers[message['key']](message)
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_set(self, data):
if data.get('name'):
self._monitor.process_name = data['name']
if data.get('tail'):
self._monitor.log_tail = data['tail']
def cleanup(self):
self._monitor.stop()

View File

@ -1,23 +1,15 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. from os import remove
# All Rights Reserved. See LICENSE file for details. from contextlib import suppress
import xmlrpc.client import xmlrpc.client
from xmlrpc.client import Fault as SupervisorFault from xmlrpc.client import Fault as SupervisorFault
from contextlib import suppress
from os import remove
from tfw.decorators.lazy_property import lazy_property
from tfw.config import TFWENV
class SupervisorBaseMixin: class SupervisorBase:
@lazy_property def __init__(self, supervisor_uri):
def supervisor(self): self.supervisor = xmlrpc.client.ServerProxy(supervisor_uri).supervisor
# pylint: disable=no-self-use
return xmlrpc.client.ServerProxy(TFWENV.SUPERVISOR_HTTP_URI).supervisor
class SupervisorMixin(SupervisorBaseMixin): class ProcessManager(SupervisorBase):
def stop_process(self, process_name): def stop_process(self, process_name):
with suppress(SupervisorFault): with suppress(SupervisorFault):
self.supervisor.stopProcess(process_name) self.supervisor.stopProcess(process_name)
@ -30,7 +22,7 @@ class SupervisorMixin(SupervisorBaseMixin):
self.start_process(process_name) self.start_process(process_name)
class SupervisorLogMixin(SupervisorBaseMixin): class ProcessLogManager(SupervisorBase):
def read_stdout(self, process_name, tail=0): def read_stdout(self, process_name, tail=0):
return self.supervisor.readProcessStdoutLog(process_name, -tail, 0) return self.supervisor.readProcessStdoutLog(process_name, -tail, 0)

View File

@ -0,0 +1 @@
from .snapshot_handler import SnapshotHandler

View File

@ -1,6 +1,4 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft. import logging
# All Rights Reserved. See LICENSE file for details.
from os.path import join as joinpath from os.path import join as joinpath
from os.path import basename from os.path import basename
from os import makedirs from os import makedirs
@ -8,25 +6,24 @@ from datetime import datetime
from dateutil import parser as dateparser from dateutil import parser as dateparser
from tfw.event_handler_base import EventHandlerBase from .snapshot_provider import SnapshotProvider
from tfw.components.snapshot_provider import SnapshotProvider
from tfw.config import TFWENV
from tfw.config.logs import logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DirectorySnapshottingEventHandler(EventHandlerBase): class SnapshotHandler:
def __init__(self, key, directories, exclude_unix_patterns=None): keys = ['snapshot']
super().__init__(key)
def __init__(self, *, directories, snapshots_dir, exclude_unix_patterns=None):
self._snapshots_dir = snapshots_dir
self.snapshot_providers = {} self.snapshot_providers = {}
self._exclude_unix_patterns = exclude_unix_patterns self._exclude_unix_patterns = exclude_unix_patterns
self.init_snapshot_providers(directories) self.init_snapshot_providers(directories)
self.command_handlers = { self.command_handlers = {
'take_snapshot': self.handle_take_snapshot, 'snapshot.take': self.handle_take_snapshot,
'restore_snapshot': self.handle_restore_snapshot, 'snapshot.restore': self.handle_restore_snapshot,
'exclude': self.handle_exclude 'snapshot.exclude': self.handle_exclude
} }
def init_snapshot_providers(self, directories): def init_snapshot_providers(self, directories):
@ -38,32 +35,28 @@ class DirectorySnapshottingEventHandler(EventHandlerBase):
self._exclude_unix_patterns self._exclude_unix_patterns
) )
@staticmethod def init_git_dir(self, index, directory):
def init_git_dir(index, directory):
git_dir = joinpath( git_dir = joinpath(
TFWENV.SNAPSHOTS_DIR, self._snapshots_dir,
f'{basename(directory)}-{index}' f'{basename(directory)}-{index}'
) )
makedirs(git_dir, exist_ok=True) makedirs(git_dir, exist_ok=True)
return git_dir return git_dir
def handle_event(self, message): def handle_event(self, message, _):
try: try:
data = message['data'] self.command_handlers[message['key']](message)
message['data'] = self.command_handlers[data['command']](data)
return message
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
def handle_take_snapshot(self, data): def handle_take_snapshot(self, _):
LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys()) LOG.debug('Taking snapshots of directories %s', self.snapshot_providers.keys())
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.take_snapshot() provider.take_snapshot()
return data
def handle_restore_snapshot(self, data): def handle_restore_snapshot(self, message):
date = dateparser.parse( date = dateparser.parse(
data.get( message.get(
'value', 'value',
datetime.now().isoformat() datetime.now().isoformat()
) )
@ -75,13 +68,11 @@ class DirectorySnapshottingEventHandler(EventHandlerBase):
) )
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.restore_snapshot(date) provider.restore_snapshot(date)
return data
def handle_exclude(self, data): def handle_exclude(self, message):
exclude_unix_patterns = data['value'] exclude_unix_patterns = message['value']
if not isinstance(exclude_unix_patterns, list): if not isinstance(exclude_unix_patterns, list):
raise KeyError raise KeyError
for provider in self.snapshot_providers.values(): for provider in self.snapshot_providers.values():
provider.exclude = exclude_unix_patterns provider.exclude = exclude_unix_patterns
return data

View File

@ -1,6 +1,3 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
import re import re
from subprocess import run, CalledProcessError, PIPE from subprocess import run, CalledProcessError, PIPE
from getpass import getuser from getpass import getuser

View File

@ -0,0 +1,3 @@
from .terminal_handler import TerminalHandler
from .terminal_commands_handler import TerminalCommandsHandler
from .commands_equal import CommandsEqual

View File

@ -1,10 +1,7 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from shlex import split from shlex import split
from re import search from re import search
from tfw.decorators.lazy_property import lazy_property from tfw.internals.lazy import lazy_property
class CommandsEqual: class CommandsEqual:

View File

@ -1,30 +1,12 @@
# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from os.path import dirname
from re import findall from re import findall
from re import compile as compileregex from re import compile as compileregex
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from watchdog.events import PatternMatchingEventHandler from tfw.internals.networking import Intent
from tfw.internals.inotify import InotifyObserver
from tfw.mixins.callback_mixin import CallbackMixin
from tfw.mixins.observer_mixin import ObserverMixin
from tfw.decorators.rate_limiter import RateLimiter
class CallbackEventHandler(PatternMatchingEventHandler, ABC): class HistoryMonitor(ABC, InotifyObserver):
def __init__(self, files, *callbacks):
super().__init__(files)
self.callbacks = callbacks
@RateLimiter(rate_per_second=2)
def on_modified(self, event):
for callback in self.callbacks:
callback()
class HistoryMonitor(CallbackMixin, ObserverMixin, ABC):
""" """
Abstract class capable of monitoring and parsing a history file such as Abstract class capable of monitoring and parsing a history file such as
bash HISTFILEs. Monitoring means detecting when the file was changed and bash HISTFILEs. Monitoring means detecting when the file was changed and
@ -36,29 +18,30 @@ class HistoryMonitor(CallbackMixin, ObserverMixin, ABC):
command pattern property and optionally the sanitize_command method. command pattern property and optionally the sanitize_command method.
See examples below. See examples below.
""" """
def __init__(self, histfile): def __init__(self, uplink, histfile):
self.histfile = histfile self.histfile = histfile
self._history = [] self.history = []
self._last_length = len(self._history) self._last_length = len(self.history)
self.observer.schedule( self.uplink = uplink
CallbackEventHandler( super().__init__(self.histfile)
[self.histfile],
self._fetch_history,
self._invoke_callbacks
),
dirname(self.histfile)
)
@property @property
def history(self): @abstractmethod
return self._history def domain(self):
raise NotImplementedError()
def on_modified(self, event):
self._fetch_history()
if self._last_length < len(self.history):
for command in self.history[self._last_length:]:
self.send_message(command)
def _fetch_history(self): def _fetch_history(self):
self._last_length = len(self._history) self._last_length = len(self.history)
with open(self.histfile, 'r') as ifile: with open(self.histfile, 'r') as ifile:
pattern = compileregex(self.command_pattern) pattern = compileregex(self.command_pattern)
data = ifile.read() data = ifile.read()
self._history = [ self.history = [
self.sanitize_command(command) self.sanitize_command(command)
for command in findall(pattern, data) for command in findall(pattern, data)
] ]
@ -72,9 +55,11 @@ class HistoryMonitor(CallbackMixin, ObserverMixin, ABC):
# pylint: disable=no-self-use # pylint: disable=no-self-use
return command return command
def _invoke_callbacks(self): def send_message(self, command):
if self._last_length < len(self._history): self.uplink.send_message({
self._execute_callbacks(self.history) 'key': f'history.{self.domain}',
'command': command
}, intent=Intent.EVENT)
class BashMonitor(HistoryMonitor): class BashMonitor(HistoryMonitor):
@ -87,6 +72,10 @@ class BashMonitor(HistoryMonitor):
shopt -s histappend shopt -s histappend
unset HISTCONTROL unset HISTCONTROL
""" """
@property
def domain(self):
return 'bash'
@property @property
def command_pattern(self): def command_pattern(self):
return r'.+' return r'.+'
@ -100,6 +89,10 @@ class GDBMonitor(HistoryMonitor):
HistoryMonitor to monitor GDB sessions. HistoryMonitor to monitor GDB sessions.
For this to work "set trace-commands on" must be set in GDB. For this to work "set trace-commands on" must be set in GDB.
""" """
@property
def domain(self):
return 'gdb'
@property @property
def command_pattern(self): def command_pattern(self):
return r'(?<=\n)\+(.+)\n' return r'(?<=\n)\+(.+)\n'

Some files were not shown because too many files have changed in this diff Show More