1
0
mirror of https://github.com/avatao-content/test-tutorial-framework synced 2025-10-25 07:12:55 +00:00

Merge pull request #13 from avatao-content/pipeio

Pipeio
This commit is contained in:
therealkrispet
2019-05-28 14:53:22 +02:00
committed by GitHub
4 changed files with 326 additions and 6 deletions

View File

@@ -1,9 +1,10 @@
from ast import literal_eval
from functools import partial
from signal import signal, SIGTERM, SIGINT
from tornado.ioloop import IOLoop
from tfw import YamlFSM, FSMAwareEventHandler
from tfw import YamlFSM, FSMAwareEventHandler, EventHandlerBase
from tfw.components import IdeEventHandler, TerminalEventHandler
from tfw.components import ProcessManagingEventHandler, BashMonitor
from tfw.components import TerminalCommands, LogMonitoringEventHandler
@@ -136,9 +137,12 @@ if __name__ == '__main__':
# Example terminal command callback
terminal.historymonitor.subscribe_callback(cenator)
try:
IOLoop.instance().start()
finally:
eventhandlers = {fsm, ide, terminal, processmanager, logmonitor, message_fsm_steps}
for eh in eventhandlers:
event_handlers = EventHandlerBase.get_local_instances()
def cleanup(sig, frame):
for eh in event_handlers:
eh.cleanup()
exit(0)
signal(SIGTERM, cleanup)
signal(SIGINT, cleanup)
IOLoop.instance().start()

View File

@@ -0,0 +1,235 @@
from json import dumps, loads
from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.components import PipeIOEventHandlerBase
from tfw.components.pipe_io_event_handler import DEFAULT_PERMISSIONS
class SignMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Signs a valid TFW message with HMAC.
Note that the running process needs root permissions in order to read
the authentication key.
When forwarding is true, it will send the signed message to the TFW
server before writing it into the output pipe.
"""
def __init__(
self, in_pipe_path, out_pipe_path,
permissions=DEFAULT_PERMISSIONS,
forwarding=True
):
self.forwarding = forwarding
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
sign_message(self.auth_key, message)
if self.forwarding:
self.server_connector.send(message)
self.pipe_io.send_message(dumps(message).encode())
class VerifyMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Verifies a signed TFW message.
This pipe also needs root permissions. Send the serialized JSON object
to the pipe, then wait for its boolean response.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
validity = verify_message(self.auth_key, message)
self.pipe_io.send_message(str(validity).lower().encode())
class BotPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Sends bot messages to the frontend.
If you assign @originator, it will be the default message sender.
When you write a line to the pipe, it will be considered as a single
message and gets appended to the queue until an empty line is received,
which triggers forwarding the messages to the TFW server.
"""
def __init__(
self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS,
originator='avataobot'
):
self.queue = []
self.originator = originator
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
if message_bytes == b"":
if self.queue:
self.server_connector.send({
'key': 'queueMessages',
'data': {
'messages': self.queue
}
})
self.queue = []
else:
self.queue.append({
'originator': self.originator,
'message': message_bytes.decode().replace('\\n', '\n')
})
class DeployPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manages deployment in the IDE.
When you receive "deploy", then you have to answer with a "true" or
"false" depending whether you are satisfied with the result or not.
The @process parameter is the name of the supervised service.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, process,
permissions=DEFAULT_PERMISSIONS
):
self.expected = False
self.process = process
self.onsuccess = {
'key': 'processmanager',
'data': {
'process_name': process,
'command': 'restart'
}
}
self.onerror = {
'key': 'processmanager',
'data': {
'process_name': process,
'error': True
}
}
super().__init__('processmanager', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
if message == self.onsuccess:
self.expected = True
self.pipe_io.send_message(b'deploy')
def handle_pipe_event(self, message_bytes):
if not self.expected:
raise ValueError(
f'{self.pipe_io.in_pipe}: There is nothing to deploy.'
)
self.expected = False
if message_bytes == b'true':
self.server_connector.send(self.onsuccess)
elif message_bytes == b'false':
self.server_connector.send(self.onerror)
else:
raise ValueError(
f'{self.pipe_io.in_pipe}: Expected "true" or "false".'
)
class IdePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manipulates the content of the IDE.
You can observe a file, and when the user edits it, you will receive
the new contents where newlines are escaped as "\\n".
In order to overwrite the file, send an escaped text back to the pipe.
Since the pipe doesn't know if the file is selected initially in the IDE,
you have to provide this information by yourself with @selected,
but it will track it later on.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, filename,
permissions=DEFAULT_PERMISSIONS,
selected=True
):
self.selected = selected
self.filename = filename
super().__init__('ide', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
data = message['data']
if data['command'] == 'select':
self.selected = data['filename'] == self.filename
elif data['command'] == 'write' and self.selected:
clean = data['content'].replace('\n', '\\n')
self.pipe_io.send_message(clean.encode())
def handle_pipe_event(self, message_bytes):
if not self.selected:
self.server_connector.send({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'select',
'filename': self.filename
}
}
})
self.server_connector.send({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'write',
'content': message_bytes.decode().replace('\\n', '\n')
}
}
})
self.server_connector.send({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'read'
}
}
})
class FSMPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Handles FSM steps.
When the FSM enters the next state, you will receive a line containing
its name. To trigger a state change, send the name of the transition to
the pipe.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
super().__init__(
['fsm', 'fsm_update'],
in_pipe_path, out_pipe_path, permissions
)
def handle_event(self, message):
if 'current_state' in message['data']:
self.pipe_io.send_message(message['data']['current_state'].encode())
def handle_pipe_event(self, message_bytes):
self.server_connector.send({
'key': '',
'trigger': message_bytes.decode()
})

