From 9cb8ef0e7276051d73b7b8bc65e4a65026d318b6 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Mon, 20 May 2019 14:52:02 +0200 Subject: [PATCH] Add unit test for file manager --- .../ide_event_handler/file_manager.py | 93 ++++++++++++++++++ .../ide_event_handler/ide_event_handler.py | 91 +---------------- .../ide_event_handler/test_filemanager.py | 97 +++++++++++++++++++ 3 files changed, 192 insertions(+), 89 deletions(-) create mode 100644 lib/tfw/components/ide_event_handler/file_manager.py create mode 100644 lib/tfw/components/ide_event_handler/test_filemanager.py diff --git a/lib/tfw/components/ide_event_handler/file_manager.py b/lib/tfw/components/ide_event_handler/file_manager.py new file mode 100644 index 0000000..59431ea --- /dev/null +++ b/lib/tfw/components/ide_event_handler/file_manager.py @@ -0,0 +1,93 @@ +from typing import Iterable +from glob import glob +from fnmatch import fnmatchcase +from os.path import basename, isfile, join, relpath, exists, isdir, realpath + + +class FileManager: # pylint: disable=too-many-instance-attributes + def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): + self._exclude, self.exclude = [], exclude + self._allowed_directories, self.allowed_directories = None, allowed_directories + self._workdir, self.workdir = None, working_directory + self._filename, self.filename = None, selected_file or self.files[0] + + @property + def exclude(self): + return self._exclude + + @exclude.setter + def exclude(self, exclude): + if exclude is None: + return + if not isinstance(exclude, Iterable): + raise TypeError('Exclude must be Iterable!') + self._exclude = exclude + + @property + def workdir(self): + return self._workdir + + @workdir.setter + def workdir(self, directory): + if not exists(directory) or not isdir(directory): + raise EnvironmentError(f'"{directory}" is not a directory!') + if not self._is_in_allowed_dir(directory): + raise EnvironmentError(f'Directory "{directory}" is not allowed!') + self._workdir = directory + + @property + def allowed_directories(self): + return self._allowed_directories + + @allowed_directories.setter + def allowed_directories(self, directories): + self._allowed_directories = directories + + @property + def filename(self): + return self._filename + + @filename.setter + def filename(self, filename): + if filename not in self.files: + raise EnvironmentError('No such file in workdir!') + self._filename = filename + + @property + def files(self): + return [ + self._relpath(file) + for file in glob(join(self._workdir, '**/*'), recursive=True) + if isfile(file) + and self._is_in_allowed_dir(file) + and not self._is_blacklisted(file) + ] + + @property + def file_contents(self): + with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: + return ifile.read() + + @file_contents.setter + def file_contents(self, value): + with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: + ofile.write(value) + + def _is_in_allowed_dir(self, path): + return any( + realpath(path).startswith(allowed_dir) + for allowed_dir in self.allowed_directories + ) + + def _is_blacklisted(self, file): + return any( + fnmatchcase(file, blacklisted) or + fnmatchcase(basename(file), blacklisted) + for blacklisted in self.exclude + ) + + def _filepath(self, filename): + return join(self._workdir, filename) + + def _relpath(self, filename): + return relpath(self._filepath(filename), start=self._workdir) diff --git a/lib/tfw/components/ide_event_handler/ide_event_handler.py b/lib/tfw/components/ide_event_handler/ide_event_handler.py index 19d8652..218d3ed 100644 --- a/lib/tfw/components/ide_event_handler/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler/ide_event_handler.py @@ -11,98 +11,11 @@ from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor +from .file_manager import FileManager + LOG = logging.getLogger(__name__) -class FileManager: # pylint: disable=too-many-instance-attributes - def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = [], exclude - self._allowed_directories, self.allowed_directories = None, allowed_directories - self._workdir, self.workdir = None, working_directory - self._filename, self.filename = None, selected_file or self.files[0] - - @property - def exclude(self): - return self._exclude - - @exclude.setter - def exclude(self, exclude): - if exclude is None: - return - if not isinstance(exclude, Iterable): - raise TypeError('Exclude must be Iterable!') - self._exclude = exclude - - @property - def workdir(self): - return self._workdir - - @workdir.setter - def workdir(self, directory): - if not exists(directory) or not isdir(directory): - raise EnvironmentError(f'"{directory}" is not a directory!') - if not self._is_in_allowed_dir(directory): - raise EnvironmentError(f'Directory "{directory}" is not allowed!') - self._workdir = directory - - @property - def allowed_directories(self): - return self._allowed_directories - - @allowed_directories.setter - def allowed_directories(self, directories): - self._allowed_directories = directories - - @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, filename): - if filename not in self.files: - raise EnvironmentError('No such file in workdir!') - self._filename = filename - - @property - def files(self): - return [ - self._relpath(file) - for file in glob(join(self._workdir, '**/*'), recursive=True) - if isfile(file) - and self._is_in_allowed_dir(file) - and not self._is_blacklisted(file) - ] - - @property - def file_contents(self): - with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: - return ifile.read() - - @file_contents.setter - def file_contents(self, value): - with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: - ofile.write(value) - - def _is_in_allowed_dir(self, path): - return any( - realpath(path).startswith(allowed_dir) - for allowed_dir in self.allowed_directories - ) - - def _is_blacklisted(self, file): - return any( - fnmatchcase(file, blacklisted) or - fnmatchcase(basename(file), blacklisted) - for blacklisted in self.exclude - ) - - def _filepath(self, filename): - return join(self._workdir, filename) - - def _relpath(self, filename): - return relpath(self._filepath(filename), start=self._workdir) - - class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): # pylint: disable=too-many-arguments,anomalous-backslash-in-string """ diff --git a/lib/tfw/components/ide_event_handler/test_filemanager.py b/lib/tfw/components/ide_event_handler/test_filemanager.py new file mode 100644 index 0000000..d62f3e0 --- /dev/null +++ b/lib/tfw/components/ide_event_handler/test_filemanager.py @@ -0,0 +1,97 @@ +# pylint: disable=redefined-outer-name + +from secrets import token_urlsafe +from pathlib import Path +from shutil import rmtree +from os.path import join, realpath +from os import mkdir, rmdir, remove, symlink + +import pytest + +from filemanager import FileManager + + +WORKDIR = realpath('test_filemanager') + +def workdir_pref(path): + return join(WORKDIR, path) + +@pytest.fixture(scope='module') +def manager(): + dirs = [] + mkdir(WORKDIR) + + for i in range(3): + node = workdir_pref('dir_'+str(i).zfill(2)) + mkdir(node) + Path(join(node, 'empty.txt')).touch() + Path(join(node, 'empty.bin')).touch() + dirs.append(node) + + yield FileManager(dirs[0], dirs[:-1], exclude=['*/dir_01/*', '*.bin']) + rmtree(WORKDIR) + +@pytest.mark.parametrize('subdir', ['dir_00', 'dir_01']) +def test_select_allowed_dirs(manager, subdir): + manager.workdir = workdir_pref(subdir) + assert manager.workdir == workdir_pref(subdir) + newdir = workdir_pref(join(subdir, 'deep')) + mkdir(newdir) + manager.workdir = newdir + assert manager.workdir == newdir + rmdir(newdir) + +@pytest.mark.parametrize('excdir', ['/', workdir_pref('dir_02')]) +def test_select_excluded_dirs(manager, excdir): + allowed = manager.allowed_directories + with pytest.raises(OSError): + manager.workdir = excdir + assert manager.workdir != excdir + manager.allowed_directories = allowed+[excdir] + manager.workdir = excdir + assert manager.workdir == excdir + manager.allowed_directories = allowed + +@pytest.mark.parametrize('filename', ['another.txt', '*.txt']) +def test_select_allowed_files(manager, filename): + manager.workdir = workdir_pref('dir_00') + newfile = workdir_pref(join('dir_00', filename)) + Path(newfile).touch() + assert filename in manager.files + manager.filename = filename + assert manager.filename == filename + remove(newfile) + +@pytest.mark.parametrize('path', [ + ['dir_00', 'illegal.bin'], + ['dir_01', 'legal.txt'] +]) +def test_select_excluded_files(manager, path): + manager.workdir = workdir_pref(path[0]) + newfile = workdir_pref(join(path[0], path[1])) + Path(newfile).touch() + assert path[1] not in manager.files + with pytest.raises(OSError): + manager.filename = path[1] + remove(newfile) + +@pytest.mark.parametrize('path', [ + ['dir_02/empty.txt', 'dir_00/link.txt'], + ['dir_01/empty.txt', 'dir_00/link.bin'] +]) +def test_select_excluded_symlinks(manager, path): + manager.workdir = workdir_pref('dir_00') + link = workdir_pref(path[1]) + symlink(workdir_pref(path[0]), link) + assert path[1] not in manager.files + remove(link) + +def test_read_write_file(manager): + for _ in range(128): + manager.workdir = workdir_pref('dir_00') + manager.filename = 'empty.txt' + content = token_urlsafe(32) + manager.file_contents = content + assert manager.file_contents == content + with open(workdir_pref('dir_00/empty.txt'), "r") as ifile: + assert ifile.read() == content