From d31a850a4e70cce27b328e2c172d84b9c8665f14 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 7 Aug 2019 09:44:03 +0200 Subject: [PATCH] Simplify IDE handler and file manager --- .../ide/file_manager/file_manager.py | 123 +++++--------- .../ide/file_manager/test_file_manager.py | 147 ++++++----------- tfw/components/ide/ide_handler.py | 156 ++++-------------- 3 files changed, 127 insertions(+), 299 deletions(-) diff --git a/tfw/components/ide/file_manager/file_manager.py b/tfw/components/ide/file_manager/file_manager.py index 0e4fc6a..e416cb9 100644 --- a/tfw/components/ide/file_manager/file_manager.py +++ b/tfw/components/ide/file_manager/file_manager.py @@ -1,93 +1,56 @@ -from typing import Iterable +from functools import wraps from glob import glob from fnmatch import fnmatchcase -from os.path import basename, isfile, join, relpath, exists, isdir, realpath +from os.path import dirname, isdir, isfile, realpath -class FileManager: # pylint: disable=too-many-instance-attributes - def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = [], exclude - self._allowed_directories, self.allowed_directories = None, allowed_directories - self._workdir, self.workdir = None, working_directory - self._filename, self.filename = None, selected_file or self.files[0] +def _with_is_allowed(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if self._is_allowed(args[0]): # pylint: disable=protected-access + return func(self, *args, **kwargs) + raise ValueError('Forbidden path.') + return wrapper - @property - def exclude(self): - return self._exclude - @exclude.setter - def exclude(self, exclude): - if exclude is None: - return - if not isinstance(exclude, Iterable): - raise TypeError('Exclude must be Iterable!') - self._exclude = exclude - - @property - def workdir(self): - return self._workdir - - @workdir.setter - def workdir(self, directory): - if not exists(directory) or not isdir(directory): - raise EnvironmentError(f'"{directory}" is not a directory!') - if not self._is_in_allowed_dir(directory): - raise EnvironmentError(f'Directory "{directory}" is not allowed!') - self._workdir = directory - - @property - def allowed_directories(self): - return self._allowed_directories - - @allowed_directories.setter - def allowed_directories(self, directories): - self._allowed_directories = [realpath(directory) for directory in directories] - - @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, filename): - if filename not in self.files: - raise EnvironmentError('No such file in workdir!') - self._filename = filename +class FileManager: # pylint: disable=too-many-instance-attributes + def __init__(self, patterns): + self.patterns = patterns @property def files(self): - return [ - self._relpath(file) - for file in glob(join(self._workdir, '**/*'), recursive=True) - if isfile(file) - and self._is_in_allowed_dir(file) - and not self._is_blacklisted(file) - ] + return list(set( + path + for pattern in self.patterns + for path in glob(pattern, recursive=True) + if isfile(path) and self._is_allowed(path) + )) @property - def file_contents(self): - with open(self._filepath(self.filename), 'rb', buffering=0) as ifile: + def parents(self): + return list(set( + self._find_directory(pattern) + for pattern in self.patterns + )) + + @staticmethod + def _find_directory(pattern): + while pattern and not isdir(pattern): + pattern = dirname(pattern) + return pattern + + def _is_allowed(self, filepath): + return any( + fnmatchcase(realpath(filepath), pattern) + for pattern in self.patterns + ) + + @_with_is_allowed + def read_file(self, filepath): # pylint: disable=no-self-use + with open(filepath, 'rb', buffering=0) as ifile: return ifile.read().decode(errors='surrogateescape') - @file_contents.setter - def file_contents(self, value): - with open(self._filepath(self.filename), 'wb', buffering=0) as ofile: - ofile.write(value.encode()) - - def _is_in_allowed_dir(self, path): - return any( - realpath(path).startswith(allowed_dir) - for allowed_dir in self.allowed_directories - ) - - def _is_blacklisted(self, file): - return any( - fnmatchcase(file, blacklisted) or - fnmatchcase(basename(file), blacklisted) - for blacklisted in self.exclude - ) - - def _filepath(self, filename): - return join(self._workdir, filename) - - def _relpath(self, filename): - return relpath(self._filepath(filename), start=self._workdir) + @_with_is_allowed + def write_file(self, filepath, contents): # pylint: disable=no-self-use + with open(filepath, 'wb', buffering=0) as ofile: + ofile.write(contents.encode()) diff --git a/tfw/components/ide/file_manager/test_file_manager.py b/tfw/components/ide/file_manager/test_file_manager.py index 8f978d3..08e4f25 100644 --- a/tfw/components/ide/file_manager/test_file_manager.py +++ b/tfw/components/ide/file_manager/test_file_manager.py @@ -1,8 +1,8 @@ # pylint: disable=redefined-outer-name from dataclasses import dataclass from secrets import token_urlsafe +from os import mkdir, symlink from os.path import join -from os import chdir, mkdir, symlink from pathlib import Path from tempfile import TemporaryDirectory @@ -13,112 +13,75 @@ from .file_manager import FileManager @dataclass class ManagerContext: - folder: str + workdir: str + subdir: str + subfile: str manager: FileManager - def join(self, path): - return join(self.folder, path) + def create_random_file(self, dirname, extension): + filename = self.join(f'{dirname}/{generate_name()}{extension}') + Path(filename).touch() + return filename + def create_random_folder(self, basepath): + dirname = self.join(f'{basepath}/{generate_name()}') + mkdir(dirname) + return dirname + + def create_random_link(self, source, dirname, extension): + linkname = self.join(f'{dirname}/{generate_name()}{extension}') + symlink(source, linkname) + return linkname + + def join(self, path): + return join(self.workdir, path) + +def generate_name(): + return token_urlsafe(16) @pytest.fixture() def context(): - dirs = {} - with TemporaryDirectory() as workdir: - chdir(workdir) - for name in ['allowed', 'excluded', 'invis']: - node = join(workdir, name) - mkdir(node) - Path(join(node, 'empty.txt')).touch() - Path(join(node, 'empty.bin')).touch() - dirs[name] = node + subdir = join(workdir, generate_name()) + subfile = join(subdir, generate_name() + '.txt') + mkdir(subdir) + Path(subfile).touch() + manager = FileManager([join(workdir, '**/*.txt')]) + yield ManagerContext(workdir, subdir, subfile, manager) - yield ManagerContext( - workdir, - FileManager( - dirs['allowed'], - [dirs['allowed'], dirs['excluded']], - exclude=['*/excluded/*'] - ) - ) +def test_matching_files(context): + newdir = context.create_random_folder(context.subdir) + newfile = context.create_random_file(newdir, '.txt') + newlink = context.create_random_link(newfile, newdir, '.txt') + assert set(context.manager.files) == {context.subfile, newfile, newlink} -@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/']) -def test_select_allowed_dirs(context, subdir): - context.manager.workdir = context.join(subdir) - assert context.manager.workdir == context.join(subdir) - newdir = context.join(subdir+'deep') - mkdir(newdir) - context.manager.workdir = newdir - assert context.manager.workdir == newdir +def test_unmatching_files(context): + newtxt = context.create_random_file(context.workdir, '.txt') + newbin = context.create_random_file(context.subdir, '.bin') + context.create_random_link(newtxt, context.subdir, '.txt') + context.create_random_link(newbin, context.subdir, '.txt') + assert context.manager.files == [context.subfile] -@pytest.mark.parametrize('invdir', ['', 'invis']) -def test_select_forbidden_dirs(context, invdir): - fullpath = context.join(invdir) - with pytest.raises(OSError): - context.manager.workdir = fullpath - assert context.manager.workdir != fullpath - context.manager.allowed_directories += [fullpath] - context.manager.workdir = fullpath - assert context.manager.workdir == fullpath - - -@pytest.mark.parametrize('filename', ['another.txt', '*.txt']) -def test_select_allowed_files(context, filename): - Path(context.join('allowed/'+filename)).touch() - assert filename in context.manager.files - context.manager.filename = filename - assert context.manager.filename == filename - -@pytest.mark.parametrize('path', [ - {'dir': 'allowed/', 'file': 'illegal.bin'}, - {'dir': 'excluded/', 'file': 'legal.txt'}, - {'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'}, - {'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'}, - {'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'}, - {'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'} -]) -def test_select_excluded_files(context, path): - context.manager.workdir = context.join(path['dir']) - context.manager.exclude = ['*/excluded/*', '*.bin'] - Path(context.join(path['dir']+path['file'])).touch() - assert path['file'] not in context.manager.files - with pytest.raises(OSError): - context.manager.filename = path['file'] - -@pytest.mark.parametrize('path', [ - {'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'}, - {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, - {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}, - {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'} -]) -def test_select_excluded_symlinks(context, path): - symlink(context.join(path['src']), context.join(path['dst'])) - assert path['dst'] not in context.manager.files +def test_parents(context): + newdir = context.create_random_folder(context.workdir) + context.manager.patterns += [f'{newdir}/[!/@]*/**/?.c'] + assert set(context.manager.parents) == {context.workdir, newdir} def test_read_write_file(context): for _ in range(128): - context.manager.filename = 'empty.txt' content = token_urlsafe(32) - context.manager.file_contents = content - assert context.manager.file_contents == content - with open(context.join('allowed/empty.txt'), 'r') as ifile: + context.manager.write_file(context.subfile, content) + assert context.manager.read_file(context.subfile) == content + with open(context.subfile, 'r') as ifile: assert ifile.read() == content def test_regular_ide_actions(context): - context.manager.workdir = context.join('allowed') - newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16) - Path(context.join(f'allowed/{newfile1}')).touch() - Path(context.join(f'allowed/{newfile2}')).touch() - for _ in range(8): + newfile1 = context.create_random_file(context.subdir, '.txt') + newfile2 = context.create_random_file(context.subdir, '.txt') + for _ in range(4): context.manager.filename = newfile1 - content1 = token_urlsafe(32) - context.manager.file_contents = content1 - context.manager.filename = newfile2 - content2 = token_urlsafe(32) - context.manager.file_contents = content2 - context.manager.filename = newfile1 - assert context.manager.file_contents == content1 - context.manager.filename = newfile2 - assert context.manager.file_contents == content2 + content1, content2 = token_urlsafe(32), token_urlsafe(32) + context.manager.write_file(newfile1, content1) + context.manager.write_file(newfile2, content2) + assert context.manager.read_file(newfile1) == content1 + assert context.manager.read_file(newfile2) == content2 diff --git a/tfw/components/ide/ide_handler.py b/tfw/components/ide/ide_handler.py index 38f7842..f7435d6 100644 --- a/tfw/components/ide/ide_handler.py +++ b/tfw/components/ide/ide_handler.py @@ -1,4 +1,5 @@ import logging +from os.path import isfile from tfw.internals.networking import Scope from tfw.internals.inotify import InotifyObserver @@ -32,161 +33,62 @@ BUILD_ARTIFACTS = ( class IdeHandler: - keys = ['ide'] - # pylint: disable=too-many-arguments,anomalous-backslash-in-string - """ - Event handler implementing the backend of our browser based IDE. - By default all files in the directory specified in __init__ are displayed - on the fontend. Note that this is a stateful component. + keys = ['ide.read', 'ide.write'] - When any file in the selected directory changes they are automatically refreshed - on the frontend (this is done by listening to inotify events). - - This EventHandler accepts messages that have a data['command'] key specifying - a command to be executed. - - The API of each command is documented in their respective handler. - """ - def __init__(self, *, directory, allowed_directories, selected_file=None, exclude=None): - """ - :param key: the key this instance should listen to - :param directory: working directory which the EventHandler should serve files from - :param allowed_directories: list of directories that can be switched to using selectdir - :param selected_file: file that is selected by default - :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.) - """ + def __init__(self, *, patterns, initial_file=''): self.connector = None - try: - self.filemanager = FileManager( - allowed_directories=allowed_directories, - working_directory=directory, - selected_file=selected_file, - exclude=exclude - ) - except IndexError: - raise EnvironmentError( - f'No file(s) in IdeEventHandler working_directory "{directory}"!' - ) + self.filemanager = FileManager(patterns) + self._initial_file = initial_file self.monitor = InotifyObserver( - self.filemanager.allowed_directories, + path=self.filemanager.parents, exclude=BUILD_ARTIFACTS ) self.monitor.on_modified = self._reload_frontend self.monitor.start() self.commands = { - 'read': self.read, - 'write': self.write, - 'select': self.select, - 'selectdir': self.select_dir, - 'exclude': self.exclude + 'ide.read' : self.read, + 'ide.write' : self.write } + @property + def initial_file(self): + if not isfile(self._initial_file): + self._initial_file = self.filemanager.files[0] + return self._initial_file + def _reload_frontend(self, event): # pylint: disable=unused-argument - self.send_message({ - 'key': 'ide', - 'data': {'command': 'reload'} - }) + self.send_message({'key': 'ide.reload'}) def send_message(self, message): self.connector.send_message(message, scope=Scope.WEBSOCKET) - def read(self, data): - """ - Read the currently selected file. - - :return dict: TFW message data containing key 'content' - (contents of the selected file) - """ + def read(self, message): + if message.get('files'): + self.filemanager.patterns = message['files'] try: - data['content'] = self.filemanager.file_contents + message['content'] = self.filemanager.read_file(message['filename']) except PermissionError: - data['content'] = 'You have no permission to open that file :(' + message['content'] = 'You have no permission to open that file :(' except FileNotFoundError: - data['content'] = 'This file was removed :(' + message['content'] = 'This file was removed :(' except Exception: # pylint: disable=broad-except - data['content'] = 'Failed to read file :(' - return data + message['content'] = 'Failed to read file :(' - def write(self, data): - """ - Overwrites a file with the desired string. - - :param data: TFW message data containing key 'content' - (new file content) - - """ + def write(self, message): try: - self.filemanager.file_contents = data['content'] + self.filemanager.write_file(message['filename'], message['content']) except Exception: # pylint: disable=broad-except LOG.exception('Error writing file!') - del data['content'] - return data - - def select(self, data): - """ - Selects a file from the current directory. - - :param data: TFW message data containing 'filename' - (name of file to select relative to the current directory) - """ - try: - self.filemanager.filename = data['filename'] - except EnvironmentError: - LOG.exception('Failed to select file "%s"', data['filename']) - return data - - def select_dir(self, data): - """ - Select a new working directory to display files from. - - :param data: TFW message data containing 'directory' - (absolute path of diretory to select. - must be a path whitelisted in - self.allowed_directories) - """ - try: - self.filemanager.workdir = data['directory'] - try: - self.filemanager.filename = self.filemanager.files[0] - self.read(data) - except IndexError: - data['content'] = 'No files in this directory :(' - except EnvironmentError as err: - LOG.error( - 'Failed to select directory "%s". Reason: %s', - data['directory'], str(err) - ) - return data - - def exclude(self, data): - """ - Overwrite list of excluded files - - :param data: TFW message data containing 'exclude' - (list of unix-style filename patterns to be excluded, - e.g.: ["\*.pyc", "\*.o") - """ - try: - self.filemanager.exclude = list(data['exclude']) - except TypeError: - LOG.error('Exclude must be Iterable!') - return data - - def attach_fileinfo(self, data): - """ - Basic information included in every response to the frontend. - """ - data['filename'] = self.filemanager.filename - data['files'] = self.filemanager.files - data['directory'] = self.filemanager.workdir + del message['content'] def handle_event(self, message, _): try: - data = message['data'] - message['data'] = self.commands[data['command']](data) - self.attach_fileinfo(data) + if message['filename'] == '': + message['filename'] = self.initial_file + self.commands[message['key']](message) + message['files'] = self.filemanager.files self.send_message(message) except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)