Simplify IDE handler and file manager

This commit is contained in:
R. Richard
2019-08-07 09:44:03 +02:00
parent e6d2777520
commit d31a850a4e
3 changed files with 127 additions and 299 deletions

View File

@ -1,93 +1,56 @@
from typing import Iterable
from functools import wraps
from glob import glob
from fnmatch import fnmatchcase
from os.path import basename, isfile, join, relpath, exists, isdir, realpath
from os.path import dirname, isdir, isfile, realpath
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
self._exclude, self.exclude = [], exclude
self._allowed_directories, self.allowed_directories = None, allowed_directories
self._workdir, self.workdir = None, working_directory
self._filename, self.filename = None, selected_file or self.files[0]
def _with_is_allowed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._is_allowed(args[0]): # pylint: disable=protected-access
return func(self, *args, **kwargs)
raise ValueError('Forbidden path.')
return wrapper
@property
def exclude(self):
return self._exclude
@exclude.setter
def exclude(self, exclude):
if exclude is None:
return
if not isinstance(exclude, Iterable):
raise TypeError('Exclude must be Iterable!')
self._exclude = exclude
@property
def workdir(self):
return self._workdir
@workdir.setter
def workdir(self, directory):
if not exists(directory) or not isdir(directory):
raise EnvironmentError(f'"{directory}" is not a directory!')
if not self._is_in_allowed_dir(directory):
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
self._workdir = directory
@property
def allowed_directories(self):
return self._allowed_directories
@allowed_directories.setter
def allowed_directories(self, directories):
self._allowed_directories = [realpath(directory) for directory in directories]
@property
def filename(self):
return self._filename
@filename.setter
def filename(self, filename):
if filename not in self.files:
raise EnvironmentError('No such file in workdir!')
self._filename = filename
class FileManager: # pylint: disable=too-many-instance-attributes
def __init__(self, patterns):
self.patterns = patterns
@property
def files(self):
return [
self._relpath(file)
for file in glob(join(self._workdir, '**/*'), recursive=True)
if isfile(file)
and self._is_in_allowed_dir(file)
and not self._is_blacklisted(file)
]
return list(set(
path
for pattern in self.patterns
for path in glob(pattern, recursive=True)
if isfile(path) and self._is_allowed(path)
))
@property
def file_contents(self):
with open(self._filepath(self.filename), 'rb', buffering=0) as ifile:
def parents(self):
return list(set(
self._find_directory(pattern)
for pattern in self.patterns
))
@staticmethod
def _find_directory(pattern):
while pattern and not isdir(pattern):
pattern = dirname(pattern)
return pattern
def _is_allowed(self, filepath):
return any(
fnmatchcase(realpath(filepath), pattern)
for pattern in self.patterns
)
@_with_is_allowed
def read_file(self, filepath): # pylint: disable=no-self-use
with open(filepath, 'rb', buffering=0) as ifile:
return ifile.read().decode(errors='surrogateescape')
@file_contents.setter
def file_contents(self, value):
with open(self._filepath(self.filename), 'wb', buffering=0) as ofile:
ofile.write(value.encode())
def _is_in_allowed_dir(self, path):
return any(
realpath(path).startswith(allowed_dir)
for allowed_dir in self.allowed_directories
)
def _is_blacklisted(self, file):
return any(
fnmatchcase(file, blacklisted) or
fnmatchcase(basename(file), blacklisted)
for blacklisted in self.exclude
)
def _filepath(self, filename):
return join(self._workdir, filename)
def _relpath(self, filename):
return relpath(self._filepath(filename), start=self._workdir)
@_with_is_allowed
def write_file(self, filepath, contents): # pylint: disable=no-self-use
with open(filepath, 'wb', buffering=0) as ofile:
ofile.write(contents.encode())

View File

