mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2025-01-22 20:01:55 +00:00
Add unit test for file manager
This commit is contained in:
parent
a69031015b
commit
9cb8ef0e72
93
lib/tfw/components/ide_event_handler/file_manager.py
Normal file
93
lib/tfw/components/ide_event_handler/file_manager.py
Normal file
@ -0,0 +1,93 @@
|
||||
from typing import Iterable
|
||||
from glob import glob
|
||||
from fnmatch import fnmatchcase
|
||||
from os.path import basename, isfile, join, relpath, exists, isdir, realpath
|
||||
|
||||
|
||||
class FileManager: # pylint: disable=too-many-instance-attributes
|
||||
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
|
||||
self._exclude, self.exclude = [], exclude
|
||||
self._allowed_directories, self.allowed_directories = None, allowed_directories
|
||||
self._workdir, self.workdir = None, working_directory
|
||||
self._filename, self.filename = None, selected_file or self.files[0]
|
||||
|
||||
@property
|
||||
def exclude(self):
|
||||
return self._exclude
|
||||
|
||||
@exclude.setter
|
||||
def exclude(self, exclude):
|
||||
if exclude is None:
|
||||
return
|
||||
if not isinstance(exclude, Iterable):
|
||||
raise TypeError('Exclude must be Iterable!')
|
||||
self._exclude = exclude
|
||||
|
||||
@property
|
||||
def workdir(self):
|
||||
return self._workdir
|
||||
|
||||
@workdir.setter
|
||||
def workdir(self, directory):
|
||||
if not exists(directory) or not isdir(directory):
|
||||
raise EnvironmentError(f'"{directory}" is not a directory!')
|
||||
if not self._is_in_allowed_dir(directory):
|
||||
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
|
||||
self._workdir = directory
|
||||
|
||||
@property
|
||||
def allowed_directories(self):
|
||||
return self._allowed_directories
|
||||
|
||||
@allowed_directories.setter
|
||||
def allowed_directories(self, directories):
|
||||
self._allowed_directories = directories
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return self._filename
|
||||
|
||||
@filename.setter
|
||||
def filename(self, filename):
|
||||
if filename not in self.files:
|
||||
raise EnvironmentError('No such file in workdir!')
|
||||
self._filename = filename
|
||||
|
||||
@property
|
||||
def files(self):
|
||||
return [
|
||||
self._relpath(file)
|
||||
for file in glob(join(self._workdir, '**/*'), recursive=True)
|
||||
if isfile(file)
|
||||
and self._is_in_allowed_dir(file)
|
||||
and not self._is_blacklisted(file)
|
||||
]
|
||||
|
||||
@property
|
||||
def file_contents(self):
|
||||
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
||||
return ifile.read()
|
||||
|
||||
@file_contents.setter
|
||||
def file_contents(self, value):
|
||||
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
||||
ofile.write(value)
|
||||
|
||||
def _is_in_allowed_dir(self, path):
|
||||
return any(
|
||||
realpath(path).startswith(allowed_dir)
|
||||
for allowed_dir in self.allowed_directories
|
||||
)
|
||||
|
||||
def _is_blacklisted(self, file):
|
||||
return any(
|
||||
fnmatchcase(file, blacklisted) or
|
||||
fnmatchcase(basename(file), blacklisted)
|
||||
for blacklisted in self.exclude
|
||||
)
|
||||
|
||||
def _filepath(self, filename):
|
||||
return join(self._workdir, filename)
|
||||
|
||||
def _relpath(self, filename):
|
||||
return relpath(self._filepath(filename), start=self._workdir)
|
@ -11,98 +11,11 @@ from tfw.event_handlers import FrontendEventHandlerBase
|
||||
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
|
||||
from tfw.components.directory_monitor import DirectoryMonitor
|
||||
|
||||
from .file_manager import FileManager
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FileManager: # pylint: disable=too-many-instance-attributes
|
||||
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
|
||||
self._exclude, self.exclude = [], exclude
|
||||
self._allowed_directories, self.allowed_directories = None, allowed_directories
|
||||
self._workdir, self.workdir = None, working_directory
|
||||
self._filename, self.filename = None, selected_file or self.files[0]
|
||||
|
||||
@property
|
||||
def exclude(self):
|
||||
return self._exclude
|
||||
|
||||
@exclude.setter
|
||||
def exclude(self, exclude):
|
||||
if exclude is None:
|
||||
return
|
||||
if not isinstance(exclude, Iterable):
|
||||
raise TypeError('Exclude must be Iterable!')
|
||||
self._exclude = exclude
|
||||
|
||||
@property
|
||||
def workdir(self):
|
||||
return self._workdir
|
||||
|
||||
@workdir.setter
|
||||
def workdir(self, directory):
|
||||
if not exists(directory) or not isdir(directory):
|
||||
raise EnvironmentError(f'"{directory}" is not a directory!')
|
||||
if not self._is_in_allowed_dir(directory):
|
||||
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
|
||||
self._workdir = directory
|
||||
|
||||
@property
|
||||
def allowed_directories(self):
|
||||
return self._allowed_directories
|
||||
|
||||
@allowed_directories.setter
|
||||
def allowed_directories(self, directories):
|
||||
self._allowed_directories = directories
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return self._filename
|
||||
|
||||
@filename.setter
|
||||
def filename(self, filename):
|
||||
if filename not in self.files:
|
||||
raise EnvironmentError('No such file in workdir!')
|
||||
self._filename = filename
|
||||
|
||||
@property
|
||||
def files(self):
|
||||
return [
|
||||
self._relpath(file)
|
||||
for file in glob(join(self._workdir, '**/*'), recursive=True)
|
||||
if isfile(file)
|
||||
and self._is_in_allowed_dir(file)
|
||||
and not self._is_blacklisted(file)
|
||||
]
|
||||
|
||||
@property
|
||||
def file_contents(self):
|
||||
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
||||
return ifile.read()
|
||||
|
||||
@file_contents.setter
|
||||
def file_contents(self, value):
|
||||
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
||||
ofile.write(value)
|
||||
|
||||
def _is_in_allowed_dir(self, path):
|
||||
return any(
|
||||
realpath(path).startswith(allowed_dir)
|
||||
for allowed_dir in self.allowed_directories
|
||||
)
|
||||
|
||||
def _is_blacklisted(self, file):
|
||||
return any(
|
||||
fnmatchcase(file, blacklisted) or
|
||||
fnmatchcase(basename(file), blacklisted)
|
||||
for blacklisted in self.exclude
|
||||
)
|
||||
|
||||
def _filepath(self, filename):
|
||||
return join(self._workdir, filename)
|
||||
|
||||
def _relpath(self, filename):
|
||||
return relpath(self._filepath(filename), start=self._workdir)
|
||||
|
||||
|
||||
class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
|
||||
# pylint: disable=too-many-arguments,anomalous-backslash-in-string
|
||||
"""
|
||||
|
97
lib/tfw/components/ide_event_handler/test_filemanager.py
Normal file
97
lib/tfw/components/ide_event_handler/test_filemanager.py
Normal file
@ -0,0 +1,97 @@
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
from secrets import token_urlsafe
|
||||
from pathlib import Path
|
||||
from shutil import rmtree
|
||||
from os.path import join, realpath
|
||||
from os import mkdir, rmdir, remove, symlink
|
||||
|
||||
import pytest
|
||||
|
||||
from filemanager import FileManager
|
||||
|
||||
|
||||
WORKDIR = realpath('test_filemanager')
|
||||
|
||||
def workdir_pref(path):
|
||||
return join(WORKDIR, path)
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def manager():
|
||||
dirs = []
|
||||
mkdir(WORKDIR)
|
||||
|
||||
for i in range(3):
|
||||
node = workdir_pref('dir_'+str(i).zfill(2))
|
||||
mkdir(node)
|
||||
Path(join(node, 'empty.txt')).touch()
|
||||
Path(join(node, 'empty.bin')).touch()
|
||||
dirs.append(node)
|
||||
|
||||
yield FileManager(dirs[0], dirs[:-1], exclude=['*/dir_01/*', '*.bin'])
|
||||
rmtree(WORKDIR)
|
||||
|
||||
@pytest.mark.parametrize('subdir', ['dir_00', 'dir_01'])
|
||||
def test_select_allowed_dirs(manager, subdir):
|
||||
manager.workdir = workdir_pref(subdir)
|
||||
assert manager.workdir == workdir_pref(subdir)
|
||||
newdir = workdir_pref(join(subdir, 'deep'))
|
||||
mkdir(newdir)
|
||||
manager.workdir = newdir
|
||||
assert manager.workdir == newdir
|
||||
rmdir(newdir)
|
||||
|
||||
@pytest.mark.parametrize('excdir', ['/', workdir_pref('dir_02')])
|
||||
def test_select_excluded_dirs(manager, excdir):
|
||||
allowed = manager.allowed_directories
|
||||
with pytest.raises(OSError):
|
||||
manager.workdir = excdir
|
||||
assert manager.workdir != excdir
|
||||
manager.allowed_directories = allowed+[excdir]
|
||||
manager.workdir = excdir
|
||||
assert manager.workdir == excdir
|
||||
manager.allowed_directories = allowed
|
||||
|
||||
@pytest.mark.parametrize('filename', ['another.txt', '*.txt'])
|
||||
def test_select_allowed_files(manager, filename):
|
||||
manager.workdir = workdir_pref('dir_00')
|
||||
newfile = workdir_pref(join('dir_00', filename))
|
||||
Path(newfile).touch()
|
||||
assert filename in manager.files
|
||||
manager.filename = filename
|
||||
assert manager.filename == filename
|
||||
remove(newfile)
|
||||
|
||||
@pytest.mark.parametrize('path', [
|
||||
['dir_00', 'illegal.bin'],
|
||||
['dir_01', 'legal.txt']
|
||||
])
|
||||
def test_select_excluded_files(manager, path):
|
||||
manager.workdir = workdir_pref(path[0])
|
||||
newfile = workdir_pref(join(path[0], path[1]))
|
||||
Path(newfile).touch()
|
||||
assert path[1] not in manager.files
|
||||
with pytest.raises(OSError):
|
||||
manager.filename = path[1]
|
||||
remove(newfile)
|
||||
|
||||
@pytest.mark.parametrize('path', [
|
||||
['dir_02/empty.txt', 'dir_00/link.txt'],
|
||||
['dir_01/empty.txt', 'dir_00/link.bin']
|
||||
])
|
||||
def test_select_excluded_symlinks(manager, path):
|
||||
manager.workdir = workdir_pref('dir_00')
|
||||
link = workdir_pref(path[1])
|
||||
symlink(workdir_pref(path[0]), link)
|
||||
assert path[1] not in manager.files
|
||||
remove(link)
|
||||
|
||||
def test_read_write_file(manager):
|
||||
for _ in range(128):
|
||||
manager.workdir = workdir_pref('dir_00')
|
||||
manager.filename = 'empty.txt'
|
||||
content = token_urlsafe(32)
|
||||
manager.file_contents = content
|
||||
assert manager.file_contents == content
|
||||
with open(workdir_pref('dir_00/empty.txt'), "r") as ifile:
|
||||
assert ifile.read() == content
|
Loading…
Reference in New Issue
Block a user