# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. from os.path import isfile, join, relpath, exists, isdir, realpath from glob import glob from fnmatch import fnmatchcase from collections import Iterable from tfw import TriggerlessEventHandler from tfw.config.logs import logging from .directory_monitor import DirectoryMonitor LOG = logging.getLogger(__name__) class FileManager: # pylint: disable=too-many-instance-attributes def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None): self._exclude, self.exclude = None, exclude self._allowed_directories, self.allowed_directories = None, allowed_directories self._workdir, self.workdir = None, working_directory self._filename, self.filename = None, selected_file or self.files[0] @property def exclude(self): return self._exclude @exclude.setter def exclude(self, exclude): if exclude is None: return if not isinstance(exclude, Iterable): raise TypeError('Exclude must be Iterable!') self._exclude = exclude @property def workdir(self): return self._workdir @workdir.setter def workdir(self, directory): if not exists(directory) or not isdir(directory): raise EnvironmentError('"{}" is not a directory!'.format(directory)) if not self._is_whitelisted(directory): raise EnvironmentError('Directory "{}" is not in whitelist!'.format(directory)) self._workdir = directory @property def allowed_directories(self): return self._allowed_directories @allowed_directories.setter def allowed_directories(self, directories): self._allowed_directories = directories @property def filename(self): return self._filename @filename.setter def filename(self, filename): if not filename in self.files: raise EnvironmentError('No such file in workdir!') self._filename = filename @property def files(self): return [self._relpath(file) for file in glob(join(self._workdir, '**/*'), recursive=True) if isfile(file) and self._is_whitelisted(file) and not any(fnmatchcase(file, blacklisted) for blacklisted in self.exclude)] @property def file_contents(self): with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile: return ifile.read() @file_contents.setter def file_contents(self, value): with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile: ofile.write(value) def _is_whitelisted(self, file): return any(realpath(file).startswith(allowed_dir) for allowed_dir in self.allowed_directories) def _filepath(self, filename): return join(self._workdir, filename) def _relpath(self, filename): return relpath(self._filepath(filename), start=self._workdir) class SourceCodeEventHandler(TriggerlessEventHandler): # pylint: disable=too-many-arguments def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None): super().__init__(key) self.filemanager = FileManager(allowed_directories=allowed_directories, working_directory=directory, selected_file=selected_file, exclude=exclude) self.commands = {'read': self.read, 'write': self.write, 'select': self.select, 'selectdir': self.select_dir, 'exclude': self.exclude} self._monitor = None self.reload_monitor() @property def monitor(self): return self._monitor def reload_monitor(self): if self._monitor: try: self._monitor.stop() except KeyError: logging.debug('Working directory was removed – ignoring...') self._monitor = DirectoryMonitor(self.filemanager.workdir) self._monitor.watch() # This runs on a separate thread def read(self, data): try: data['content'] = self.filemanager.file_contents except PermissionError: data['content'] = 'You have no permission to open that file :(' except FileNotFoundError: data['content'] = 'This file was removed :(' except Exception: # pylint: disable=broad-except data['content'] = 'Failed to read file :(' return data def write(self, data): self.monitor.ignore = self.monitor.ignore + 1 try: self.filemanager.file_contents = data['content'] except Exception: # pylint: disable=broad-except LOG.exception('Error writing file!') del data['content'] return data def select(self, data): try: self.filemanager.filename = data['filename'] except EnvironmentError: LOG.exception('Failed to select file "%s"', data['filename']) return data def select_dir(self, data): try: self.filemanager.workdir = data['directory'] self.reload_monitor() try: self.filemanager.filename = self.filemanager.files[0] self.read(data) except IndexError: data['content'] = 'No files in this directory :(' except EnvironmentError as err: LOG.error('Failed to select directory "%s". Reason: %s', data['directory'], str(err)) return data def exclude(self, data): try: self.filemanager.exclude = list(data['exclude']) except TypeError: LOG.error('Exclude must be Iterable!') return data def attach_fileinfo(self, data): data['filename'] = self.filemanager.filename data['files'] = self.filemanager.files data['directory'] = self.filemanager.workdir def handle_event(self, key, message): try: data = message['data'] message['data'] = self.commands[data['command']](data) self.attach_fileinfo(data) return message except KeyError: LOG.error('IGNORING MESSAGE: Invalid message received: %s', message) def cleanup(self): self.monitor.stop()