diff --git a/lib/tfw/components/ide_event_handler/file_manager.py b/lib/tfw/components/ide_event_handler/file_manager.py new file mode 100644 index 0000000..967e06f --- /dev/null +++ b/lib/tfw/components/ide_event_handler/file_manager.py @@ -0,0 +1,93 @@ +from typing import Iterable +from glob import glob +from fnmatch import fnmatchcase +from os.path import basename, isfile, join, relpath, exists, isdir, realpath + + +class FileManager: # pylint: disable=too-many-instance-attributes + def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): + self._exclude, self.exclude = [], exclude + self._allowed_directories, self.allowed_directories = None, allowed_directories + self._workdir, self.workdir = None, working_directory + self._filename, self.filename = None, selected_file or self.files[0] + + @property + def exclude(self): + return self._exclude + + @exclude.setter + def exclude(self, exclude): + if exclude is None: + return + if not isinstance(exclude, Iterable): + raise TypeError('Exclude must be Iterable!') + self._exclude = exclude + + @property + def workdir(self): + return self._workdir + + @workdir.setter + def workdir(self, directory): + if not exists(directory) or not isdir(directory): + raise EnvironmentError(f'"{directory}" is not a directory!') + if not self._is_in_allowed_dir(directory): + raise EnvironmentError(f'Directory "{directory}" is not allowed!') + self._workdir = directory + + @property + def allowed_directories(self): + return self._allowed_directories + + @allowed_directories.setter + def allowed_directories(self, directories): + self._allowed_directories = [realpath(directory) for directory in directories] + + @property + def filename(self): + return self._filename + + @filename.setter + def filename(self, filename): + if filename not in self.files: + raise EnvironmentError('No such file in workdir!') + self._filename = filename + + @property + def files(self): + return [ + self._relpath(file) + for file in glob(join(self._workdir, '**/*'), recursive=True) + if isfile(file) + and self._is_in_allowed_dir(file) + and not self._is_blacklisted(file) + ] + + @property + def file_contents(self): + with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: + return ifile.read() + + @file_contents.setter + def file_contents(self, value): + with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: + ofile.write(value) + + def _is_in_allowed_dir(self, path): + return any( + realpath(path).startswith(allowed_dir) + for allowed_dir in self.allowed_directories + ) + + def _is_blacklisted(self, file): + return any( + fnmatchcase(file, blacklisted) or + fnmatchcase(basename(file), blacklisted) + for blacklisted in self.exclude + ) + + def _filepath(self, filename): + return join(self._workdir, filename) + + def _relpath(self, filename): + return relpath(self._filepath(filename), start=self._workdir) diff --git a/lib/tfw/components/ide_event_handler.py b/lib/tfw/components/ide_event_handler/ide_event_handler.py similarity index 68% rename from lib/tfw/components/ide_event_handler.py rename to lib/tfw/components/ide_event_handler/ide_event_handler.py index da32cda..218d3ed 100644 --- a/lib/tfw/components/ide_event_handler.py +++ b/lib/tfw/components/ide_event_handler/ide_event_handler.py @@ -11,97 +11,11 @@ from tfw.event_handlers import FrontendEventHandlerBase from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin from tfw.components.directory_monitor import DirectoryMonitor +from .file_manager import FileManager + LOG = logging.getLogger(__name__) -class FileManager: # pylint: disable=too-many-instance-attributes - def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): - self._exclude, self.exclude = None, exclude - self._allowed_directories, self.allowed_directories = None, allowed_directories - self._workdir, self.workdir = None, working_directory - self._filename, self.filename = None, selected_file or self.files[0] - - @property - def exclude(self): - return self._exclude - - @exclude.setter - def exclude(self, exclude): - if exclude is None: - return - if not isinstance(exclude, Iterable): - raise TypeError('Exclude must be Iterable!') - self._exclude = exclude - - @property - def workdir(self): - return self._workdir - - @workdir.setter - def workdir(self, directory): - if not exists(directory) or not isdir(directory): - raise EnvironmentError(f'"{directory}" is not a directory!') - if not self._is_in_allowed_dir(directory): - raise EnvironmentError(f'Directory "{directory}" is not allowed!') - self._workdir = directory - - @property - def allowed_directories(self): - return self._allowed_directories - - @allowed_directories.setter - def allowed_directories(self, directories): - self._allowed_directories = directories - - @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, filename): - if filename not in self.files: - raise EnvironmentError('No such file in workdir!') - self._filename = filename - - @property - def files(self): - return [ - self._relpath(file) - for file in glob(join(self._workdir, '**/*'), recursive=True) - if isfile(file) - and self._is_in_allowed_dir(file) - and not self._is_blacklisted(file) - ] - - @property - def file_contents(self): - with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: - return ifile.read() - - @file_contents.setter - def file_contents(self, value): - with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: - ofile.write(value) - - def _is_in_allowed_dir(self, path): - return any( - realpath(path).startswith(allowed_dir) - for allowed_dir in self.allowed_directories - ) - - def _is_blacklisted(self, file): - return any( - fnmatchcase(file, blacklisted) - for blacklisted in self.exclude - ) - - def _filepath(self, filename): - return join(self._workdir, filename) - - def _relpath(self, filename): - return relpath(self._filepath(filename), start=self._workdir) - - class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin): # pylint: disable=too-many-arguments,anomalous-backslash-in-string """ diff --git a/lib/tfw/components/ide_event_handler/test_file_manager.py b/lib/tfw/components/ide_event_handler/test_file_manager.py new file mode 100644 index 0000000..33c8024 --- /dev/null +++ b/lib/tfw/components/ide_event_handler/test_file_manager.py @@ -0,0 +1,124 @@ +# pylint: disable=redefined-outer-name + +from dataclasses import dataclass +from secrets import token_urlsafe +from os.path import join +from os import chdir, mkdir, symlink +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytest + +from file_manager import FileManager + +@dataclass +class ManagerContext: + folder: str + manager: FileManager + + def join(self, path): + return join(self.folder, path) + + +@pytest.fixture() +def context(): + dirs = {} + + with TemporaryDirectory() as workdir: + chdir(workdir) + for name in ['allowed', 'excluded', 'invis']: + node = join(workdir, name) + mkdir(node) + Path(join(node, 'empty.txt')).touch() + Path(join(node, 'empty.bin')).touch() + dirs[name] = node + + yield ManagerContext( + workdir, + FileManager( + dirs['allowed'], + [dirs['allowed'], dirs['excluded']], + exclude=['*/excluded/*'] + ) + ) + +@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/']) +def test_select_allowed_dirs(context, subdir): + context.manager.workdir = context.join(subdir) + assert context.manager.workdir == context.join(subdir) + newdir = context.join(subdir+'deep') + mkdir(newdir) + context.manager.workdir = newdir + assert context.manager.workdir == newdir + +@pytest.mark.parametrize('invdir', ['', 'invis']) +def test_select_forbidden_dirs(context, invdir): + fullpath = context.join(invdir) + with pytest.raises(OSError): + context.manager.workdir = fullpath + assert context.manager.workdir != fullpath + context.manager.allowed_directories += [fullpath] + context.manager.workdir = fullpath + assert context.manager.workdir == fullpath + + +@pytest.mark.parametrize('filename', ['another.txt', '*.txt']) +def test_select_allowed_files(context, filename): + Path(context.join('allowed/'+filename)).touch() + assert filename in context.manager.files + context.manager.filename = filename + assert context.manager.filename == filename + +@pytest.mark.parametrize('path', [ + {'dir': 'allowed/', 'file': 'illegal.bin'}, + {'dir': 'excluded/', 'file': 'legal.txt'}, + {'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'}, + {'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'}, + {'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'}, + {'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'} +]) +def test_select_excluded_files(context, path): + context.manager.workdir = context.join(path['dir']) + context.manager.exclude = ['*/excluded/*', '*.bin'] + Path(context.join(path['dir']+path['file'])).touch() + assert path['file'] not in context.manager.files + with pytest.raises(OSError): + context.manager.filename = path['file'] + +@pytest.mark.parametrize('path', [ + {'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'}, + {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'}, + {'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}, + {'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'} +]) +def test_select_excluded_symlinks(context, path): + symlink(context.join(path['src']), context.join(path['dst'])) + assert path['dst'] not in context.manager.files + +def test_read_write_file(context): + for _ in range(128): + context.manager.filename = 'empty.txt' + content = token_urlsafe(32) + context.manager.file_contents = content + assert context.manager.file_contents == content + with open(context.join('allowed/empty.txt'), 'r') as ifile: + assert ifile.read() == content + +def test_regular_ide_actions(context): + context.manager.workdir = context.join('allowed') + newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16) + Path(context.join(f'allowed/{newfile1}')).touch() + Path(context.join(f'allowed/{newfile2}')).touch() + for _ in range(8): + context.manager.filename = newfile1 + content1 = token_urlsafe(32) + context.manager.file_contents = content1 + context.manager.filename = newfile2 + content2 = token_urlsafe(32) + context.manager.file_contents = content2 + context.manager.filename = newfile1 + assert context.manager.file_contents == content1 + context.manager.filename = newfile2 + assert context.manager.file_contents == content2