1
0
mirror of https://github.com/avatao-content/test-tutorial-framework synced 2025-01-15 12:51:56 +00:00

Remove pipe auxlib

This commit is contained in:
Kristóf Tóth 2019-07-12 23:19:22 +02:00
parent e50a8732fc
commit eb60c1e08e

View File

@ -1,235 +0,0 @@
from json import dumps, loads
from tfw.crypto import KeyManager, sign_message, verify_message
from tfw.builtins import PipeIOEventHandlerBase
from tfw.builtins.pipe_io_event_handler import DEFAULT_PERMISSIONS
from tfw.networking.scope import Scope
class SignMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Signs a valid TFW message with HMAC.
Note that the running process needs root permissions in order to read
the authentication key.
When forwarding is true, it will send the signed message to the TFW
server before writing it into the output pipe.
"""
def __init__(
self, in_pipe_path, out_pipe_path,
permissions=DEFAULT_PERMISSIONS,
forwarding=True
):
self.forwarding = forwarding
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
sign_message(self.auth_key, message)
if self.forwarding:
self.server_connector.send_message(message)
self.pipe_io.send_message(dumps(message).encode())
class VerifyMessagePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Verifies a signed TFW message.
This pipe also needs root permissions. Send the serialized JSON object
to the pipe, then wait for its boolean response.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
self.auth_key = KeyManager().auth_key
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
message = loads(message_bytes)
validity = verify_message(self.auth_key, message)
self.pipe_io.send_message(str(validity).lower().encode())
class BotPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Sends bot messages to the frontend.
If you assign @originator, it will be the default message sender.
When you write a line to the pipe, it will be considered as a single
message and gets appended to the queue until an empty line is received,
which triggers forwarding the messages to the TFW server.
"""
def __init__(
self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS,
originator='avataobot'
):
self.queue = []
self.originator = originator
super().__init__(None, in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
pass
def handle_pipe_event(self, message_bytes):
if message_bytes == b"":
if self.queue:
self.server_connector.send_message({
'key': 'queueMessages',
'data': {
'messages': self.queue
}
})
self.queue = []
else:
self.queue.append({
'originator': self.originator,
'message': message_bytes.decode().replace('\\n', '\n')
})
class DeployPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manages deployment in the IDE.
When you receive "deploy", then you have to answer with a "true" or
"false" depending whether you are satisfied with the result or not.
The @process parameter is the name of the supervised service.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, process,
permissions=DEFAULT_PERMISSIONS
):
self.expected = False
self.process = process
self.onsuccess = {
'key': 'processmanager',
'data': {
'process_name': process,
'command': 'restart'
}
}
self.onerror = {
'key': 'processmanager',
'data': {
'process_name': process,
'error': True
}
}
super().__init__('processmanager', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
if message == self.onsuccess:
self.expected = True
self.pipe_io.send_message(b'deploy')
def handle_pipe_event(self, message_bytes):
if not self.expected:
raise ValueError(
f'{self.pipe_io.in_pipe}: There is nothing to deploy.'
)
self.expected = False
if message_bytes == b'true':
self.server_connector.send_message(self.onsuccess, scope=Scope.WEBSOCKET)
elif message_bytes == b'false':
self.server_connector.send_message(self.onerror, scope=Scope.WEBSOCKET)
else:
raise ValueError(
f'{self.pipe_io.in_pipe}: Expected "true" or "false".'
)
class IdePipeIOEventHandler(PipeIOEventHandlerBase):
"""
Manipulates the content of the IDE.
You can observe a file, and when the user edits it, you will receive
the new contents where newlines are escaped as "\\n".
In order to overwrite the file, send an escaped text back to the pipe.
Since the pipe doesn't know if the file is selected initially in the IDE,
you have to provide this information by yourself with @selected,
but it will track it later on.
"""
# pylint: disable=too-many-arguments
def __init__(
self, in_pipe_path, out_pipe_path, filename,
permissions=DEFAULT_PERMISSIONS,
selected=True
):
self.selected = selected
self.filename = filename
super().__init__('ide', in_pipe_path, out_pipe_path, permissions)
def handle_event(self, message):
data = message['data']
if data['command'] == 'select':
self.selected = data['filename'] == self.filename
elif data['command'] == 'write' and self.selected:
clean = data['content'].replace('\n', '\\n')
self.pipe_io.send_message(clean.encode())
def handle_pipe_event(self, message_bytes):
if not self.selected:
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'select',
'filename': self.filename
}
}
})
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'write',
'content': message_bytes.decode().replace('\\n', '\n')
}
}
})
self.server_connector.send_message({
'key': 'mirror',
'data': {
'key': 'ide',
'data': {
'command': 'read'
}
}
})
class FSMPipeIOEventHandler(PipeIOEventHandlerBase):
"""
Handles FSM steps.
When the FSM enters the next state, you will receive a line containing
its name. To trigger a state change, send the name of the transition to
the pipe.
"""
def __init__(self, in_pipe_path, out_pipe_path, permissions=DEFAULT_PERMISSIONS):
super().__init__(
['fsm', 'fsm_update'],
in_pipe_path, out_pipe_path, permissions
)
def handle_event(self, message):
if 'current_state' in message['data']:
self.pipe_io.send_message(message['data']['current_state'].encode())
def handle_pipe_event(self, message_bytes):
self.server_connector.send_message({
'key': '',
'trigger': message_bytes.decode()
})