mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-23 09:11:32 +00:00
94 lines
3.0 KiB
Python
94 lines
3.0 KiB
Python
from typing import Iterable
|
|
from glob import glob
|
|
from fnmatch import fnmatchcase
|
|
from os.path import basename, isfile, join, relpath, exists, isdir, realpath
|
|
|
|
|
|
class FileManager: # pylint: disable=too-many-instance-attributes
|
|
def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
|
|
self._exclude, self.exclude = [], exclude
|
|
self._allowed_directories, self.allowed_directories = None, allowed_directories
|
|
self._workdir, self.workdir = None, working_directory
|
|
self._filename, self.filename = None, selected_file or self.files[0]
|
|
|
|
@property
|
|
def exclude(self):
|
|
return self._exclude
|
|
|
|
@exclude.setter
|
|
def exclude(self, exclude):
|
|
if exclude is None:
|
|
return
|
|
if not isinstance(exclude, Iterable):
|
|
raise TypeError('Exclude must be Iterable!')
|
|
self._exclude = exclude
|
|
|
|
@property
|
|
def workdir(self):
|
|
return self._workdir
|
|
|
|
@workdir.setter
|
|
def workdir(self, directory):
|
|
if not exists(directory) or not isdir(directory):
|
|
raise EnvironmentError(f'"{directory}" is not a directory!')
|
|
if not self._is_in_allowed_dir(directory):
|
|
raise EnvironmentError(f'Directory "{directory}" is not allowed!')
|
|
self._workdir = directory
|
|
|
|
@property
|
|
def allowed_directories(self):
|
|
return self._allowed_directories
|
|
|
|
@allowed_directories.setter
|
|
def allowed_directories(self, directories):
|
|
self._allowed_directories = [realpath(directory) for directory in directories]
|
|
|
|
@property
|
|
def filename(self):
|
|
return self._filename
|
|
|
|
@filename.setter
|
|
def filename(self, filename):
|
|
if filename not in self.files:
|
|
raise EnvironmentError('No such file in workdir!')
|
|
self._filename = filename
|
|
|
|
@property
|
|
def files(self):
|
|
return [
|
|
self._relpath(file)
|
|
for file in glob(join(self._workdir, '**/*'), recursive=True)
|
|
if isfile(file)
|
|
and self._is_in_allowed_dir(file)
|
|
and not self._is_blacklisted(file)
|
|
]
|
|
|
|
@property
|
|
def file_contents(self):
|
|
with open(self._filepath(self.filename), 'rb', buffering=0) as ifile:
|
|
return ifile.read().decode(errors='surrogateescape')
|
|
|
|
@file_contents.setter
|
|
def file_contents(self, value):
|
|
with open(self._filepath(self.filename), 'wb', buffering=0) as ofile:
|
|
ofile.write(value.encode())
|
|
|
|
def _is_in_allowed_dir(self, path):
|
|
return any(
|
|
realpath(path).startswith(allowed_dir)
|
|
for allowed_dir in self.allowed_directories
|
|
)
|
|
|
|
def _is_blacklisted(self, file):
|
|
return any(
|
|
fnmatchcase(file, blacklisted) or
|
|
fnmatchcase(basename(file), blacklisted)
|
|
for blacklisted in self.exclude
|
|
)
|
|
|
|
def _filepath(self, filename):
|
|
return join(self._workdir, filename)
|
|
|
|
def _relpath(self, filename):
|
|
return relpath(self._filepath(filename), start=self._workdir)
|