# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

from os.path import isfile, join, relpath, exists, isdir, realpath
from glob import glob
from fnmatch import fnmatchcase
from typing import Iterable

from tfw.event_handler_base import FrontendEventHandlerBase
from tfw.mixins.monitor_manager_mixin import MonitorManagerMixin
from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging

LOG = logging.getLogger(__name__)


class FileManager:  # pylint: disable=too-many-instance-attributes
    """
    Keeps track of a working directory, the file selected inside it and a
    list of exclusion patterns, and restricts access to a whitelist of
    allowed directories.
    """
    def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
        """
        :param working_directory: directory to list and serve files from
        :param allowed_directories: iterable of directories the working
                                    directory (and its files) must reside in
        :param selected_file: file (relative to working_directory) selected
                              initially; defaults to the first file found
        :param exclude: iterable of fnmatch-style patterns to hide, or None
        :raises IndexError: if no selected_file is given and the working
                            directory contains no listable file
        :raises EnvironmentError: if the directory or file is not acceptable
        :raises TypeError: if exclude is neither None nor iterable
        """
        # Assignment order matters: the workdir setter consults
        # allowed_directories, and the filename setter (through self.files)
        # consults workdir, allowed_directories and exclude.
        self._exclude, self.exclude = None, exclude
        self._allowed_directories, self.allowed_directories = None, allowed_directories
        self._workdir, self.workdir = None, working_directory
        self._filename, self.filename = None, selected_file or self.files[0]

    @property
    def exclude(self):
        """Current exclusion patterns (None if none were ever set)."""
        return self._exclude

    @exclude.setter
    def exclude(self, exclude):
        # None means "leave the current patterns untouched".
        if exclude is None:
            return
        if not isinstance(exclude, Iterable):
            raise TypeError('Exclude must be Iterable!')
        self._exclude = exclude

    @property
    def workdir(self):
        """Directory files are currently listed from."""
        return self._workdir

    @workdir.setter
    def workdir(self, directory):
        if not exists(directory) or not isdir(directory):
            raise EnvironmentError(f'"{directory}" is not a directory!')
        if not self._is_in_allowed_dir(directory):
            raise EnvironmentError(f'Directory "{directory}" is not allowed!')
        self._workdir = directory

    @property
    def allowed_directories(self):
        """Directories the working directory is allowed to be switched to."""
        return self._allowed_directories

    @allowed_directories.setter
    def allowed_directories(self, directories):
        self._allowed_directories = directories

    @property
    def filename(self):
        """Currently selected file, relative to the working directory."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        if filename not in self.files:
            raise EnvironmentError('No such file in workdir!')
        self._filename = filename

    @property
    def files(self):
        """
        Regular files found recursively under the working directory that are
        inside an allowed directory and not excluded, as paths relative to
        the working directory.
        """
        # glob() already returns paths prefixed with the workdir, so they are
        # made relative directly; joining them onto the workdir a second time
        # yields wrong results when the workdir itself is a relative path.
        return [
            relpath(file, start=self._workdir)
            for file in glob(join(self._workdir, '**/*'), recursive=True)
            if isfile(file)
            and self._is_in_allowed_dir(file)
            and not self._is_blacklisted(file)
        ]

    @property
    def file_contents(self):
        """Text content of the currently selected file."""
        with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
            return ifile.read()

    @file_contents.setter
    def file_contents(self, value):
        with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
            ofile.write(value)

    def _is_in_allowed_dir(self, path):
        """
        Tell whether path (after resolving symlinks) is one of the allowed
        directories or is located below one of them.
        """
        resolved = realpath(path)
        for allowed_dir in self.allowed_directories:
            root = realpath(allowed_dir)
            # A bare prefix test would also accept siblings sharing a name
            # prefix ("/home/user2" for "/home/user"); join(root, '') appends
            # the path separator so only real descendants match.
            if resolved == root or resolved.startswith(join(root, '')):
                return True
        return False

    def _is_blacklisted(self, file):
        """Tell whether file matches any of the exclusion patterns."""
        # self.exclude stays None when no patterns were supplied.
        return any(
            fnmatchcase(file, blacklisted)
            for blacklisted in self.exclude or ()
        )

    def _filepath(self, filename):
        """Join filename onto the working directory."""
        return join(self._workdir, filename)

    def _relpath(self, filename):
        """Path of filename (joined onto the workdir) relative to the workdir."""
        return relpath(self._filepath(filename), start=self._workdir)


class IdeEventHandler(FrontendEventHandlerBase, MonitorManagerMixin):
    # pylint: disable=too-many-arguments,anomalous-backslash-in-string
    """
    Event handler implementing the backend of our browser based IDE.
    By default all files in the directory specified in __init__ are displayed
    on the frontend. Note that this is a stateful component.

    When any file in the selected directory changes they are automatically refreshed
    on the frontend (this is done by listening to inotify events).

    This EventHandler accepts messages that have a data['command'] key specifying
    a command to be executed.

    The API of each command is documented in their respective handler.
    """
    def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None):
        """
        :param key: the key this instance should listen to
        :param directory: working directory which the EventHandler should serve files from
        :param allowed_directories: list of directories that can be switched to using selectdir
        :param selected_file: file that is selected by default
        :param exclude: list of filenames that should not appear between files (for .o, .pyc, etc.)
        :raises EnvironmentError: if the working directory is invalid, not
                                  allowed, or contains no files
        """
        super().__init__(key)
        try:
            self.filemanager = FileManager(
                allowed_directories=allowed_directories,
                working_directory=directory,
                selected_file=selected_file,
                exclude=exclude
            )
        except IndexError as err:
            # FileManager indexes files[0] when no file is preselected.
            raise EnvironmentError(
                f'No file(s) in IdeEventHandler working_directory "{directory}"!'
            ) from err

        MonitorManagerMixin.__init__(
            self,
            DirectoryMonitor,
            self.key,
            self.filemanager.allowed_directories
        )

        # Maps data['command'] values to their handlers.
        self.commands = {
            'read': self.read,
            'write': self.write,
            'select': self.select,
            'selectdir': self.select_dir,
            'exclude': self.exclude
        }

    def read(self, data):
        """
        Read the currently selected file.

        :return dict: TFW message data containing key 'content'
                      (contents of the selected file)
        """
        try:
            data['content'] = self.filemanager.file_contents
        except PermissionError:
            data['content'] = 'You have no permission to open that file :('
        except FileNotFoundError:
            data['content'] = 'This file was removed :('
        except Exception:  # pylint: disable=broad-except
            data['content'] = 'Failed to read file :('
        return data

    def write(self, data):
        """
        Overwrites a file with the desired string.

        :param data: TFW message data containing key 'content'
                     (new file content)
        """
        # NOTE(review): presumably makes the directory monitor skip the change
        # event caused by our own write - confirm against DirectoryMonitor.
        self.monitor.ignore = self.monitor.ignore + 1
        try:
            self.filemanager.file_contents = data['content']
        except Exception:  # pylint: disable=broad-except
            LOG.exception('Error writing file!')
        # The written content is dropped from the returned message data.
        del data['content']
        return data

    def select(self, data):
        """
        Selects a file from the current directory.

        :param data: TFW message data containing 'filename'
                     (name of file to select relative to the current directory)
        """
        try:
            self.filemanager.filename = data['filename']
        except EnvironmentError:
            LOG.exception('Failed to select file "%s"', data['filename'])
        return data

    def select_dir(self, data):
        """
        Select a new working directory to display files from.

        :param data: TFW message data containing 'directory'
                     (absolute path of directory to select.
                     must be a path whitelisted in
                     self.allowed_directories)
        """
        try:
            self.filemanager.workdir = data['directory']
            self.reload_monitor()
            try:
                # Select and read the first file of the new directory.
                self.filemanager.filename = self.filemanager.files[0]
                self.read(data)
            except IndexError:
                data['content'] = 'No files in this directory :('
        except EnvironmentError as err:
            LOG.error('Failed to select directory "%s". Reason: %s', data['directory'], str(err))
        return data

    def exclude(self, data):
        r"""
        Overwrite list of excluded files

        :param data: TFW message data containing 'exclude'
                     (list of unix-style filename patterns to be excluded,
                     e.g.: ["\*.pyc", "\*.o"])
        """
        try:
            self.filemanager.exclude = list(data['exclude'])
        except TypeError:
            LOG.error('Exclude must be Iterable!')
        return data

    def attach_fileinfo(self, data):
        """
        Basic information included in every response to the frontend.
        """
        data['filename'] = self.filemanager.filename
        data['files'] = self.filemanager.files
        data['directory'] = self.filemanager.workdir

    def handle_event(self, message):
        """
        Dispatch message['data'] to the handler named by its 'command' key
        and attach the current file information to the response.

        :return: the updated message, or None if a KeyError occurred while
                 processing it (the message is logged and ignored)
        """
        try:
            data = message['data']
            message['data'] = self.commands[data['command']](data)
            self.attach_fileinfo(data)
            return message
        except KeyError:
            LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)

    def cleanup(self):
        """Stop the directory monitor."""
        self.monitor.stop()