From a69031015b4b5a2f451a1d5cf24a4dda7196364f Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Mon, 20 May 2019 14:50:02 +0200 Subject: [PATCH 1/4] Refactor file manager --- .../components/{ => ide_event_handler}/ide_event_handler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) rename lib/tfw/components/{ => ide_event_handler}/ide_event_handler.py (98%) diff --git a/lib/tfw/components/ide_event_handler.py b/lib/tfw/components/ide_event_handler/ide_event_handler.py similarity index 98% rename from lib/tfw/components/ide_event_handler.py rename to lib/tfw/components/ide_event_handler/ide_event_handler.py index da32cda..19d8652 100644 --- a/lib/tfw/components/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler/ide_event_handler.py @@ -16,7 +16,7 @@ LOG = logging.getLogger(__name__) class FileManager: # pylint: disable=too-many-instance-attributes def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = None, exclude + self._exclude, self.exclude = [], exclude self._allowed_directories, self.allowed_directories = None, allowed_directories self._workdir, self.workdir = None, working_directory self._filename, self.filename = None, selected_file or self.files[0] @@ -91,7 +91,8 @@ class FileManager: # pylint: disable=too-many-instance-attributes def _is_blacklisted(self, file): return any( - fnmatchcase(file, blacklisted) + fnmatchcase(file, blacklisted) or + fnmatchcase(basename(file), blacklisted) for blacklisted in self.exclude ) From 9cb8ef0e7276051d73b7b8bc65e4a65026d318b6 Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Mon, 20 May 2019 14:52:02 +0200 Subject: [PATCH 2/4] Add unit test for file manager --- .../ide_event_handler/file_manager.py | 93 ++++++++++++++++++ .../ide_event_handler/ide_event_handler.py | 91 +---------------- .../ide_event_handler/test_filemanager.py | 97 +++++++++++++++++++ 3 files changed, 192 insertions(+), 89 deletions(-) create mode 100644 lib/tfw/components/ide_event_handler/file_manager.py create mode 100644 lib/tfw/components/ide_event_handler/test_filemanager.py diff --git a/lib/tfw/components/ide_event_handler/file_manager.py b/lib/tfw/components/ide_event_handler/file_manager.py new file mode 100644 index 0000000..59431ea --- /dev/null +++ b/lib/tfw/components/ide_event_handler/file_manager.py @@ -0,0 +1,93 @@ +from typing import Iterable +from glob import glob +from fnmatch import fnmatchcase +from os.path import basename, isfile, join, relpath, exists, isdir, realpath + + +class FileManager: # pylint: disable=too-many-instance-attributes + def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): + self._exclude, self.exclude = [], exclude + self._allowed_directories, self.allowed_directories = None, allowed_directories + self._workdir, self.workdir = None, working_directory + self._filename, self.filename = None, selected_file or self.files[0] + + @property + def exclude(self): + return self._exclude + + @exclude.setter + def exclude(self, exclude): + if exclude is None: + return + if not isinstance(exclude, Iterable): + raise TypeError('Exclude must be Iterable!') + self._exclude = exclude + + @property + def workdir(self): + return self._workdir + + @workdir.setter + def workdir(self, directory): + if not exists(directory) or not isdir(directory): + raise EnvironmentError(f'"{directory}" is not a directory!') + if not self._is_in_allowed_dir(directory): + raise EnvironmentError(f'Directory "{directory}" is not allowed!') + self._workdir = directory + + @property + def allowed_directories(self): + return self._allowed_directories + + @allowed_directories.setter + def allowed_directories(self, directories): + self._allowed_directories = directories + + @property + def filename(self): + return self._filename + + @filename.setter + def filename(self, filename): + if filename not in self.files: + raise EnvironmentError('No such file in workdir!') + self._filename = filename + + @property + def files(self): + return [ + self._relpath(file) + for file in glob(join(self._workdir, '**/*'), recursive=True) + if isfile(file) + and self._is_in_allowed_dir(file) + and not self._is_blacklisted(file) + ] + + @property + def file_contents(self): + with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: + return ifile.read() + + @file_contents.setter + def file_contents(self, value): + with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: + ofile.write(value) + + def _is_in_allowed_dir(self, path): + return any( + realpath(path).startswith(allowed_dir) + for allowed_dir in self.allowed_directories + ) + + def _is_blacklisted(self, file): + return any( + fnmatchcase(file, blacklisted) or + fnmatchcase(basename(file), blacklisted) + for blacklisted in self.exclude + ) + + def _filepath(self, filename): + return join(self._workdir, filename) + + def _relpath(self, filename): + return relpath(self._filepath(filename), start=self._workdir) diff --git a/lib/tfw/components/ide_event_handler/ide_event_handler.py b/lib/tfw/components/ide_event_handler/ide_event_handler.py index 19d8652..218d3ed 100644 --- a/lib/tfw/components/ide_event_handler/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler/ide_event_handler.py @@ -11,98 +11,11 @@ from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor +from .file_manager import FileManager + LOG = logging.getLogger(__name__) -class FileManager: # pylint: disable=too-many-instance-attributes - def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = [], exclude - self._allowed_directories, self.allowed_directories = None, allowed_directories - self._workdir, self.workdir = None, working_directory - self._filename, self.filename = None, selected_file or self.files[0] - - @property - def exclude(self): - return self._exclude - - @exclude.setter - def exclude(self, exclude): - if exclude is None: - return - if not isinstance(exclude, Iterable): - raise TypeError('Exclude must be Iterable!') - self._exclude = exclude - - @property - def workdir(self): - return self._workdir - - @workdir.setter - def workdir(self, directory): - if not exists(directory) or not isdir(directory): - raise EnvironmentError(f'"{directory}" is not a directory!') - if not self._is_in_allowed_dir(directory): - raise EnvironmentError(f'Directory "{directory}" is not allowed!') - self._workdir = directory - - @property - def allowed_directories(self): - return self._allowed_directories - - @allowed_directories.setter - def allowed_directories(self, directories): - self._allowed_directories = directories - - @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, filename): - if filename not in self.files: - raise EnvironmentError('No such file in workdir!') - self._filename = filename - - @property - def files(self): - return [ - self._relpath(file) - for file in glob(join(self._workdir, '**/*'), recursive=True) - if isfile(file) - and self._is_in_allowed_dir(file) - and not self._is_blacklisted(file) - ] - - @property - def file_contents(self): - with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: - return ifile.read() - - @file_contents.setter - def file_contents(self, value): - with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: - ofile.write(value) - - def _is_in_allowed_dir(self, path): - return any( - realpath(path).startswith(allowed_dir) - for allowed_dir in self.allowed_directories - ) - - def _is_blacklisted(self, file): - return any( - fnmatchcase(file, blacklisted) or - fnmatchcase(basename(file), blacklisted) - for blacklisted in self.exclude - ) - - def _filepath(self, filename): - return join(self._workdir, filename) - - def _relpath(self, filename): - return relpath(self._filepath(filename), start=self._workdir) - - class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): # pylint: disable=too-many-arguments,anomalous-backslash-in-string """ diff --git a/lib/tfw/components/ide_event_handler/test_filemanager.py b/lib/tfw/components/ide_event_handler/test_filemanager.py new file mode 100644 index 0000000..d62f3e0 --- /dev/null +++ b/lib/tfw/components/ide_event_handler/test_filemanager.py @@ -0,0 +1,97 @@ +# pylint: disable=redefined-outer-name + +from secrets import token_urlsafe +from pathlib import Path +from shutil import rmtree +from os.path import join, realpath +from os import mkdir, rmdir, remove, symlink + +import pytest + +from filemanager import FileManager + + +WORKDIR = realpath('test_filemanager') + +def workdir_pref(path): + return join(WORKDIR, path) + +@pytest.fixture(scope='module') +def manager(): + dirs = [] + mkdir(WORKDIR) + + for i in range(3): + node = workdir_pref('dir_'+str(i).zfill(2)) + mkdir(node) + Path(join(node, 'empty.txt')).touch() + Path(join(node, 'empty.bin')).touch() + dirs.append(node) + + yield FileManager(dirs[0], dirs[:-1], exclude=['*/dir_01/*', '*.bin']) + rmtree(WORKDIR) + +@pytest.mark.parametrize('subdir', ['dir_00', 'dir_01']) +def test_select_allowed_dirs(manager, subdir): + manager.workdir = workdir_pref(subdir) + assert manager.workdir == workdir_pref(subdir) + newdir = workdir_pref(join(subdir, 'deep')) + mkdir(newdir) + manager.workdir = newdir + assert manager.workdir == newdir + rmdir(newdir) + +@pytest.mark.parametrize('excdir', ['/', workdir_pref('dir_02')]) +def test_select_excluded_dirs(manager, excdir): + allowed = manager.allowed_directories + with pytest.raises(OSError): + manager.workdir = excdir + assert manager.workdir != excdir + manager.allowed_directories = allowed+[excdir] + manager.workdir = excdir + assert manager.workdir == excdir + manager.allowed_directories = allowed + +@pytest.mark.parametrize('filename', ['another.txt', '*.txt']) +def test_select_allowed_files(manager, filename): + manager.workdir = workdir_pref('dir_00') + newfile = workdir_pref(join('dir_00', filename)) + Path(newfile).touch() + assert filename in manager.files + manager.filename = filename + assert manager.filename == filename + remove(newfile) + +@pytest.mark.parametrize('path', [ + ['dir_00', 'illegal.bin'], + ['dir_01', 'legal.txt'] +]) +def test_select_excluded_files(manager, path): + manager.workdir = workdir_pref(path[0]) + newfile = workdir_pref(join(path[0], path[1])) + Path(newfile).touch() + assert path[1] not in manager.files + with pytest.raises(OSError): + manager.filename = path[1] + remove(newfile) + +@pytest.mark.parametrize('path', [ + ['dir_02/empty.txt', 'dir_00/link.txt'], + ['dir_01/empty.txt', 'dir_00/link.bin'] +]) +def test_select_excluded_symlinks(manager, path): + manager.workdir = workdir_pref('dir_00') + link = workdir_pref(path[1]) + symlink(workdir_pref(path[0]), link) + assert path[1] not in manager.files + remove(link) + +def test_read_write_file(manager): + for _ in range(128): + manager.workdir = workdir_pref('dir_00') + manager.filename = 'empty.txt' + content = token_urlsafe(32) + manager.file_contents = content + assert manager.file_contents == content + with open(workdir_pref('dir_00/empty.txt'), "r") as ifile: + assert ifile.read() == content From b44fd200c657c7c941df71c6e4e722796ac96b4c Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Wed, 22 May 2019 22:31:55 +0200 Subject: [PATCH 3/4] Make fixtures function scoped and add new tests --- .../ide_event_handler/file_manager.py | 2 +- .../ide_event_handler/test_filemanager.py | 138 +++++++++--------- 2 files changed, 73 insertions(+), 67 deletions(-) diff --git a/lib/tfw/components/ide_event_handler/file_manager.py b/lib/tfw/components/ide_event_handler/file_manager.py index 59431ea..967e06f 100644 --- a/lib/tfw/components/ide_event_handler/file_manager.py +++ b/lib/tfw/components/ide_event_handler/file_manager.py @@ -41,7 +41,7 @@ class FileManager: # pylint: disable=too-many-instance-attributes @allowed_directories.setter def allowed_directories(self, directories): - self._allowed_directories = directories + self._allowed_directories = [realpath(directory) for directory in directories] @property def filename(self): diff --git a/lib/tfw/components/ide_event_handler/test_filemanager.py b/lib/tfw/components/ide_event_handler/test_filemanager.py index d62f3e0..c6e0b8b 100644 --- a/lib/tfw/components/ide_event_handler/test_filemanager.py +++ b/lib/tfw/components/ide_event_handler/test_filemanager.py @@ -1,97 +1,103 @@ # pylint: disable=redefined-outer-name from secrets import token_urlsafe +from os.path import join +from os import chdir, mkdir, symlink from pathlib import Path -from shutil import rmtree -from os.path import join, realpath -from os import mkdir, rmdir, remove, symlink +from tempfile import TemporaryDirectory import pytest -from filemanager import FileManager +from file_manager import FileManager -WORKDIR = realpath('test_filemanager') +class ManagerContext: + def __init__(self, folder, manager): + self.folder = folder + self.manager = manager -def workdir_pref(path): - return join(WORKDIR, path) + def join(self, path): + return join(self.folder, path) -@pytest.fixture(scope='module') -def manager(): + +@pytest.fixture() +def context(): dirs = [] - mkdir(WORKDIR) - for i in range(3): - node = workdir_pref('dir_'+str(i).zfill(2)) - mkdir(node) - Path(join(node, 'empty.txt')).touch() - Path(join(node, 'empty.bin')).touch() - dirs.append(node) + with TemporaryDirectory() as workdir: + chdir(workdir) + for name in ["allowed", "excluded", "invis"]: + node = join(workdir, name) + mkdir(node) + Path(join(node, 'empty.txt')).touch() + Path(join(node, 'empty.bin')).touch() + dirs.append(node) - yield FileManager(dirs[0], dirs[:-1], exclude=['*/dir_01/*', '*.bin']) - rmtree(WORKDIR) + yield ManagerContext( + workdir, + FileManager(dirs[0], dirs[:-1], exclude=['*/excluded/*']) + ) -@pytest.mark.parametrize('subdir', ['dir_00', 'dir_01']) -def test_select_allowed_dirs(manager, subdir): - manager.workdir = workdir_pref(subdir) - assert manager.workdir == workdir_pref(subdir) - newdir = workdir_pref(join(subdir, 'deep')) +@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/']) +def test_select_allowed_dirs(context, subdir): + context.manager.workdir = context.join(subdir) + assert context.manager.workdir == context.join(subdir) + newdir = context.join(subdir+'deep') mkdir(newdir) - manager.workdir = newdir - assert manager.workdir == newdir - rmdir(newdir) + context.manager.workdir = newdir + assert context.manager.workdir == newdir -@pytest.mark.parametrize('excdir', ['/', workdir_pref('dir_02')]) -def test_select_excluded_dirs(manager, excdir): - allowed = manager.allowed_directories +@pytest.mark.parametrize('invdir', ['', 'invis']) +def test_select_forbidden_dirs(context, invdir): + fullpath = context.join(invdir) with pytest.raises(OSError): - manager.workdir = excdir - assert manager.workdir != excdir - manager.allowed_directories = allowed+[excdir] - manager.workdir = excdir - assert manager.workdir == excdir - manager.allowed_directories = allowed + context.manager.workdir = fullpath + assert context.manager.workdir != fullpath + context.manager.allowed_directories += [fullpath] + context.manager.workdir = fullpath + assert context.manager.workdir == fullpath + @pytest.mark.parametrize('filename', ['another.txt', '*.txt']) -def test_select_allowed_files(manager, filename): - manager.workdir = workdir_pref('dir_00') - newfile = workdir_pref(join('dir_00', filename)) - Path(newfile).touch() - assert filename in manager.files - manager.filename = filename - assert manager.filename == filename - remove(newfile) +def test_select_allowed_files(context, filename): + Path(context.join('allowed/'+filename)).touch() + assert filename in context.manager.files + context.manager.filename = filename + assert context.manager.filename == filename @pytest.mark.parametrize('path', [ - ['dir_00', 'illegal.bin'], - ['dir_01', 'legal.txt'] + {'dir': 'allowed/', 'file': 'illegal.bin'}, + {'dir': 'excluded/', 'file': 'legal.txt'}, + {'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'}, + {'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'}, + {'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'}, + {'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'} ]) -def test_select_excluded_files(manager, path): - manager.workdir = workdir_pref(path[0]) - newfile = workdir_pref(join(path[0], path[1])) - Path(newfile).touch() - assert path[1] not in manager.files +def test_select_excluded_files(context, path): + context.manager.workdir = context.join(path['dir']) + context.manager.exclude = ['*/excluded/*', '*.bin'] + Path(context.join(path['dir']+path['file'])).touch() + assert path['file'] not in context.manager.files with pytest.raises(OSError): - manager.filename = path[1] - remove(newfile) + context.manager.filename = path['file'] @pytest.mark.parametrize('path', [ - ['dir_02/empty.txt', 'dir_00/link.txt'], - ['dir_01/empty.txt', 'dir_00/link.bin'] + {'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'}, + {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, + {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'} ]) -def test_select_excluded_symlinks(manager, path): - manager.workdir = workdir_pref('dir_00') - link = workdir_pref(path[1]) - symlink(workdir_pref(path[0]), link) - assert path[1] not in manager.files - remove(link) +def test_select_excluded_symlinks(context, path): + symlink(context.join(path['src']), context.join(path['dst'])) + assert path['dst'] not in context.manager.files -def test_read_write_file(manager): +def test_read_write_file(context): for _ in range(128): - manager.workdir = workdir_pref('dir_00') - manager.filename = 'empty.txt' + context.manager.filename = 'empty.txt' content = token_urlsafe(32) - manager.file_contents = content - assert manager.file_contents == content - with open(workdir_pref('dir_00/empty.txt'), "r") as ifile: + context.manager.file_contents = content + assert context.manager.file_contents == content + with open(context.join('allowed/empty.txt'), "r") as ifile: assert ifile.read() == content From 8cbe737d2fc46c21321e8f257cd60727f26cccad Mon Sep 17 00:00:00 2001 From: "R. Richard" Date: Thu, 6 Jun 2019 09:58:18 +0200 Subject: [PATCH 4/4] Turn context into a dataclass and add new test case --- ...st_filemanager.py => test_file_manager.py} | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) rename lib/tfw/components/ide_event_handler/{test_filemanager.py => test_file_manager.py} (74%) diff --git a/lib/tfw/components/ide_event_handler/test_filemanager.py b/lib/tfw/components/ide_event_handler/test_file_manager.py similarity index 74% rename from lib/tfw/components/ide_event_handler/test_filemanager.py rename to lib/tfw/components/ide_event_handler/test_file_manager.py index c6e0b8b..33c8024 100644 --- a/lib/tfw/components/ide_event_handler/test_filemanager.py +++ b/lib/tfw/components/ide_event_handler/test_file_manager.py @@ -1,5 +1,6 @@ # pylint: disable=redefined-outer-name +from dataclasses import dataclass from secrets import token_urlsafe from os.path import join from os import chdir, mkdir, symlink @@ -10,11 +11,10 @@ import pytest from file_manager import FileManager - +@dataclass class ManagerContext: - def __init__(self, folder, manager): - self.folder = folder - self.manager = manager + folder: str + manager: FileManager def join(self, path): return join(self.folder, path) @@ -22,20 +22,24 @@ class ManagerContext: @pytest.fixture() def context(): - dirs = [] + dirs = {} with TemporaryDirectory() as workdir: chdir(workdir) - for name in ["allowed", "excluded", "invis"]: + for name in ['allowed', 'excluded', 'invis']: node = join(workdir, name) mkdir(node) Path(join(node, 'empty.txt')).touch() Path(join(node, 'empty.bin')).touch() - dirs.append(node) + dirs[name] = node yield ManagerContext( workdir, - FileManager(dirs[0], dirs[:-1], exclude=['*/excluded/*']) + FileManager( + dirs['allowed'], + [dirs['allowed'], dirs['excluded']], + exclude=['*/excluded/*'] + ) ) @pytest.mark.parametrize('subdir', ['allowed/', 'excluded/']) @@ -99,5 +103,22 @@ def test_read_write_file(context): content = token_urlsafe(32) context.manager.file_contents = content assert context.manager.file_contents == content - with open(context.join('allowed/empty.txt'), "r") as ifile: + with open(context.join('allowed/empty.txt'), 'r') as ifile: assert ifile.read() == content + +def test_regular_ide_actions(context): + context.manager.workdir = context.join('allowed') + newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16) + Path(context.join(f'allowed/{newfile1}')).touch() + Path(context.join(f'allowed/{newfile2}')).touch() + for _ in range(8): + context.manager.filename = newfile1 + content1 = token_urlsafe(32) + context.manager.file_contents = content1 + context.manager.filename = newfile2 + content2 = token_urlsafe(32) + context.manager.file_contents = content2 + context.manager.filename = newfile1 + assert context.manager.file_contents == content1 + context.manager.filename = newfile2 + assert context.manager.file_contents == content2