mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-25 19:01:33 +00:00
Merge pull request #55 from avatao-content/file_manager_fixes
File manager fixes
This commit is contained in:
commit
8482d7ec2b
93
lib/tfw/components/ide_event_handler/file_manager.py
Normal file
93
lib/tfw/components/ide_event_handler/file_manager.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
from typing import Iterable
|
||||||
|
from glob import glob
|
||||||
|
from fnmatch import fnmatchcase
|
||||||
|
from os.path import basename, isfile, join, relpath, exists, isdir, realpath
|
||||||
|
|
||||||
|
|
||||||
|
class FileManager: # pylint: disable=too-many-instance-attributes
|
||||||
|
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
|
||||||
|
self._exclude, self.exclude = [], exclude
|
||||||
|
self._allowed_directories, self.allowed_directories = None, allowed_directories
|
||||||
|
self._workdir, self.workdir = None, working_directory
|
||||||
|
self._filename, self.filename = None, selected_file or self.files[0]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exclude(self):
|
||||||
|
return self._exclude
|
||||||
|
|
||||||
|
@exclude.setter
|
||||||
|
def exclude(self, exclude):
|
||||||
|
if exclude is None:
|
||||||
|
return
|
||||||
|
if not isinstance(exclude, Iterable):
|
||||||
|
raise TypeError('Exclude must be Iterable!')
|
||||||
|
self._exclude = exclude
|
||||||
|
|
||||||
|
@property
|
||||||
|
def workdir(self):
|
||||||
|
return self._workdir
|
||||||
|
|
||||||
|
@workdir.setter
|
||||||
|
def workdir(self, directory):
|
||||||
|
if not exists(directory) or not isdir(directory):
|
||||||
|
raise EnvironmentError(f'"{directory}" is not a directory!')
|
||||||
|
if not self._is_in_allowed_dir(directory):
|
||||||
|
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
|
||||||
|
self._workdir = directory
|
||||||
|
|
||||||
|
@property
|
||||||
|
def allowed_directories(self):
|
||||||
|
return self._allowed_directories
|
||||||
|
|
||||||
|
@allowed_directories.setter
|
||||||
|
def allowed_directories(self, directories):
|
||||||
|
self._allowed_directories = [realpath(directory) for directory in directories]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def filename(self):
|
||||||
|
return self._filename
|
||||||
|
|
||||||
|
@filename.setter
|
||||||
|
def filename(self, filename):
|
||||||
|
if filename not in self.files:
|
||||||
|
raise EnvironmentError('No such file in workdir!')
|
||||||
|
self._filename = filename
|
||||||
|
|
||||||
|
@property
|
||||||
|
def files(self):
|
||||||
|
return [
|
||||||
|
self._relpath(file)
|
||||||
|
for file in glob(join(self._workdir, '**/*'), recursive=True)
|
||||||
|
if isfile(file)
|
||||||
|
and self._is_in_allowed_dir(file)
|
||||||
|
and not self._is_blacklisted(file)
|
||||||
|
]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def file_contents(self):
|
||||||
|
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
||||||
|
return ifile.read()
|
||||||
|
|
||||||
|
@file_contents.setter
|
||||||
|
def file_contents(self, value):
|
||||||
|
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
||||||
|
ofile.write(value)
|
||||||
|
|
||||||
|
def _is_in_allowed_dir(self, path):
|
||||||
|
return any(
|
||||||
|
realpath(path).startswith(allowed_dir)
|
||||||
|
for allowed_dir in self.allowed_directories
|
||||||
|
)
|
||||||
|
|
||||||
|
def _is_blacklisted(self, file):
|
||||||
|
return any(
|
||||||
|
fnmatchcase(file, blacklisted) or
|
||||||
|
fnmatchcase(basename(file), blacklisted)
|
||||||
|
for blacklisted in self.exclude
|
||||||
|
)
|
||||||
|
|
||||||
|
def _filepath(self, filename):
|
||||||
|
return join(self._workdir, filename)
|
||||||
|
|
||||||
|
def _relpath(self, filename):
|
||||||
|
return relpath(self._filepath(filename), start=self._workdir)
|
@ -11,97 +11,11 @@ from tfw.event_handlers import FrontendEventHandlerBase
|
|||||||
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
|
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
|
||||||
from tfw.components.directory_monitor import DirectoryMonitor
|
from tfw.components.directory_monitor import DirectoryMonitor
|
||||||
|
|
||||||
|
from .file_manager import FileManager
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class FileManager: # pylint: disable=too-many-instance-attributes
|
|
||||||
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
|
|
||||||
self._exclude, self.exclude = None, exclude
|
|
||||||
self._allowed_directories, self.allowed_directories = None, allowed_directories
|
|
||||||
self._workdir, self.workdir = None, working_directory
|
|
||||||
self._filename, self.filename = None, selected_file or self.files[0]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def exclude(self):
|
|
||||||
return self._exclude
|
|
||||||
|
|
||||||
@exclude.setter
|
|
||||||
def exclude(self, exclude):
|
|
||||||
if exclude is None:
|
|
||||||
return
|
|
||||||
if not isinstance(exclude, Iterable):
|
|
||||||
raise TypeError('Exclude must be Iterable!')
|
|
||||||
self._exclude = exclude
|
|
||||||
|
|
||||||
@property
|
|
||||||
def workdir(self):
|
|
||||||
return self._workdir
|
|
||||||
|
|
||||||
@workdir.setter
|
|
||||||
def workdir(self, directory):
|
|
||||||
if not exists(directory) or not isdir(directory):
|
|
||||||
raise EnvironmentError(f'"{directory}" is not a directory!')
|
|
||||||
if not self._is_in_allowed_dir(directory):
|
|
||||||
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
|
|
||||||
self._workdir = directory
|
|
||||||
|
|
||||||
@property
|
|
||||||
def allowed_directories(self):
|
|
||||||
return self._allowed_directories
|
|
||||||
|
|
||||||
@allowed_directories.setter
|
|
||||||
def allowed_directories(self, directories):
|
|
||||||
self._allowed_directories = directories
|
|
||||||
|
|
||||||
@property
|
|
||||||
def filename(self):
|
|
||||||
return self._filename
|
|
||||||
|
|
||||||
@filename.setter
|
|
||||||
def filename(self, filename):
|
|
||||||
if filename not in self.files:
|
|
||||||
raise EnvironmentError('No such file in workdir!')
|
|
||||||
self._filename = filename
|
|
||||||
|
|
||||||
@property
|
|
||||||
def files(self):
|
|
||||||
return [
|
|
||||||
self._relpath(file)
|
|
||||||
for file in glob(join(self._workdir, '**/*'), recursive=True)
|
|
||||||
if isfile(file)
|
|
||||||
and self._is_in_allowed_dir(file)
|
|
||||||
and not self._is_blacklisted(file)
|
|
||||||
]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def file_contents(self):
|
|
||||||
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
|
||||||
return ifile.read()
|
|
||||||
|
|
||||||
@file_contents.setter
|
|
||||||
def file_contents(self, value):
|
|
||||||
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
|
||||||
ofile.write(value)
|
|
||||||
|
|
||||||
def _is_in_allowed_dir(self, path):
|
|
||||||
return any(
|
|
||||||
realpath(path).startswith(allowed_dir)
|
|
||||||
for allowed_dir in self.allowed_directories
|
|
||||||
)
|
|
||||||
|
|
||||||
def _is_blacklisted(self, file):
|
|
||||||
return any(
|
|
||||||
fnmatchcase(file, blacklisted)
|
|
||||||
for blacklisted in self.exclude
|
|
||||||
)
|
|
||||||
|
|
||||||
def _filepath(self, filename):
|
|
||||||
return join(self._workdir, filename)
|
|
||||||
|
|
||||||
def _relpath(self, filename):
|
|
||||||
return relpath(self._filepath(filename), start=self._workdir)
|
|
||||||
|
|
||||||
|
|
||||||
class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
|
class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
|
||||||
# pylint: disable=too-many-arguments,anomalous-backslash-in-string
|
# pylint: disable=too-many-arguments,anomalous-backslash-in-string
|
||||||
"""
|
"""
|
124
lib/tfw/components/ide_event_handler/test_file_manager.py
Normal file
124
lib/tfw/components/ide_event_handler/test_file_manager.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from secrets import token_urlsafe
|
||||||
|
from os.path import join
|
||||||
|
from os import chdir, mkdir, symlink
|
||||||
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from file_manager import FileManager
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ManagerContext:
|
||||||
|
folder: str
|
||||||
|
manager: FileManager
|
||||||
|
|
||||||
|
def join(self, path):
|
||||||
|
return join(self.folder, path)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def context():
|
||||||
|
dirs = {}
|
||||||
|
|
||||||
|
with TemporaryDirectory() as workdir:
|
||||||
|
chdir(workdir)
|
||||||
|
for name in ['allowed', 'excluded', 'invis']:
|
||||||
|
node = join(workdir, name)
|
||||||
|
mkdir(node)
|
||||||
|
Path(join(node, 'empty.txt')).touch()
|
||||||
|
Path(join(node, 'empty.bin')).touch()
|
||||||
|
dirs[name] = node
|
||||||
|
|
||||||
|
yield ManagerContext(
|
||||||
|
workdir,
|
||||||
|
FileManager(
|
||||||
|
dirs['allowed'],
|
||||||
|
[dirs['allowed'], dirs['excluded']],
|
||||||
|
exclude=['*/excluded/*']
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/'])
|
||||||
|
def test_select_allowed_dirs(context, subdir):
|
||||||
|
context.manager.workdir = context.join(subdir)
|
||||||
|
assert context.manager.workdir == context.join(subdir)
|
||||||
|
newdir = context.join(subdir+'deep')
|
||||||
|
mkdir(newdir)
|
||||||
|
context.manager.workdir = newdir
|
||||||
|
assert context.manager.workdir == newdir
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('invdir', ['', 'invis'])
|
||||||
|
def test_select_forbidden_dirs(context, invdir):
|
||||||
|
fullpath = context.join(invdir)
|
||||||
|
with pytest.raises(OSError):
|
||||||
|
context.manager.workdir = fullpath
|
||||||
|
assert context.manager.workdir != fullpath
|
||||||
|
context.manager.allowed_directories += [fullpath]
|
||||||
|
context.manager.workdir = fullpath
|
||||||
|
assert context.manager.workdir == fullpath
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('filename', ['another.txt', '*.txt'])
|
||||||
|
def test_select_allowed_files(context, filename):
|
||||||
|
Path(context.join('allowed/'+filename)).touch()
|
||||||
|
assert filename in context.manager.files
|
||||||
|
context.manager.filename = filename
|
||||||
|
assert context.manager.filename == filename
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('path', [
|
||||||
|
{'dir': 'allowed/', 'file': 'illegal.bin'},
|
||||||
|
{'dir': 'excluded/', 'file': 'legal.txt'},
|
||||||
|
{'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'},
|
||||||
|
{'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'},
|
||||||
|
{'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'},
|
||||||
|
{'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'}
|
||||||
|
])
|
||||||
|
def test_select_excluded_files(context, path):
|
||||||
|
context.manager.workdir = context.join(path['dir'])
|
||||||
|
context.manager.exclude = ['*/excluded/*', '*.bin']
|
||||||
|
Path(context.join(path['dir']+path['file'])).touch()
|
||||||
|
assert path['file'] not in context.manager.files
|
||||||
|
with pytest.raises(OSError):
|
||||||
|
context.manager.filename = path['file']
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('path', [
|
||||||
|
{'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'},
|
||||||
|
{'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'},
|
||||||
|
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
|
||||||
|
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
|
||||||
|
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'},
|
||||||
|
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}
|
||||||
|
])
|
||||||
|
def test_select_excluded_symlinks(context, path):
|
||||||
|
symlink(context.join(path['src']), context.join(path['dst']))
|
||||||
|
assert path['dst'] not in context.manager.files
|
||||||
|
|
||||||
|
def test_read_write_file(context):
|
||||||
|
for _ in range(128):
|
||||||
|
context.manager.filename = 'empty.txt'
|
||||||
|
content = token_urlsafe(32)
|
||||||
|
context.manager.file_contents = content
|
||||||
|
assert context.manager.file_contents == content
|
||||||
|
with open(context.join('allowed/empty.txt'), 'r') as ifile:
|
||||||
|
assert ifile.read() == content
|
||||||
|
|
||||||
|
def test_regular_ide_actions(context):
|
||||||
|
context.manager.workdir = context.join('allowed')
|
||||||
|
newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16)
|
||||||
|
Path(context.join(f'allowed/{newfile1}')).touch()
|
||||||
|
Path(context.join(f'allowed/{newfile2}')).touch()
|
||||||
|
for _ in range(8):
|
||||||
|
context.manager.filename = newfile1
|
||||||
|
content1 = token_urlsafe(32)
|
||||||
|
context.manager.file_contents = content1
|
||||||
|
context.manager.filename = newfile2
|
||||||
|
content2 = token_urlsafe(32)
|
||||||
|
context.manager.file_contents = content2
|
||||||
|
context.manager.filename = newfile1
|
||||||
|
assert context.manager.file_contents == content1
|
||||||
|
context.manager.filename = newfile2
|
||||||
|
assert context.manager.file_contents == content2
|
Loading…
Reference in New Issue
Block a user