mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-23 04:51:33 +00:00
67 lines
1.9 KiB
Python
67 lines
1.9 KiB
Python
import logging
|
|
from functools import wraps
|
|
from glob import glob
|
|
from fnmatch import fnmatchcase
|
|
from os.path import dirname, isdir, isfile, realpath, isabs
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def _with_is_allowed(func):
    """Decorate a method so it only runs for paths accepted by ``self.is_allowed``.

    The guarded path is the first positional argument after ``self`` or, when
    the caller passes it by name, the ``filepath`` keyword argument.

    Raises (from the returned wrapper):
        TypeError: if no path argument was supplied at all.
        ValueError: if ``self.is_allowed(path)`` is falsy.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Accept both call styles; indexing ``args[0]`` unconditionally would
        # raise IndexError for ``method(filepath=...)``.
        if args:
            filepath = args[0]
        elif 'filepath' in kwargs:
            filepath = kwargs['filepath']
        else:
            # Refuse rather than fall through: the wrapped function must never
            # run without the permission check having seen a path.
            raise TypeError(
                "{}() missing required argument: 'filepath'".format(func.__name__)
            )
        if not self.is_allowed(filepath):
            raise ValueError('Forbidden path.')
        return func(self, *args, **kwargs)
    return wrapper
|
|
|
|
|
|
class FileManager:  # pylint: disable=too-many-instance-attributes
    """Restrict file lookups, reads and writes to a set of glob patterns.

    ``patterns`` is stored as given and iterated again on every property
    access and permission check, so it should be a re-iterable collection of
    pattern strings (e.g. a list), not a one-shot generator.
    """

    def __init__(self, patterns):
        self.patterns = patterns

    @property
    def files(self):
        """Return the existing regular files matched by the patterns.

        Each pattern is expanded with a recursive ``glob``; a hit is kept only
        if it is a file and its resolved path passes ``is_allowed``. The
        result is de-duplicated and sorted so that callers (notably
        ``find_file``) see a deterministic order.
        """
        return sorted({
            path
            for pattern in self.patterns
            for path in glob(pattern, recursive=True)
            if isfile(path) and self.is_allowed(path)
        })

    @property
    def parents(self):
        """Return the nearest existing directory of every pattern.

        De-duplicated and sorted for deterministic output.
        """
        return sorted({
            self._find_directory(pattern)
            for pattern in self.patterns
        })

    @staticmethod
    def _find_directory(pattern):
        """Strip trailing path components until an existing directory remains.

        Returns an empty string when no prefix of ``pattern`` is a directory
        (``dirname`` eventually yields ``''``, which ends the loop).
        """
        while pattern and not isdir(pattern):
            pattern = dirname(pattern)
        return pattern

    def is_allowed(self, filepath):
        """Tell whether ``filepath`` is covered by at least one pattern.

        The path is resolved with ``realpath`` first, so symlinks are judged
        by their target, and matching is case-sensitive (``fnmatchcase``).
        """
        resolved = realpath(filepath)
        return any(
            fnmatchcase(resolved, pattern)
            for pattern in self.patterns
        )

    def find_file(self, filename):
        """Resolve a relative ``filename`` to one of the managed files.

        Absolute names are returned untouched. For a relative name the first
        entry of ``files`` whose path ends with ``filename`` is returned; if
        nothing matches, ``filename`` itself is returned unchanged.

        NOTE: this is a plain string-suffix comparison, so ``'a.txt'`` also
        matches ``'/dir/ba.txt'``.
        """
        if not isabs(filename):
            for filepath in self.files:
                if filepath.endswith(filename):
                    return filepath
        return filename

    @_with_is_allowed
    def read_file(self, filepath):  # pylint: disable=no-self-use
        """Return the contents of ``filepath`` as text.

        Bytes are decoded with the default codec (UTF-8); undecodable bytes
        are preserved as lone surrogates via ``surrogateescape`` instead of
        raising. The decorator raises ``ValueError`` for forbidden paths.
        """
        with open(filepath, 'rb', buffering=0) as ifile:
            return ifile.read().decode(errors='surrogateescape')

    @_with_is_allowed
    def write_file(self, filepath, contents):  # pylint: disable=no-self-use
        """Overwrite ``filepath`` with the text ``contents``.

        The text is encoded with the default codec (UTF-8) using
        ``surrogateescape`` so that whatever ``read_file`` returned can be
        written back byte-for-byte; strict encoding would raise
        ``UnicodeEncodeError`` on the escaped surrogates. The decorator
        raises ``ValueError`` for forbidden paths.
        """
        with open(filepath, 'wb', buffering=0) as ofile:
            ofile.write(contents.encode(errors='surrogateescape'))
|