@ -1,8 +1,8 @@
# pylint: disable=redefined-outer-name
from dataclasses import dataclass
from secrets import token_urlsafe
from os import mkdir, symlink
from os.path import join
from os import chdir, mkdir, symlink
from pathlib import Path
from tempfile import TemporaryDirectory
@ -13,112 +13,75 @@ from .file_manager import FileManager
@dataclass
class ManagerContext:
folder: str
workdir: str
subdir: str
subfile: str
manager: FileManager
def join(self, path):
return join(self.folder, path)
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def create_random_link(self, source, dirname, extension):
linkname = self.join(f'{dirname}/{generate_name()}{extension}')
symlink(source, linkname)
return linkname
def join(self, path):
return join(self.workdir, path)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
dirs = {}
with TemporaryDirectory() as workdir:
chdir(workdir)
for name in ['allowed', 'excluded', 'invis']:
node = join(workdir, name)
mkdir(node)
Path(join(node, 'empty.txt')).touch()
Path(join(node, 'empty.bin')).touch()
dirs[name] = node
subdir = join(workdir, generate_name())
subfile = join(subdir, generate_name() + '.txt')
mkdir(subdir)
Path(subfile).touch()
manager = FileManager([join(workdir, '**/*.txt')])
yield ManagerContext(workdir, subdir, subfile, manager)
yield ManagerContext(
workdir,
FileManager(
dirs['allowed'],
[dirs['allowed'], dirs['excluded']],
exclude=['*/excluded/*']
)
)
def test_matching_files(context):
newdir = context.create_random_folder(context.subdir)
newfile = context.create_random_file(newdir, '.txt')
newlink = context.create_random_link(newfile, newdir, '.txt')
assert set(context.manager.files) == {context.subfile, newfile, newlink}
@pytest.mark.parametrize('subdir', ['allowed/', 'excluded/'])
def test_select_allowed_dirs(context, subdir):
context.manager.workdir = context.join(subdir)
assert context.manager.workdir == context.join(subdir)
newdir = context.join(subdir+'deep')
mkdir(newdir)
context.manager.workdir = newdir
assert context.manager.workdir == newdir
def test_unmatching_files(context):
newtxt = context.create_random_file(context.workdir, '.txt')
newbin = context.create_random_file(context.subdir, '.bin')
context.create_random_link(newtxt, context.subdir, '.txt')
context.create_random_link(newbin, context.subdir, '.txt')
assert context.manager.files == [context.subfile]
@pytest.mark.parametrize('invdir', ['', 'invis'])
def test_select_forbidden_dirs(context, invdir):
fullpath = context.join(invdir)
with pytest.raises(OSError):
context.manager.workdir = fullpath
assert context.manager.workdir != fullpath
context.manager.allowed_directories += [fullpath]
context.manager.workdir = fullpath
assert context.manager.workdir == fullpath
@pytest.mark.parametrize('filename', ['another.txt', '*.txt'])
def test_select_allowed_files(context, filename):
Path(context.join('allowed/'+filename)).touch()
assert filename in context.manager.files
context.manager.filename = filename
assert context.manager.filename == filename
@pytest.mark.parametrize('path', [
{'dir': 'allowed/', 'file': 'illegal.bin'},
{'dir': 'excluded/', 'file': 'legal.txt'},
{'dir': 'allowed/', 'file': token_urlsafe(16)+'.bin'},
{'dir': 'excluded/', 'file': token_urlsafe(16)+'.txt'},
{'dir': 'allowed/', 'file': token_urlsafe(32)+'.bin'},
{'dir': 'excluded/', 'file': token_urlsafe(32)+'.txt'}
])
def test_select_excluded_files(context, path):
context.manager.workdir = context.join(path['dir'])
context.manager.exclude = ['*/excluded/*', '*.bin']
Path(context.join(path['dir']+path['file'])).touch()
assert path['file'] not in context.manager.files
with pytest.raises(OSError):
context.manager.filename = path['file']
@pytest.mark.parametrize('path', [
{'src': 'excluded/empty.txt', 'dst': 'allowed/link.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/link.txt'},
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(16)+'.txt'},
{'src': 'excluded/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'},
{'src': 'invis/empty.txt', 'dst': 'allowed/'+token_urlsafe(32)+'.txt'}
])
def test_select_excluded_symlinks(context, path):
symlink(context.join(path['src']), context.join(path['dst']))
assert path['dst'] not in context.manager.files
def test_parents(context):
newdir = context.create_random_folder(context.workdir)
context.manager.patterns += [f'{newdir}/[!/@]*/**/?.c']
assert set(context.manager.parents) == {context.workdir, newdir}
def test_read_write_file(context):
for _ in range(128):
context.manager.filename = 'empty.txt'
content = token_urlsafe(32)
context.manager.file_contents = content
assert context.manager.file_contents == content
with open(context.join('allowed/empty.txt'), 'r') as ifile:
context.manager.write_file(context.subfile, content)
assert context.manager.read_file(context.subfile) == content
with open(context.subfile, 'r') as ifile:
assert ifile.read() == content
def test_regular_ide_actions(context):
context.manager.workdir = context.join('allowed')
newfile1, newfile2 = token_urlsafe(16), token_urlsafe(16)
Path(context.join(f'allowed/{newfile1}')).touch()
Path(context.join(f'allowed/{newfile2}')).touch()
for _ in range(8):
newfile1 = context.create_random_file(context.subdir, '.txt')
newfile2 = context.create_random_file(context.subdir, '.txt')
for _ in range(4):
context.manager.filename = newfile1
content1 = token_urlsafe(32)
context.manager.file_contents = content1
context.manager.filename = newfile2
content2 = token_urlsafe(32)
context.manager.file_contents = content2
context.manager.filename = newfile1
assert context.manager.file_contents == content1
context.manager.filename = newfile2
assert context.manager.file_contents == content2
content1, content2 = token_urlsafe(32), token_urlsafe(32)
context.manager.write_file(newfile1, content1)
context.manager.write_file(newfile2, content2)
assert context.manager.read_file(newfile1) == content1
assert context.manager.read_file(newfile2) == content2