Merge branch 'eventhandler-rework' into chausie

This commit is contained in:
Kristóf Tóth 2019-07-25 12:10:43 +02:00
commit 6069ea5089
93 changed files with 608 additions and 526 deletions

View File

@ -54,10 +54,10 @@ COPY supervisor/components/ ${TFW_SUPERVISORD_COMPONENTS}
COPY nginx/nginx.conf ${TFW_NGINX_CONF} COPY nginx/nginx.conf ${TFW_NGINX_CONF}
COPY nginx/default.conf ${TFW_NGINX_DEFAULT} COPY nginx/default.conf ${TFW_NGINX_DEFAULT}
COPY nginx/components/ ${TFW_NGINX_COMPONENTS} COPY nginx/components/ ${TFW_NGINX_COMPONENTS}
COPY lib ${TFW_LIB_DIR}/ COPY tfw ${TFW_LIB_DIR}/tfw
COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/ COPY supervisor/tfw_server.py ${TFW_SERVER_DIR}/
RUN for dir in "${TFW_LIB_DIR}"/{tfw,tao,envvars} "/etc/nginx" "/etc/supervisor"; do \ RUN for dir in "${TFW_LIB_DIR}"/tfw "/etc/nginx" "/etc/supervisor"; do \
chown -R root:root "$dir" && chmod -R 700 "$dir"; \ chown -R root:root "$dir" && chmod -R 700 "$dir"; \
done done
@ -70,7 +70,7 @@ ONBUILD COPY ${BUILD_CONTEXT}/supervisor/ ${TFW_SUPERVISORD_COMPONENTS}
ONBUILD RUN for f in "${TFW_NGINX_DEFAULT}" ${TFW_NGINX_COMPONENTS}/*.conf; do \ ONBUILD RUN for f in "${TFW_NGINX_DEFAULT}" ${TFW_NGINX_COMPONENTS}/*.conf; do \
envsubst "$(printenv | cut -d= -f1 | grep TFW_ | sed -e 's/^/$/g')" < $f > $f~ && mv $f~ $f ;\ envsubst "$(printenv | cut -d= -f1 | grep TFW_ | sed -e 's/^/$/g')" < $f > $f~ && mv $f~ $f ;\
done done
ONBUILD VOLUME ["/etc/nginx", "/var/lib/nginx", "/var/log/nginx", "${TFW_LIB_DIR}/envvars", "${TFW_LIB_DIR}/tfw"] ONBUILD VOLUME ["/etc/nginx", "/var/lib/nginx", "/var/log/nginx", "${TFW_LIB_DIR}/tfw"]
ONBUILD COPY ${BUILD_CONTEXT}/frontend /data/ ONBUILD COPY ${BUILD_CONTEXT}/frontend /data/
ONBUILD RUN test -z "${NOFRONTEND}" && cd /data && yarn install --frozen-lockfile || : ONBUILD RUN test -z "${NOFRONTEND}" && cd /data && yarn install --frozen-lockfile || :

View File

@ -1 +0,0 @@
from .envvars import TAOENV

View File

@ -1,3 +0,0 @@
from envvars import LazyEnvironment
TAOENV = LazyEnvironment('AVATAO_', 'taoenvtuple').environment

View File

@ -1,13 +0,0 @@
from .directory_snapshotting_event_handler import DirectorySnapshottingEventHandler
from .event_handler import EventHandler
from .frontend_event_handler import FrontendEventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler
from .fsm_managing_event_handler import FSMManagingEventHandler
from .ide_event_handler import IdeEventHandler
from .log_monitoring_event_handler import LogMonitoringEventHandler
from .pipe_io_event_handler import PipeIOEventHandlerBase, PipeIOEventHandler
from .pipe_io_event_handler import TransformerPipeIOEventHandler, CommandEventHandler
from .process_managing_event_handler import ProcessManagingEventHandler
from .terminal_commands_event_handler import TerminalCommandsEventHandler
from .terminal_event_handler import TerminalEventHandler
from .tfw_server_connector import TFWServerUplinkConnector, TFWServerConnector

View File

@ -1,9 +0,0 @@
from tfw.event_handlers import EventHandlerBase
from .tfw_server_connector import TFWServerConnector
class EventHandler(EventHandlerBase):
# pylint: disable=abstract-method
def _build_server_connector(self):
return TFWServerConnector()

View File

@ -1,21 +0,0 @@
from tfw.networking import Scope
from tfw.components import FrontendMessageStorage
from .event_handler import EventHandler
class FrontendEventHandler(EventHandler):
def __init__(self):
frontend_keys = ('message', 'queueMessages', 'dashboard', 'console')
self._frontend_message_storage = FrontendMessageStorage(frontend_keys)
super().__init__((*frontend_keys, 'recover'), scope=Scope.WEBSOCKET)
def handle_event(self, message):
self._frontend_message_storage.save_message(message)
if message['key'] == 'recover':
self.recover_frontend()
self.send_message(message)
def recover_frontend(self):
for message in self._frontend_message_storage.messages:
self.send_message(message)

View File

@ -1,20 +0,0 @@
from tfw.components import FSMAware
from tfw.networking import Scope
from .event_handler import EventHandler
class FSMAwareEventHandler(EventHandler, FSMAware):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which automatically
keep track of the state of the TFW FSM.
"""
def __init__(self, key, scope=Scope.ZMQ):
EventHandler.__init__(self, key, scope=scope)
FSMAware.__init__(self)
self.subscribe('fsm_update')
def dispatch_handling(self, message):
if not self.refresh_on_fsm_update(message):
super().dispatch_handling(message)

View File

@ -1,14 +0,0 @@
from tfw.components import TerminalCommands
from tfw.networking import Scope
from .event_handler import EventHandler
class TerminalCommandsEventHandler(EventHandler, TerminalCommands):
def __init__(self, key, scope=Scope.ZMQ, bashrc=None):
EventHandler.__init__(self, key, scope)
TerminalCommands.__init__(self, bashrc)
def handle_event(self, message):
command = message['value']
self.callback(command)

View File

@ -1,12 +0,0 @@
from .commands_equal import CommandsEqual
from .file_manager import FileManager
from .fsm_aware import FSMAware
from .fsm_updater import FSMUpdater
from .history_monitor import BashMonitor, GDBMonitor
from .log_inotify_observer import LogInotifyObserver
from .message_sender import MessageSender
from .message_storage import FrontendMessageStorage
from .snapshot_provider import SnapshotProvider
from .supervisor import ProcessManager, LogManager
from .terminado_mini_server import TerminadoMiniServer
from .terminal_commands import TerminalCommands

View File

@ -1,42 +0,0 @@
import logging
from tfw.crypto import KeyManager, verify_message
LOG = logging.getLogger(__name__)
class FSMAware:
"""
Base class for stuff that has to be aware of the framework FSM.
This is done by processing 'fsm_update' messages.
"""
def __init__(self):
self.fsm_state = None
self.fsm_in_accepted_state = False
self.fsm_event_log = []
self._auth_key = KeyManager().auth_key
def refresh_on_fsm_update(self, message):
if message['key'] == 'fsm_update' and verify_message(self._auth_key, message):
self._handle_fsm_update(message)
return True
return False
def _handle_fsm_update(self, message):
try:
update_data = message['data']
new_state = update_data['current_state']
if self.fsm_state != new_state:
self.handle_fsm_step(**update_data)
self.fsm_state = new_state
self.fsm_in_accepted_state = update_data['in_accepted_state']
self.fsm_event_log.append(update_data)
except KeyError:
LOG.error('Invalid fsm_update message received!')
def handle_fsm_step(self, **kwargs):
"""
Called in case the TFW FSM has stepped.
:param kwargs: fsm_update 'data' field
"""

View File

@ -1 +0,0 @@
from .envvars import TFWENV

View File

@ -1,3 +0,0 @@
from envvars import LazyEnvironment
TFWENV = LazyEnvironment('TFW_', 'tfwenvtuple').environment

View File

@ -1,98 +0,0 @@
from functools import wraps, partial
from time import time, sleep
from tfw.decorators.lazy_property import lazy_property
class RateLimiter:
"""
Decorator class for rate limiting, blocking.
When applied to a function this decorator will apply rate limiting
if the function is invoked more frequently than rate_per_seconds.
By default rate limiting means sleeping until the next invocation time
as per __init__ parameter rate_per_seconds.
Note that this decorator BLOCKS THE THREAD it is being executed on,
so it is only acceptable for stuff running on a separate thread.
If this is no good for you please refer to AsyncRateLimiter in this module,
which is designed not to block and use the IOLoop it is being called from.
"""
def __init__(self, rate_per_second):
"""
:param rate_per_second: max frequency the decorated method should be
invoked with
"""
self.min_interval = 1 / float(rate_per_second)
self.fun = None
self.last_call = time()
def action(self, seconds_to_next_call):
if seconds_to_next_call:
sleep(seconds_to_next_call)
self.fun()
def __call__(self, fun):
@wraps(fun)
def wrapper(*args, **kwargs):
self.fun = partial(fun, *args, **kwargs)
limit_seconds = self._limit_rate()
self.action(limit_seconds)
return wrapper
def _limit_rate(self):
seconds_since_last_call = time() - self.last_call
seconds_to_next_call = self.min_interval - seconds_since_last_call
if seconds_to_next_call > 0:
return seconds_to_next_call
self.last_call = time()
return 0
class AsyncRateLimiter(RateLimiter):
"""
Decorator class for rate limiting, non-blocking.
The semantics of the rate limiting:
- unlike RateLimiter this decorator never blocks, instead it adds an async
callback version of the decorated function to the IOLoop
(to be executed after the rate limiting has expired).
- the timing works similarly to RateLimiter
"""
def __init__(self, rate_per_second, ioloop_factory):
"""
:param rate_per_second: max frequency the decorated method should be
invoked with
:param ioloop_factory: callable that should return an instance of the
IOLoop of the application
"""
self._ioloop_factory = ioloop_factory
self._ioloop = None
self._last_callback = None
self._make_action_thread_safe()
super().__init__(rate_per_second=rate_per_second)
def _make_action_thread_safe(self):
self.action = partial(self.ioloop.add_callback, self.action)
@lazy_property
def ioloop(self):
return self._ioloop_factory()
def action(self, seconds_to_next_call):
# pylint: disable=method-hidden
if self._last_callback:
self.ioloop.remove_timeout(self._last_callback)
self._last_callback = self.ioloop.call_later(
seconds_to_next_call,
self.fun_with_debounce
)
def fun_with_debounce(self):
self.last_call = time()
self.fun()

View File

@ -1 +0,0 @@
from .event_handler_base import EventHandlerBase

View File

@ -1,117 +0,0 @@
import logging
from abc import ABC, abstractmethod
from typing import Iterable
from tfw.networking import Scope
LOG = logging.getLogger(__name__)
class EventHandlerBase(ABC):
"""
Abstract base class for all Python based EventHandlers. Useful implementation template
for other languages.
Derived classes must implement the handle_event() method
"""
_instances = set()
def __init__(self, key, scope=Scope.ZMQ):
type(self)._instances.add(self)
self.server_connector = self._build_server_connector()
self.scope = scope
self.keys = []
if isinstance(key, str):
self.keys.append(key)
elif isinstance(key, Iterable):
self.keys = list(key)
self.subscribe(*self.keys)
self.server_connector.register_callback(self.event_handler_callback)
@abstractmethod
def _build_server_connector(self):
raise NotImplementedError()
def subscribe(self, *keys):
"""
Subscribe this EventHandler to receive events for given keys.
Note that you can subscribe to the same key several times in which
case you will need to unsubscribe multiple times in order to stop
receiving events.
:param keys: list of keys to subscribe to
"""
for key in keys:
self.server_connector.subscribe(key)
self.keys.append(key)
def event_handler_callback(self, message):
"""
Callback that is invoked when receiving a message.
Dispatches messages to handler methods and sends
a response back in case the handler returned something.
This is subscribed in __init__().
"""
if self.check_key(message):
self.dispatch_handling(message)
def check_key(self, message):
"""
Checks whether the message is intended for this
EventHandler.
This is necessary because ZMQ handles PUB - SUB
connetions with pattern matching (e.g. someone
subscribed to 'fsm' will receive 'fsm_update'
messages as well.
"""
if '' in self.keys:
return True
return message['key'] in self.keys
def dispatch_handling(self, message):
"""
Used to dispatch messages to their specific handlers.
:param message: the message received
:returns: the message to send back
"""
self.handle_event(message)
def handle_event(self, message):
"""
Abstract method that implements the handling of messages.
:param message: the message received
:returns: the message to send back
"""
raise NotImplementedError()
def send_message(self, message):
self.server_connector.send_message(message, self.scope)
def unsubscribe(self, *keys):
"""
Unsubscribe this eventhandler from the given keys.
:param keys: list of keys to unsubscribe from
"""
for key in keys:
self.server_connector.unsubscribe(key)
self.keys.remove(key)
@classmethod
def stop_all_instances(cls):
for instance in cls._instances:
instance.stop()
def stop(self):
self.server_connector.close()
self.cleanup()
def cleanup(self):
"""
Perform cleanup actions such as releasing database
connections and stuff like that.
"""

View File

@ -1 +0,0 @@
from .tfw_server import TFWServer

View File

@ -7,4 +7,8 @@ PyYAML>=5.0.0,<6.0.0
Jinja2>=2.0.0,<3.0.0 Jinja2>=2.0.0,<3.0.0
cryptography>=2.0.0,<3.0.0 cryptography>=2.0.0,<3.0.0
python-dateutil>=2.0.0,<3.0.0 python-dateutil>=2.0.0,<3.0.0
SQLAlchemy==1.3.4 SQLAlchemy>=1.0.0,<2.0.0
python-dateutil>=2.0.0,<3.0.0
pytest>=5.0.0,<6.0.0
pylint>=2.0.0,<3.0.0
rope>=0.0.0,<1.0.0

View File

@ -1,6 +1,6 @@
from os.path import dirname, realpath, join from os.path import dirname, realpath, join
from setuptools import setup, find_packages from setuptools import setup
here = dirname(realpath(__file__)) here = dirname(realpath(__file__))
@ -10,20 +10,20 @@ with open(join(here, 'requirements.txt'), 'r') as ifile:
requirements = ifile.read().splitlines() requirements = ifile.read().splitlines()
setup( setup(
name = 'tfw', name='tfw',
version = version, version=version,
description = 'Avatao tutorial-framework', description='Avatao tutorial-framework',
url = 'https://github.com/avatao-content/baseimage-tutorial-framework', url='https://github.com/avatao-content/baseimage-tutorial-framework',
author = 'Avatao.com Innovative Learning Kft.', author='Avatao.com Innovative Learning Kft.',
author_email = 'support@avatao.com', author_email='support@avatao.com',
license = 'custom', license='custom',
packages = find_packages('lib'), packages=['tfw'],
package_dir = {'': 'lib'}, package_dir={'tfw': 'tfw'},
install_requires = requirements, install_requires=requirements,
extras_require = { extras_require={
'docs': [ 'docs': [
'sphinx >= 1.7.0', 'sphinx >= 1.7.0',
], ],
}, },
zip_safe = False, zip_safe=False,
) )

View File

@ -2,7 +2,7 @@ from sys import stderr
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tfw.server import TFWServer from tfw.main import TFWServer, setup_signal_handlers
from tfw.config import TFWENV from tfw.config import TFWENV
from tfw.logging import Log, Logger, LogFormatter, VerboseLogFormatter from tfw.logging import Log, Logger, LogFormatter, VerboseLogFormatter
@ -13,4 +13,6 @@ if __name__ == '__main__':
Log(TFWENV.LOGFILE, VerboseLogFormatter()) Log(TFWENV.LOGFILE, VerboseLogFormatter())
]).start() ]).start()
TFWServer().listen() TFWServer().listen()
setup_signal_handlers()
IOLoop.instance().start() IOLoop.instance().start()

View File

@ -0,0 +1,2 @@
from .frontend_handler import FrontendHandler
from .message_sender import MessageSender

View File

@ -0,0 +1,25 @@
from tfw.internals.networking import Scope
from .message_storage import FrontendMessageStorage
class FrontendHandler:
keys = ['message', 'queueMessages', 'dashboard', 'console']
def __init__(self):
self.server_connector = None
self.keys = [*type(self).keys, 'recover']
self._frontend_message_storage = FrontendMessageStorage(type(self).keys)
def send_message(self, message):
self.server_connector.send_message(message, scope=Scope.WEBSOCKET)
def handle_event(self, message, _):
self._frontend_message_storage.save_message(message)
if message['key'] == 'recover':
self.recover_frontend()
self.send_message(message)
def recover_frontend(self):
for message in self._frontend_message_storage.messages:
self.send_message(message)

View File

@ -0,0 +1 @@
from .fsm_handler import FSMHandler

View File

@ -1,15 +1,15 @@
import logging import logging
from tfw.crypto import KeyManager, sign_message, verify_message from tfw.internals.crypto import KeyManager, sign_message, verify_message
from tfw.networking import Scope from tfw.internals.networking import Scope
from tfw.components import FSMUpdater
from .event_handler import EventHandler from .fsm_updater import FSMUpdater
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class FSMManagingEventHandler(EventHandler): class FSMHandler:
keys = ['fsm']
""" """
EventHandler responsible for managing the state machine of EventHandler responsible for managing the state machine of
the framework (TFW FSM). the framework (TFW FSM).
@ -24,8 +24,7 @@ class FSMManagingEventHandler(EventHandler):
An 'fsm_update' message is broadcasted after every successful An 'fsm_update' message is broadcasted after every successful
command. command.
""" """
def __init__(self, key, fsm_type, require_signature=False): def __init__(self, *, fsm_type, require_signature=False):
super().__init__(key, scope=Scope.WEBSOCKET)
self.fsm = fsm_type() self.fsm = fsm_type()
self._fsm_updater = FSMUpdater(self.fsm) self._fsm_updater = FSMUpdater(self.fsm)
self.auth_key = KeyManager().auth_key self.auth_key = KeyManager().auth_key
@ -36,15 +35,14 @@ class FSMManagingEventHandler(EventHandler):
'update': self.handle_update 'update': self.handle_update
} }
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
message = self.command_handlers[message['data']['command']](message) message = self.command_handlers[message['data']['command']](message)
if message: if message:
fsm_update_message = self._fsm_updater.fsm_update fsm_update_message = self._fsm_updater.fsm_update
sign_message(self.auth_key, message) sign_message(self.auth_key, message)
sign_message(self.auth_key, fsm_update_message) sign_message(self.auth_key, fsm_update_message)
self.server_connector.send_message(fsm_update_message, Scope.BROADCAST) server_connector.send_message(fsm_update_message, Scope.BROADCAST)
self.send_message(message)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -6,7 +6,7 @@ class FSMUpdater:
def fsm_update(self): def fsm_update(self):
return { return {
'key': 'fsm_update', 'key': 'fsm_update',
'data': self.fsm_update_data **self.fsm_update_data
} }
@property @property

View File

@ -0,0 +1 @@
from .ide_handler import IdeHandler

View File

@ -1,5 +1,4 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
from dataclasses import dataclass from dataclasses import dataclass
from secrets import token_urlsafe from secrets import token_urlsafe
from os.path import join from os.path import join
@ -11,6 +10,7 @@ import pytest
from .file_manager import FileManager from .file_manager import FileManager
@dataclass @dataclass
class ManagerContext: class ManagerContext:
folder: str folder: str

View File

@ -1,10 +1,9 @@
import logging import logging
from tfw.networking import Scope from tfw.internals.networking import Scope
from tfw.components import FileManager from tfw.internals.inotify import InotifyObserver
from tfw.components.inotify import InotifyObserver
from .event_handler import EventHandler from .file_manager import FileManager
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -32,7 +31,8 @@ BUILD_ARTIFACTS = (
) )
class IdeEventHandler(EventHandler): class IdeHandler:
keys = ['ide']
# pylint: disable=too-many-arguments,anomalous-backslash-in-string # pylint: disable=too-many-arguments,anomalous-backslash-in-string
""" """
Event handler implementing the backend of our browser based IDE. Event handler implementing the backend of our browser based IDE.
@ -47,7 +47,7 @@ class IdeEventHandler(EventHandler):
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None): def __init__(self, *, directory, allowed_directories, selected_file=None, exclude=None):
""" """
:param key: the key this instance should listen to :param key: the key this instance should listen to
:param directory: working directory which the EventHandler should serve files from :param directory: working directory which the EventHandler should serve files from
@ -55,7 +55,7 @@ class IdeEventHandler(EventHandler):
:param selected_file: file that is selected by default :param selected_file: file that is selected by default
:param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
""" """
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
try: try:
self.filemanager = FileManager( self.filemanager = FileManager(
allowed_directories=allowed_directories, allowed_directories=allowed_directories,
@ -84,10 +84,13 @@ class IdeEventHandler(EventHandler):
} }
def _reload_frontend(self, event): # pylint: disable=unused-argument def _reload_frontend(self, event): # pylint: disable=unused-argument
self.server_connector.send_message({ self.send_message({
'key': 'ide', 'key': 'ide',
'data': {'command': 'reload'} 'data': {'command': 'reload'}
}, Scope.WEBSOCKET) })
def send_message(self, message):
self.server_connector.send_message(message, scope=Scope.WEBSOCKET)
def read(self, data): def read(self, data):
""" """
@ -179,7 +182,7 @@ class IdeEventHandler(EventHandler):
data['files'] = self.filemanager.files data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir data['directory'] = self.filemanager.workdir
def handle_event(self, message): def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
message['data'] = self.commands[data['command']](data) message['data'] = self.commands[data['command']](data)

View File

@ -0,0 +1 @@
from .pipe_io_handler import PipeIOHandler, PipeIOHandlerBase, TransformerPipeIOHandler, CommandHandler

View File

@ -10,17 +10,17 @@ from secrets import token_urlsafe
from threading import Thread from threading import Thread
from contextlib import suppress from contextlib import suppress
from tfw.components.pipe_io_server import PipeIOServer, terminate_process_on_failure from .pipe_io_server import PipeIOServer, terminate_process_on_failure
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DEFAULT_PERMISSIONS = 0o600 DEFAULT_PERMISSIONS = 0o600
class PipeIOEventHandlerBase(EventHandler): class PipeIOHandlerBase:
def __init__(self, key, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS): keys = ['']
super().__init__(key)
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.server_connector = None
self.pipe_io = CallbackPipeIOServer( self.pipe_io = CallbackPipeIOServer(
in_pipe_path, in_pipe_path,
out_pipe_path, out_pipe_path,
@ -49,26 +49,26 @@ class CallbackPipeIOServer(PipeIOServer):
LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe) LOG.exception('Failed to handle message %s from pipe %s!', message, self.in_pipe)
class PipeIOEventHandler(PipeIOEventHandlerBase): class PipeIOHandler(PipeIOHandlerBase):
def handle_event(self, message): def handle_event(self, message, _):
json_bytes = dumps(message).encode() json_bytes = dumps(message).encode()
self.pipe_io.send_message(json_bytes) self.pipe_io.send_message(json_bytes)
def handle_pipe_event(self, message_bytes): def handle_pipe_event(self, message_bytes):
json = loads(message_bytes) json = loads(message_bytes)
self.send_message(json) self.server_connector.send_message(json)
class TransformerPipeIOEventHandler(PipeIOEventHandlerBase): class TransformerPipeIOHandler(PipeIOHandlerBase):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def __init__( def __init__(
self, key, in_pipe_path, out_pipe_path, self, in_pipe_path, out_pipe_path,
transform_in_cmd, transform_out_cmd, transform_in_cmd, transform_out_cmd,
permissions=DEFAULT_PERMISSIONS permissions=DEFAULT_PERMISSIONS
): ):
self._transform_in = partial(self._transform_message, transform_in_cmd) self._transform_in = partial(self._transform_message, transform_in_cmd)
self._transform_out = partial(self._transform_message, transform_out_cmd) self._transform_out = partial(self._transform_message, transform_out_cmd)
super().__init__(key, in_pipe_path, out_pipe_path, permissions) super().__init__(in_pipe_path, out_pipe_path, permissions)
@staticmethod @staticmethod
def _transform_message(transform_cmd, message): def _transform_message(transform_cmd, message):
@ -83,7 +83,7 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
return proc.stdout return proc.stdout
raise ValueError(f'Transforming message {message} failed!') raise ValueError(f'Transforming message {message} failed!')
def handle_event(self, message): def handle_event(self, message, _):
json_bytes = dumps(message).encode() json_bytes = dumps(message).encode()
transformed_bytes = self._transform_out(json_bytes) transformed_bytes = self._transform_out(json_bytes)
if transformed_bytes: if transformed_bytes:
@ -93,13 +93,12 @@ class TransformerPipeIOEventHandler(PipeIOEventHandlerBase):
transformed_bytes = self._transform_in(message_bytes) transformed_bytes = self._transform_in(message_bytes)
if transformed_bytes: if transformed_bytes:
json_message = loads(transformed_bytes) json_message = loads(transformed_bytes)
self.send_message(json_message) self.server_connector.send_message(json_message)
class CommandEventHandler(PipeIOEventHandler): class CommandHandler(PipeIOHandler):
def __init__(self, key, command, permissions=DEFAULT_PERMISSIONS): def __init__(self, command, permissions=DEFAULT_PERMISSIONS):
super().__init__( super().__init__(
key,
self._generate_tempfilename(), self._generate_tempfilename(),
self._generate_tempfilename(), self._generate_tempfilename(),
permissions permissions

View File

@ -0,0 +1,2 @@
from .process_handler import ProcessHandler
from .process_log_handler import ProcessLogHandler

View File

@ -1,19 +1,19 @@
import logging import logging
from tfw.networking import Scope from tfw.internals.networking import Scope
from tfw.internals.inotify import InotifyObserver
from .inotify import InotifyObserver from .supervisor import ProcessLogManager
from .supervisor import LogManager
class LogInotifyObserver(InotifyObserver, LogManager): class LogInotifyObserver(InotifyObserver, ProcessLogManager):
def __init__(self, server_connector, supervisor_uri, process_name, log_tail=0): def __init__(self, server_connector, supervisor_uri, process_name, log_tail=0):
self._prevent_log_recursion() self._prevent_log_recursion()
self._server_connector = server_connector self._server_connector = server_connector
self._process_name = process_name self._process_name = process_name
self.log_tail = log_tail self.log_tail = log_tail
self._procinfo = None self._procinfo = None
LogManager.__init__(self, supervisor_uri) ProcessLogManager.__init__(self, supervisor_uri)
InotifyObserver.__init__(self, self._get_logfiles()) InotifyObserver.__init__(self, self._get_logfiles())
@staticmethod @staticmethod

View File

@ -1,16 +1,15 @@
import logging import logging
from xmlrpc.client import Fault as SupervisorFault from xmlrpc.client import Fault as SupervisorFault
from tfw.config import TFWENV from tfw.internals.networking import Scope
from tfw.networking import Scope
from tfw.components import ProcessManager, LogManager
from .event_handler import EventHandler from .supervisor import ProcessManager, ProcessLogManager
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager): class ProcessHandler(ProcessManager, ProcessLogManager):
keys = ['processmanager']
""" """
Event handler that can manage processes managed by supervisor. Event handler that can manage processes managed by supervisor.
@ -23,10 +22,9 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
Commands available: start, stop, restart, readlog Commands available: start, stop, restart, readlog
(the names are as self-documenting as it gets) (the names are as self-documenting as it gets)
""" """
def __init__(self, key, log_tail=0): def __init__(self, *, supervisor_uri, log_tail=0):
EventHandler.__init__(self, key, scope=Scope.WEBSOCKET) ProcessManager.__init__(self, supervisor_uri)
ProcessManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI) ProcessLogManager.__init__(self, supervisor_uri)
LogManager.__init__(self, TFWENV.SUPERVISOR_HTTP_URI)
self.log_tail = log_tail self.log_tail = log_tail
self.commands = { self.commands = {
'start': self.start_process, 'start': self.start_process,
@ -34,7 +32,7 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
'restart': self.restart_process 'restart': self.restart_process
} }
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
data = message['data'] data = message['data']
try: try:
@ -50,6 +48,6 @@ class ProcessManagingEventHandler(EventHandler, ProcessManager, LogManager):
data['process_name'], data['process_name'],
self.log_tail self.log_tail
) )
self.send_message(message) server_connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -1,15 +1,12 @@
import logging import logging
from tfw.config import TFWENV from .log_inotify_observer import LogInotifyObserver
from tfw.networking import Scope
from tfw.components import LogInotifyObserver
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class LogMonitoringEventHandler(EventHandler): class ProcessLogHandler:
keys = ['logmonitor']
""" """
Monitors the output of a supervisor process (stdout, stderr) and Monitors the output of a supervisor process (stdout, stderr) and
sends the results to the frontend. sends the results to the frontend.
@ -19,23 +16,28 @@ class LogMonitoringEventHandler(EventHandler):
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key, process_name, log_tail=0): def __init__(self, *, process_name, supervisor_uri, log_tail=0):
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
self.process_name = process_name self.process_name = process_name
self._monitor = LogInotifyObserver( self._supervisor_uri = supervisor_uri
server_connector=self.server_connector, self._initial_log_tail = log_tail
supervisor_uri=TFWENV.SUPERVISOR_HTTP_URI, self._monitor = None
process_name=process_name,
log_tail=log_tail
)
self._monitor.start()
self.command_handlers = { self.command_handlers = {
'process_name': self.handle_process_name, 'process_name': self.handle_process_name,
'log_tail': self.handle_log_tail 'log_tail': self.handle_log_tail
} }
def handle_event(self, message): def start(self):
self._monitor = LogInotifyObserver(
server_connector=self.server_connector,
supervisor_uri=self._supervisor_uri,
process_name=self.process_name,
log_tail=self._initial_log_tail
)
self._monitor.start()
def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
self.command_handlers[data['command']](data) self.command_handlers[data['command']](data)

View File

@ -22,7 +22,7 @@ class ProcessManager(SupervisorBase):
self.start_process(process_name) self.start_process(process_name)
class LogManager(SupervisorBase): class ProcessLogManager(SupervisorBase):
def read_stdout(self, process_name, tail=0): def read_stdout(self, process_name, tail=0):
return self.supervisor.readProcessStdoutLog(process_name, -tail, 0) return self.supervisor.readProcessStdoutLog(process_name, -tail, 0)

View File

@ -0,0 +1 @@
from .snapshot_handler import SnapshotHandler

View File

@ -6,18 +6,18 @@ from datetime import datetime
from dateutil import parser as dateparser from dateutil import parser as dateparser
from tfw.components.snapshot_provider import SnapshotProvider from tfw.internals.networking import Scope
from tfw.config import TFWENV
from tfw.networking import Scope
from .event_handler import EventHandler from .snapshot_provider import SnapshotProvider
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DirectorySnapshottingEventHandler(EventHandler): class SnapshotHandler:
def __init__(self, key, directories, exclude_unix_patterns=None): keys = ['snapshot']
super().__init__(key, scope=Scope.WEBSOCKET)
def __init__(self, *, directories, snapshots_dir, exclude_unix_patterns=None):
self._snapshots_dir = snapshots_dir
self.snapshot_providers = {} self.snapshot_providers = {}
self._exclude_unix_patterns = exclude_unix_patterns self._exclude_unix_patterns = exclude_unix_patterns
self.init_snapshot_providers(directories) self.init_snapshot_providers(directories)
@ -37,20 +37,19 @@ class DirectorySnapshottingEventHandler(EventHandler):
self._exclude_unix_patterns self._exclude_unix_patterns
) )
@staticmethod def init_git_dir(self, index, directory):
def init_git_dir(index, directory):
git_dir = joinpath( git_dir = joinpath(
TFWENV.SNAPSHOTS_DIR, self._snapshots_dir,
f'{basename(directory)}-{index}' f'{basename(directory)}-{index}'
) )
makedirs(git_dir, exist_ok=True) makedirs(git_dir, exist_ok=True)
return git_dir return git_dir
def handle_event(self, message): def handle_event(self, message, server_connector):
try: try:
data = message['data'] data = message['data']
message['data'] = self.command_handlers[data['command']](data) message['data'] = self.command_handlers[data['command']](data)
self.send_message(message) server_connector.send_message(message, scope=Scope.WEBSOCKET)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

View File

@ -0,0 +1,3 @@
from .terminal_handler import TerminalHandler
from .terminal_commands_handler import TerminalCommandsHandler
from .commands_equal import CommandsEqual

View File

@ -1,7 +1,7 @@
from shlex import split from shlex import split
from re import search from re import search
from tfw.decorators.lazy_property import lazy_property from tfw.internals.lazy import lazy_property
class CommandsEqual: class CommandsEqual:

View File

@ -2,7 +2,7 @@ from re import findall
from re import compile as compileregex from re import compile as compileregex
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from tfw.components.inotify import InotifyObserver from tfw.internals.inotify import InotifyObserver
class HistoryMonitor(ABC, InotifyObserver): class HistoryMonitor(ABC, InotifyObserver):

View File

@ -26,7 +26,7 @@ class TerminalCommands(ABC):
You can also use this class to create new commands similarly. You can also use this class to create new commands similarly.
""" """
def __init__(self, bashrc=None): def __init__(self, bashrc):
self._command_method_regex = r'^command_(.+)$' self._command_method_regex = r'^command_(.+)$'
self.command_implemetations = self._build_command_to_implementation_dict() self.command_implemetations = self._build_command_to_implementation_dict()
if bashrc is not None: if bashrc is not None:

View File

@ -0,0 +1,9 @@
from .terminal_commands import TerminalCommands
class TerminalCommandsHandler(TerminalCommands):
keys = ['history.bash']
def handle_event(self, message, _):
command = message['value']
self.callback(command)

View File

@ -1,16 +1,13 @@
import logging import logging
from tfw.networking import Scope from .history_monitor import BashMonitor
from tfw.components import BashMonitor, TerminadoMiniServer from .terminado_mini_server import TerminadoMiniServer
from tfw.config import TFWENV
from tao.config import TAOENV
from .event_handler import EventHandler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TerminalEventHandler(EventHandler): class TerminalHandler:
keys = ['shell']
""" """
Event handler responsible for managing terminal sessions for frontend xterm Event handler responsible for managing terminal sessions for frontend xterm
sessions to connect to. You need to instanciate this in order for frontend sessions to connect to. You need to instanciate this in order for frontend
@ -20,19 +17,20 @@ class TerminalEventHandler(EventHandler):
a command to be executed. a command to be executed.
The API of each command is documented in their respective handler. The API of each command is documented in their respective handler.
""" """
def __init__(self, key): def __init__(self, *, port, user, workind_directory, histfile):
""" """
:param key: key this EventHandler listens to :param key: key this EventHandler listens to
:param monitor: tfw.components.HistoryMonitor instance to read command history from :param monitor: tfw.components.HistoryMonitor instance to read command history from
""" """
super().__init__(key, scope=Scope.WEBSOCKET) self.server_connector = None
self._historymonitor = BashMonitor(self.server_connector, TFWENV.HISTFILE) self._histfile = histfile
bash_as_user_cmd = ['sudo', '-u', TAOENV.USER, 'bash'] self._historymonitor = None
bash_as_user_cmd = ['sudo', '-u', user, 'bash']
self.terminado_server = TerminadoMiniServer( self.terminado_server = TerminadoMiniServer(
'/terminal', '/terminal',
TFWENV.TERMINADO_PORT, port,
TFWENV.TERMINADO_WD, workind_directory,
bash_as_user_cmd bash_as_user_cmd
) )
@ -41,18 +39,20 @@ class TerminalEventHandler(EventHandler):
'read': self.read 'read': self.read
} }
self._historymonitor.start()
self.terminado_server.listen() self.terminado_server.listen()
def start(self):
self._historymonitor = BashMonitor(self.server_connector, self._histfile)
self._historymonitor.start()
@property @property
def historymonitor(self): def historymonitor(self):
return self._historymonitor return self._historymonitor
def handle_event(self, message): def handle_event(self, message, _):
try: try:
data = message['data'] data = message['data']
message['data'] = self.commands[data['command']](data) message['data'] = self.commands[data['command']](data)
self.send_message(message)
except KeyError: except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

1
tfw/config/__init__.py Normal file
View File

@ -0,0 +1 @@
from .envvars import TFWENV, TAOENV

4
tfw/config/envvars.py Normal file
View File

@ -0,0 +1,4 @@
from .lazy_environment import LazyEnvironment
TFWENV = LazyEnvironment('TFW_', 'tfwenvtuple').environment
TAOENV = LazyEnvironment('AVATAO_', 'taoenvtuple').environment

View File

@ -1,7 +1,7 @@
from collections import namedtuple from collections import namedtuple
from os import environ from os import environ
from tfw.decorators.lazy_property import lazy_property from tfw.internals.lazy import lazy_property
class LazyEnvironment: class LazyEnvironment:

2
tfw/event_handlers.py Normal file
View File

@ -0,0 +1,2 @@
# pylint: disable=unused-import
from tfw.internals.event_handling import EventHandler, FSMAwareEventHandler

View File

@ -4,7 +4,7 @@ from datetime import datetime
from transitions import Machine, MachineError from transitions import Machine, MachineError
from tfw.mixins.callback_mixin import CallbackMixin from tfw.internals.callback_mixin import CallbackMixin
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -1,6 +1,6 @@
from transitions import State from transitions import State
from tfw.fsm.fsm_base import FSMBase from .fsm_base import FSMBase
class LinearFSM(FSMBase): class LinearFSM(FSMBase):

View File

@ -6,7 +6,7 @@ import yaml
import jinja2 import jinja2
from transitions import State from transitions import State
from tfw.fsm.fsm_base import FSMBase from .fsm_base import FSMBase
class YamlFSM(FSMBase): class YamlFSM(FSMBase):

View File

@ -1,6 +1,6 @@
from functools import partial from functools import partial
from tfw.decorators.lazy_property import lazy_property from .lazy import lazy_property
class CallbackMixin: class CallbackMixin:

View File

@ -11,8 +11,8 @@ from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.hmac import HMAC as _HMAC from cryptography.hazmat.primitives.hmac import HMAC as _HMAC
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from tfw.networking import message_bytes from tfw.internals.networking import message_bytes
from tfw.decorators.lazy_property import lazy_property from tfw.internals.lazy import lazy_property
from tfw.config import TFWENV from tfw.config import TFWENV

View File

@ -0,0 +1,3 @@
from .event_handler_factory_base import EventHandlerFactoryBase
from .event_handler import EventHandler
from .fsm_aware_event_handler import FSMAwareEventHandler

View File

@ -0,0 +1,27 @@
class EventHandler:
_instances = set()
def __init__(self, server_connector):
type(self)._instances.add(self)
self.server_connector = server_connector
def start(self):
self.server_connector.register_callback(self._event_callback)
def _event_callback(self, message):
self.handle_event(message, self.server_connector)
def handle_event(self, message, server_connector):
raise NotImplementedError()
@classmethod
def stop_all_instances(cls):
for instance in cls._instances:
instance.stop()
def stop(self):
self.server_connector.close()
self.cleanup()
def cleanup(self):
pass

View File

@ -0,0 +1,68 @@
from contextlib import suppress
from .event_handler import EventHandler
class EventHandlerFactoryBase:
def build(self, handler_stub, *, keys=None, event_handler_type=EventHandler):
builder = EventHandlerBuilder(handler_stub, keys, event_handler_type)
server_connector = self._build_server_connector()
event_handler = builder.build(server_connector)
handler_stub.server_connector = server_connector
with suppress(AttributeError):
handler_stub.start()
event_handler.start()
return event_handler
def _build_server_connector(self):
raise NotImplementedError()
class EventHandlerBuilder:
def __init__(self, event_handler, supplied_keys, event_handler_type):
self._analyzer = HandlerStubAnalyzer(event_handler, supplied_keys)
self._event_handler_type = event_handler_type
def build(self, server_connector):
event_handler = self._event_handler_type(server_connector)
server_connector.subscribe(*self._try_get_keys(event_handler))
event_handler.handle_event = self._analyzer.handle_event
with suppress(AttributeError):
event_handler.cleanup = self._analyzer.cleanup
return event_handler
def _try_get_keys(self, event_handler):
try:
return self._analyzer.keys
except ValueError:
with suppress(AttributeError):
return event_handler.keys
raise
class HandlerStubAnalyzer:
def __init__(self, event_handler, supplied_keys):
self._event_handler = event_handler
self._supplied_keys = supplied_keys
@property
def keys(self):
if self._supplied_keys is None:
try:
return self._event_handler.keys
except AttributeError:
raise ValueError('No keys supplied!')
return self._supplied_keys
@property
def handle_event(self):
try:
return self._event_handler.handle_event
except AttributeError:
if callable(self._event_handler):
return self._event_handler
raise ValueError('Object must implement handle_event or be a callable!')
@property
def cleanup(self):
return self._event_handler.cleanup

View File

@ -0,0 +1,37 @@
import logging
from tfw.internals.crypto import KeyManager, verify_message
LOG = logging.getLogger(__name__)
class FSMAware:
keys = ['fsm_update']
"""
Base class for stuff that has to be aware of the framework FSM.
This is done by processing 'fsm_update' messages.
"""
def __init__(self):
self.fsm_state = None
self.fsm_in_accepted_state = False
self.fsm_event_log = []
self._auth_key = KeyManager().auth_key
def process_message(self, message):
if message['key'] == 'fsm_update':
if verify_message(self._auth_key, message):
self._handle_fsm_update(message)
def _handle_fsm_update(self, message):
try:
new_state = message['current_state']
if self.fsm_state != new_state:
self.handle_fsm_step(message)
self.fsm_state = new_state
self.fsm_in_accepted_state = message['in_accepted_state']
self.fsm_event_log.append(message)
except KeyError:
LOG.error('Invalid fsm_update message received!')
def handle_fsm_step(self, message):
pass

View File

@ -0,0 +1,19 @@
from .event_handler import EventHandler
from .fsm_aware import FSMAware
class FSMAwareEventHandler(EventHandler, FSMAware):
# pylint: disable=abstract-method
"""
Abstract base class for EventHandlers which automatically
keep track of the state of the TFW FSM.
"""
def __init__(self, server_connector):
EventHandler.__init__(self, server_connector)
FSMAware.__init__(self)
def _event_callback(self, message):
self.process_message(message)
def handle_fsm_step(self, message):
self.handle_event(message, self.server_connector)

View File

@ -0,0 +1,190 @@
# pylint: disable=redefined-outer-name,attribute-defined-outside-init
from secrets import token_urlsafe
from random import randint
import pytest
from .event_handler_factory_base import EventHandlerFactoryBase
from .event_handler import EventHandler
class MockEventHandlerFactory(EventHandlerFactoryBase):
def _build_server_connector(self):
return MockServerConnector()
class MockServerConnector:
def __init__(self):
self.keys = []
self._on_message = None
def simulate_message(self, message):
self._on_message(message)
def register_callback(self, callback):
self._on_message = callback
def subscribe(self, *keys):
self.keys.extend(keys)
def unsubscribe(self, *keys):
for key in keys:
self.keys.remove(key)
def send_message(self, message, scope=None):
pass
def close(self):
pass
class MockEventHandlerStub:
def __init__(self):
self.server_connector = None
self.last_message = None
self.cleaned_up = False
self.started = False
def start(self):
self.started = True
def cleanup(self):
self.cleaned_up = True
class MockEventHandler(MockEventHandlerStub):
# pylint: disable=unused-argument
def handle_event(self, message, server_connector):
self.last_message = message
class MockCallable(MockEventHandlerStub):
def __call__(self, message, server_connector):
self.last_message = message
@pytest.fixture
def test_msg():
yield token_urlsafe(randint(16, 64))
@pytest.fixture
def test_keys():
yield [
token_urlsafe(randint(2, 8))
for _ in range(randint(16, 32))
]
def test_build_from_object(test_keys, test_msg):
mock_eh = MockEventHandlerStub()
def handle_event(message, server_connector):
raise RuntimeError(message, server_connector.keys)
mock_eh.handle_event = handle_event
assert not mock_eh.started
eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys)
assert mock_eh.started
assert mock_eh.server_connector is eh.server_connector
with pytest.raises(RuntimeError) as err:
eh.server_connector.simulate_message(test_msg)
msg, keys = err.args
assert msg == test_msg
assert keys == test_keys
assert not mock_eh.cleaned_up
eh.stop()
assert mock_eh.cleaned_up
def test_build_from_object_with_keys(test_keys, test_msg):
mock_eh = MockEventHandler()
mock_eh.keys = test_keys
assert not mock_eh.started
eh = MockEventHandlerFactory().build(mock_eh)
assert mock_eh.server_connector.keys == test_keys
assert eh.server_connector is mock_eh.server_connector
assert mock_eh.started
assert not mock_eh.last_message
eh.server_connector.simulate_message(test_msg)
assert mock_eh.last_message == test_msg
assert not mock_eh.cleaned_up
EventHandler.stop_all_instances()
assert mock_eh.cleaned_up
def test_build_from_simple_object(test_keys, test_msg):
class SimpleMockEventHandler:
# pylint: disable=no-self-use
def handle_event(self, message, server_connector):
raise RuntimeError(message, server_connector)
mock_eh = SimpleMockEventHandler()
eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys)
with pytest.raises(RuntimeError) as err:
eh.server_connector.simulate_message(test_msg)
msg, keys = err.args
assert msg == test_msg
assert keys == test_keys
def test_build_from_callable(test_keys, test_msg):
mock_eh = MockCallable()
assert not mock_eh.started
eh = MockEventHandlerFactory().build(mock_eh, keys=test_keys)
assert mock_eh.started
assert mock_eh.server_connector is eh.server_connector
assert eh.server_connector.keys == test_keys
assert not mock_eh.last_message
eh.server_connector.simulate_message(test_msg)
assert mock_eh.last_message == test_msg
assert not mock_eh.cleaned_up
eh.stop()
assert mock_eh.cleaned_up
def test_build_from_function(test_keys, test_msg):
def some_function(message, server_connector):
raise RuntimeError(message, server_connector.keys)
eh = MockEventHandlerFactory().build(some_function, keys=test_keys)
assert eh.server_connector.keys == test_keys
with pytest.raises(RuntimeError) as err:
eh.server_connector.simulate_message(test_msg)
msg, keys = err.args
assert msg == test_msg
assert keys == test_keys
def test_build_from_lambda(test_keys, test_msg):
def assert_messages_equal(msg):
assert msg == test_msg
fun = lambda msg, sc: assert_messages_equal(msg)
eh = MockEventHandlerFactory().build(fun, keys=test_keys)
eh.server_connector.simulate_message(test_msg)
def test_build_raises_if_no_key(test_keys):
eh = MockEventHandler()
with pytest.raises(ValueError):
MockEventHandlerFactory().build(eh)
def handle_event(*_):
pass
with pytest.raises(ValueError):
MockEventHandlerFactory().build(handle_event)
with pytest.raises(ValueError):
MockEventHandlerFactory().build(lambda msg, sc: None)
WithKeysEventHandler = EventHandler
WithKeysEventHandler.keys = test_keys
MockEventHandlerFactory().build(eh, event_handler_type=WithKeysEventHandler)
eh.keys = test_keys
MockEventHandlerFactory().build(eh)

View File

@ -8,7 +8,7 @@ from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class EventHandlerDownlinkConnector(): class EventHandlerDownlinkConnector:
def __init__(self, bind_addr): def __init__(self, bind_addr):
self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL) self._zmq_pull_socket = zmq.Context.instance().socket(zmq.PULL)
self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_pull_socket.setsockopt(zmq.RCVHWM, 0)
@ -24,7 +24,7 @@ class EventHandlerDownlinkConnector():
self._zmq_pull_stream.close() self._zmq_pull_stream.close()
class EventHandlerUplinkConnector(): class EventHandlerUplinkConnector:
def __init__(self, bind_addr): def __init__(self, bind_addr):
self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB) self._zmq_pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_pub_socket.setsockopt(zmq.SNDHWM, 0)

View File

@ -1,5 +1,4 @@
import logging import logging
from functools import partial
import zmq import zmq
from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.zmqstream import ZMQStream
@ -10,29 +9,43 @@ from .serialization import serialize_tfw_msg, with_deserialize_tfw_msg
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ServerDownlinkConnector(): class ServerDownlinkConnector:
def __init__(self, connect_addr): def __init__(self, connect_addr):
self.keys = []
self._on_recv_callback = None
self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB) self._zmq_sub_socket = zmq.Context.instance().socket(zmq.SUB)
self._zmq_sub_socket.connect(connect_addr)
self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0) self._zmq_sub_socket.setsockopt(zmq.RCVHWM, 0)
self._zmq_sub_socket.connect(connect_addr)
self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket)
self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) def subscribe(self, *keys):
self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, key)
self.keys.append(key)
def unsubscribe(self, *keys):
for key in keys:
self._zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, key)
self.keys.remove(key)
def register_callback(self, callback): def register_callback(self, callback):
callback = with_deserialize_tfw_msg(callback) self._on_recv_callback = callback
self._zmq_sub_stream.on_recv(callback) self._zmq_sub_stream.on_recv(with_deserialize_tfw_msg(self._on_recv))
def _on_recv(self, message):
key = message['key']
if key in self.keys or '' in self.keys:
self._on_recv_callback(message)
def close(self): def close(self):
self._zmq_sub_stream.close() self._zmq_sub_stream.close()
class ServerUplinkConnector(): class ServerUplinkConnector:
def __init__(self, connect_addr): def __init__(self, connect_addr):
self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH) self._zmq_push_socket = zmq.Context.instance().socket(zmq.PUSH)
self._zmq_push_socket.connect(connect_addr)
self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0) self._zmq_push_socket.setsockopt(zmq.SNDHWM, 0)
self._zmq_push_socket.connect(connect_addr)
def send_message(self, message, scope=Scope.ZMQ): def send_message(self, message, scope=Scope.ZMQ):
message['scope'] = scope.value message['scope'] = scope.value

View File

@ -0,0 +1 @@
from .zmq_websocket_router import ZMQWebSocketRouter

View File

@ -0,0 +1,22 @@
from tfw.internals.networking import Scope
class TFWRouter:
def __init__(self, send_to_zmq, send_to_websockets):
self.send_to_zmq = send_to_zmq
self.send_to_websockets = send_to_websockets
def route(self, message):
scope = Scope(message.pop('scope', 'zmq'))
routing_table = {
Scope.ZMQ: self.send_to_zmq,
Scope.WEBSOCKET: self.send_to_websockets,
Scope.BROADCAST: self.broadcast
}
action = routing_table[scope]
action(message)
def broadcast(self, message):
self.send_to_zmq(message)
self.send_to_websockets(message)

View File

@ -3,16 +3,16 @@ import logging
from tornado.websocket import WebSocketHandler from tornado.websocket import WebSocketHandler
from tfw.networking import Scope from .tfw_router import TFWRouter
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ZMQWebSocketRouter(WebSocketHandler): class ZMQWebSocketRouter(WebSocketHandler):
# pylint: disable=abstract-method # pylint: disable=abstract-method,attribute-defined-outside-init
instances = set() instances = set()
def initialize(self, **kwargs): # pylint: disable=arguments-differ def initialize(self, **kwargs):
self.event_handler_connector = kwargs['event_handler_connector'] self.event_handler_connector = kwargs['event_handler_connector']
self.tfw_router = TFWRouter(self.send_to_zmq, self.send_to_websockets) self.tfw_router = TFWRouter(self.send_to_zmq, self.send_to_websockets)
@ -46,24 +46,3 @@ class ZMQWebSocketRouter(WebSocketHandler):
# much secure, very cors, wow # much secure, very cors, wow
def check_origin(self, origin): def check_origin(self, origin):
return True return True
class TFWRouter:
def __init__(self, send_to_zmq, send_to_websockets):
self.send_to_zmq = send_to_zmq
self.send_to_websockets = send_to_websockets
def route(self, message):
scope = Scope(message.pop('scope', 'zmq'))
routing_table = {
Scope.ZMQ: self.send_to_zmq,
Scope.WEBSOCKET: self.send_to_websockets,
Scope.BROADCAST: self.broadcast
}
action = routing_table[scope]
action(message)
def broadcast(self, message):
self.send_to_zmq(message)
self.send_to_websockets(message)

View File

@ -102,7 +102,7 @@ class LogFormatter(Formatter):
class VerboseLogFormatter(Formatter): class VerboseLogFormatter(Formatter):
def format(self, record): def format(self, record): # pylint: disable=no-self-use
date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S') date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S')
if record.args: if record.args:
message = record.msg % record.args message = record.msg % record.args

4
tfw/main/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .tfw_connector import TFWUplinkConnector, TFWConnector
from .event_handler_factory import EventHandlerFactory
from .signal_handling import setup_signal_handlers
from .tfw_server import TFWServer

View File

@ -0,0 +1,8 @@
from tfw.internals.event_handling import EventHandlerFactoryBase
from .tfw_connector import TFWConnector
class EventHandlerFactory(EventHandlerFactoryBase):
def _build_server_connector(self):
return TFWConnector()

View File

@ -0,0 +1,11 @@
from signal import signal, SIGTERM, SIGINT
from tfw.internals.event_handling import EventHandler
def setup_signal_handlers():
def stop(*_):
EventHandler.stop_all_instances()
exit(0)
signal(SIGTERM, stop)
signal(SIGINT, stop)

View File

@ -1,4 +1,4 @@
from tfw.networking import ServerUplinkConnector, ServerConnector from tfw.internals.networking import ServerConnector, ServerUplinkConnector
from tfw.config import TFWENV from tfw.config import TFWENV
@ -12,12 +12,12 @@ class ConnAddrMixin:
return f'tcp://localhost:{TFWENV.PUB_PORT}' return f'tcp://localhost:{TFWENV.PUB_PORT}'
class TFWServerUplinkConnector(ServerUplinkConnector, ConnAddrMixin): class TFWUplinkConnector(ServerUplinkConnector, ConnAddrMixin):
def __init__(self): def __init__(self):
super().__init__(self.uplink_conn_addr) super().__init__(self.uplink_conn_addr)
class TFWServerConnector(ServerConnector, ConnAddrMixin): class TFWConnector(ServerConnector, ConnAddrMixin):
def __init__(self): def __init__(self):
super().__init__( super().__init__(
self.downlink_conn_addr, self.downlink_conn_addr,

View File

@ -2,11 +2,10 @@ import logging
from tornado.web import Application from tornado.web import Application
from tfw.networking import EventHandlerConnector from tfw.internals.networking import EventHandlerConnector
from tfw.internals.server import ZMQWebSocketRouter
from tfw.config import TFWENV from tfw.config import TFWENV
from .zmq_websocket_router import ZMQWebSocketRouter
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)