View File

@@ -0,0 +1,77 @@
from signal import signal, SIGTERM, SIGINT
from tornado.ioloop import IOLoop
from tfw import EventHandlerBase
from tfw.components import PipeIOEventHandler
from pipe_io_auxlib import (
SignMessagePipeIOEventHandler, VerifyMessagePipeIOEventHandler,
BotPipeIOEventHandler,
DeployPipeIOEventHandler, IdePipeIOEventHandler,
FSMPipeIOEventHandler
)
if __name__ == '__main__':
"""
Creates general purpose pipes.
The first parameter associates the receiving pipe with a key, which is
an empty string in this case. It has a special meaning, you can
subscribe to every kind of message with this key.
If you wish to filter incoming data, specify a single or more keys in
a list, eg.: processmanager, ide, key...
You can send/receive JSON messages to/from the TFW server as any user,
because we gave read+write permissions, without that parameter, only
the owner has access to the pipes.
"""
json_pipe = PipeIOEventHandler(
'',
'/tmp/tfw_json_send',
'/tmp/tfw_json_recv',
permissions=0o666
)
sign_pipe = SignMessagePipeIOEventHandler(
'/tmp/tfw_sign_send',
'/tmp/tfw_sign_recv',
forwarding=True
)
verify_pipe = VerifyMessagePipeIOEventHandler(
'/tmp/tfw_verify_send',
'/tmp/tfw_verify_recv'
)
bot_pipe = BotPipeIOEventHandler(
'/tmp/tfw_bot_send',
'/tmp/tfw_bot_recv'
)
deploy_pipe = DeployPipeIOEventHandler(
'/tmp/tfw_deploy_send',
'/tmp/tfw_deploy_recv',
'webservice'
)
ide_pipe = IdePipeIOEventHandler(
'/tmp/tfw_ide_send',
'/tmp/tfw_ide_recv',
'user_ops.py',
selected=True
)
fsm_pipe = FSMPipeIOEventHandler(
'/tmp/tfw_fsm_send',
'/tmp/tfw_fsm_recv'
)
event_handlers = EventHandlerBase.get_local_instances()
def cleanup(sig, frame):
for eh in event_handlers:
eh.cleanup()
exit(0)
signal(SIGTERM, cleanup)
signal(SIGINT, cleanup)
IOLoop.instance().start()

View File

@@ -0,0 +1,4 @@
[program:pipe_io_main]
user=root
directory=%(ENV_TFW_EHMAIN_DIR)s
command=python3 pipe_io_main.py