baseimage-tutorial-framework/lib/tfw/components/ide_event_handler.py

238 lines
8.8 KiB
Python

# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from collections.abc import Iterable
from fnmatch import fnmatchcase
from glob import glob
from os.path import isfile, join, relpath, exists, isdir, realpath

from tfw import EventHandlerBase
from tfw.config.logs import logging
from tfw.mixins import MonitorManagerMixin

from .directory_monitor import DirectoryMonitor
LOG = logging.getLogger(__name__)
class FileManager:  # pylint: disable=too-many-instance-attributes
    """
    Keeps track of a working directory, the files listed from it and the
    currently selected file.

    Only paths that resolve (symlinks included) into one of the allowed
    directories are accepted, and files matching an exclude pattern are
    left out of the listing.
    """

    def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
        """
        :param working_directory: directory to list and serve files from
        :param allowed_directories: iterable of directories the working directory
                                    and every listed file must resolve into
        :param selected_file: file selected initially (relative to the working
                              directory), defaults to the first listed file
        :param exclude: iterable of fnmatch-style patterns of files to hide
        :raises TypeError: if exclude is neither None nor an Iterable
        :raises EnvironmentError: if the working directory is not a whitelisted
                                  directory or selected_file is not listed
        :raises IndexError: if selected_file is not given and no file is listed
        """
        # Every assignment goes through a property setter and the order matters:
        # the workdir setter consults the whitelist and the filename setter
        # lists the files (which needs workdir, whitelist and exclude).
        self._exclude, self.exclude = None, exclude
        self._allowed_directories, self.allowed_directories = None, allowed_directories
        self._workdir, self.workdir = None, working_directory
        self._filename, self.filename = None, selected_file or self.files[0]

    @property
    def exclude(self):
        """Patterns of files hidden from the listing (None if never set)."""
        return self._exclude

    @exclude.setter
    def exclude(self, exclude):
        # Assigning None is a no-op: the previously set patterns are kept.
        if exclude is None:
            return
        if not isinstance(exclude, Iterable):
            raise TypeError('Exclude must be Iterable!')
        self._exclude = exclude

    @property
    def workdir(self):
        """Directory the files are currently listed from."""
        return self._workdir

    @workdir.setter
    def workdir(self, directory):
        # isdir() is already False for paths that do not exist.
        if not isdir(directory):
            raise EnvironmentError(f'"{directory}" is not a directory!')
        if not self._is_in_whitelisted_dir(directory):
            raise EnvironmentError(f'Directory "{directory}" is not in whitelist!')
        self._workdir = directory

    @property
    def allowed_directories(self):
        """Directories that may be selected as working directory."""
        return self._allowed_directories

    @allowed_directories.setter
    def allowed_directories(self, directories):
        self._allowed_directories = directories

    @property
    def filename(self):
        """Currently selected file, relative to the working directory."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        if filename not in self.files:
            raise EnvironmentError('No such file in workdir!')
        self._filename = filename

    @property
    def files(self):
        """
        Paths (relative to the working directory) of every regular file found
        recursively under it that is whitelisted and not excluded.
        The listing is rebuilt from the file system on every access.
        """
        return [self._relpath(file) for file in glob(join(self._workdir, '**/*'), recursive=True)
                if isfile(file) and self._is_in_whitelisted_dir(file) and not self._is_blacklisted(file)]

    @property
    def file_contents(self):
        """Text content of the selected file."""
        with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
            return ifile.read()

    @file_contents.setter
    def file_contents(self, value):
        with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
            ofile.write(value)

    def _is_in_whitelisted_dir(self, path):
        """
        Tell whether path resolves to an allowed directory or to something
        located inside one.

        Both sides are resolved with realpath and compared on a path component
        boundary: a bare string prefix test would let "/home/user2" pass for a
        whitelisted "/home/user".
        """
        resolved = realpath(path)
        for allowed_dir in self.allowed_directories:
            allowed_root = realpath(allowed_dir)
            # join(root, '') yields the root with exactly one trailing separator
            if resolved == allowed_root or resolved.startswith(join(allowed_root, '')):
                return True
        return False

    def _is_blacklisted(self, file):
        """Tell whether file matches any of the exclude patterns."""
        # exclude stays None until patterns are assigned: treat that as "hide nothing"
        patterns = self.exclude or ()
        return any(fnmatchcase(file, blacklisted) for blacklisted in patterns)

    def _filepath(self, filename):
        """Path of filename joined onto the working directory."""
        return join(self._workdir, filename)

    def _relpath(self, filename):
        """Path of filename relative to the working directory."""
        return relpath(self._filepath(filename), start=self._workdir)
class IdeEventHandler(EventHandlerBase, MonitorManagerMixin):
    # pylint: disable=too-many-arguments
    """
    Event handler implementing the backend of our browser based IDE.
    By default all files in the directory specified in __init__ are displayed
    on the frontend. Note that this is a stateful component.
    When any file in the selected directory changes they are automatically refreshed
    on the frontend (this is done by listening to inotify events).
    This EventHandler accepts messages that have a data['command'] key specifying
    a command to be executed.
    The API of each command is documented in their respective handler.
    """
    def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None,
                 additional_watched_directories=None):
        """
        :param key: the key this instance should listen to
        :param directory: working directory which the EventHandler should serve files from
        :param allowed_directories: list of directories that can be switched to using the selectdir command
        :param selected_file: file that is selected by default
        :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
        :param additional_watched_directories: refresh the selected file when files change in these directories
                                               (the working directory is watched by default, this is useful for
                                               symlinks and such)
        :raises EnvironmentError: if there is no file to select in the working directory
        """
        super().__init__(key)
        try:
            self.filemanager = FileManager(allowed_directories=allowed_directories, working_directory=directory,
                                           selected_file=selected_file, exclude=exclude)
        except IndexError as err:
            # FileManager picks files[0] when no file is preselected: an empty listing ends up here.
            # Chain the original error so the traceback shows where the lookup failed.
            raise EnvironmentError(f'No file(s) in IdeEventHandler working_directory "{directory}"!') from err
        self.watched_directories = [self.filemanager.workdir]
        if additional_watched_directories:
            self.watched_directories.extend(additional_watched_directories)
        MonitorManagerMixin.__init__(self, DirectoryMonitor, self.watched_directories)
        # Maps the data['command'] value of incoming messages to the method handling it
        self.commands = {'read': self.read,
                         'write': self.write,
                         'select': self.select,
                         'selectdir': self.select_dir,
                         'exclude': self.exclude}

    def read(self, data):
        """
        Read the currently selected file.
        Failures are not raised: a short explanation is returned as content instead.

        :return dict: message with the contents of the file in data['content']
        """
        try:
            data['content'] = self.filemanager.file_contents
        except PermissionError:
            data['content'] = 'You have no permission to open that file :('
        except FileNotFoundError:
            data['content'] = 'This file was removed :('
        except Exception:  # pylint: disable=broad-except
            data['content'] = 'Failed to read file :('
        return data

    def write(self, data):
        """
        Overwrites a file with the desired string.

        :param data: TFW message data containing key 'content'
                     (new file content)
        :return dict: the message data with the 'content' key removed
        """
        # NOTE(review): presumably tells the monitor to skip the change event caused by our
        # own write - the counter is bumped even if the write below fails, confirm that
        # DirectoryMonitor copes with that.
        self.monitor.ignore = self.monitor.ignore + 1
        try:
            self.filemanager.file_contents = data['content']
        except Exception:  # pylint: disable=broad-except
            LOG.exception('Error writing file!')
        del data['content']
        return data

    def select(self, data):
        """
        Selects a file from the current directory.
        The selection is left unchanged (and the failure is logged) if there is no such file.

        :param data: TFW message data containing 'filename'
                     (name of file to select relative to the current directory)
        """
        try:
            self.filemanager.filename = data['filename']
        except EnvironmentError:
            LOG.exception('Failed to select file "%s"', data['filename'])
        return data

    def select_dir(self, data):
        """
        Select a new working directory to display files from.
        The first file listed in the new directory gets selected and its content
        is put into data['content'].

        :param data: TFW message data containing 'directory'
                     (absolute path of directory to select.
                     must be a path whitelisted in
                     self.allowed_directories)
        """
        try:
            self.filemanager.workdir = data['directory']
            self.reload_monitor()
            try:
                self.filemanager.filename = self.filemanager.files[0]
                self.read(data)
            except IndexError:
                data['content'] = 'No files in this directory :('
        except EnvironmentError as err:
            LOG.error('Failed to select directory "%s". Reason: %s', data['directory'], str(err))
        return data

    def exclude(self, data):
        r"""
        Overwrite list of excluded files

        :param data: TFW message data containing 'exclude'
                     (list of unix-style filename patterns to be excluded,
                     e.g.: ["\*.pyc", "\*.o"])
        """
        try:
            self.filemanager.exclude = list(data['exclude'])
        except TypeError:
            LOG.error('Exclude must be Iterable!')
        return data

    def attach_fileinfo(self, data):
        """
        Basic information included in every response to the frontend.
        Sets the 'filename', 'files' and 'directory' keys of data in place.
        """
        data['filename'] = self.filemanager.filename
        data['files'] = self.filemanager.files
        data['directory'] = self.filemanager.workdir

    def handle_event(self, message):
        """
        Execute the command named in message['data']['command'].

        :return: the message with the response data, or None if the message was
                 ignored because a required key was missing or the command is unknown
        """
        try:
            data = message['data']
            message['data'] = self.commands[data['command']](data)
            self.attach_fileinfo(data)
            return message
        except KeyError:
            # Also reached when a command handler misses a key it needs from data
            LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
            return None

    def cleanup(self):
        """Stop the directory monitor."""
        self.monitor.stop()