baseimage-tutorial-framework/lib/tfw/components/source_code_event_handler.py

184 lines
6.3 KiB
Python
Raw Normal View History

# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Iterable
except ImportError:  # pragma: no cover - very old interpreters only
    from collections import Iterable
from fnmatch import fnmatchcase
from glob import glob
from os.path import isfile, join, relpath, exists, isdir, realpath

from tfw import TriggerlessEventHandler
from tfw.config.logs import logging
from .directory_monitor import DirectoryMonitor

LOG = logging.getLogger(__name__)
class FileManager:  # pylint: disable=too-many-instance-attributes
    """Give access to the files below a whitelisted working directory.

    The manager keeps track of a working directory, the currently selected
    file inside it and a list of ``fnmatch`` patterns for files to hide.
    Both the working directory and every listed file must resolve (after
    following symlinks) to a location inside one of the allowed directories.
    """

    def __init__(self, working_directory, allowed_directories, selected_file=None, exclude=None):
        """Validate and store the initial state.

        The assignment order matters: the whitelist must exist before the
        working directory is validated, and the exclude patterns and the
        working directory must exist before ``files`` can be evaluated.

        :param working_directory: directory whose files are exposed
        :param allowed_directories: directory (or iterable of directories)
                                    the manager is allowed to operate in
        :param selected_file: file name relative to the working directory;
                              defaults to the first file found
        :param exclude: iterable of ``fnmatch`` patterns to hide, or None
        :raises EnvironmentError: invalid/not whitelisted directory or
                                  unknown selected file
        :raises IndexError: no file was selected and the directory is empty
        :raises TypeError: ``exclude`` is neither None nor iterable
        """
        # Start from an empty pattern list so that ``files`` works even when
        # no exclude patterns were given (the setter ignores None).
        self._exclude = []
        self.exclude = exclude
        self._allowed_directories = None
        self.allowed_directories = allowed_directories
        self._workdir = None
        self.workdir = working_directory
        self._filename = None
        self.filename = selected_file or self.files[0]

    @property
    def exclude(self):
        """List of ``fnmatch`` patterns; matching files are hidden."""
        return self._exclude

    @exclude.setter
    def exclude(self, exclude):
        # None means "leave the current patterns untouched".
        if exclude is None:
            return
        if isinstance(exclude, str):
            # A bare string would otherwise be iterated character by
            # character ('*' alone would hide every file).
            exclude = [exclude]
        if not isinstance(exclude, Iterable):
            raise TypeError('Exclude must be Iterable!')
        # Materialise so that one-shot iterators survive repeated listings.
        self._exclude = list(exclude)

    @property
    def workdir(self):
        """The directory whose files are currently exposed."""
        return self._workdir

    @workdir.setter
    def workdir(self, directory):
        # isdir() is already False for paths that do not exist.
        if not isdir(directory):
            raise EnvironmentError('"{}" is not a directory!'.format(directory))
        if not self._is_whitelisted(directory):
            raise EnvironmentError('Directory "{}" is not in whitelist!'.format(directory))
        self._workdir = directory

    @property
    def allowed_directories(self):
        """Directories the working directory and its files must live in."""
        return self._allowed_directories

    @allowed_directories.setter
    def allowed_directories(self, directories):
        if isinstance(directories, str):
            # Iterating a string yields characters; '/' would then
            # whitelist the whole file system.
            directories = [directories]
        self._allowed_directories = list(directories)

    @property
    def filename(self):
        """Currently selected file, relative to the working directory."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        if filename not in self.files:
            raise EnvironmentError('No such file in workdir!')
        self._filename = filename

    @property
    def files(self):
        """Whitelisted, non-excluded files relative to the working directory.

        The directory is scanned recursively on every access. Exclude
        patterns are matched case-sensitively against the path as returned
        by ``glob`` (i.e. prefixed with the working directory).
        """
        patterns = self.exclude
        # glob() already returns paths prefixed with the working directory,
        # so they are made relative directly instead of being joined to the
        # working directory a second time (which breaks relative workdirs).
        return [relpath(path, start=self._workdir)
                for path in glob(join(self._workdir, '**/*'), recursive=True)
                if isfile(path)
                and self._is_whitelisted(path)
                and not any(fnmatchcase(path, pattern) for pattern in patterns)]

    @property
    def file_contents(self):
        """Text of the selected file; undecodable bytes are surrogate-escaped."""
        with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
            return ifile.read()

    @file_contents.setter
    def file_contents(self, value):
        with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
            ofile.write(value)

    def _is_whitelisted(self, file):
        """Return True if ``file`` resolves to a path inside an allowed directory.

        Both sides are resolved with ``realpath`` and compared on whole path
        components, so "/srv/app_private" is not accepted for "/srv/app".
        """
        resolved = realpath(file)
        for allowed_dir in self.allowed_directories:
            root = realpath(allowed_dir)
            # join(root, '') appends exactly one trailing separator.
            if resolved == root or resolved.startswith(join(root, '')):
                return True
        return False

    def _filepath(self, filename):
        """Path of ``filename`` (relative to the working directory) on disk."""
        return join(self._workdir, filename)

    def _relpath(self, filename):
        """Normalise a working-directory-relative file name."""
        return relpath(self._filepath(filename), start=self._workdir)
class SourceCodeEventHandler(TriggerlessEventHandler):
    """Event handler that serves file read/write/select commands.

    Incoming messages carry a ``data`` dict whose ``command`` entry selects
    one of the handlers registered in ``self.commands``. File access goes
    through a ``FileManager``; a ``DirectoryMonitor`` watches the current
    working directory.
    """
    # pylint: disable=too-many-arguments

    def __init__(self, key, directory, allowed_directories, selected_file=None, exclude=None):
        """Create the file manager, register the commands and start the monitor.

        :param key: passed on to the base event handler
        :param directory: initial working directory
        :param allowed_directories: directories the handler may operate in
        :param selected_file: initially selected file (optional)
        :param exclude: patterns of files to hide (optional)
        """
        super().__init__(key)
        self.filemanager = FileManager(allowed_directories=allowed_directories, working_directory=directory,
                                       selected_file=selected_file, exclude=exclude)

        self.commands = {'read': self.read,
                         'write': self.write,
                         'select': self.select,
                         'selectdir': self.select_dir,
                         'exclude': self.exclude}

        self._monitor = None
        self.reload_monitor()

    @property
    def monitor(self):
        """The directory monitor watching the current working directory."""
        return self._monitor

    def _stop_monitor(self):
        """Stop the current monitor, tolerating a removed working directory."""
        try:
            self._monitor.stop()
        except KeyError:
            # stop() raises KeyError when the watched directory is gone.
            LOG.debug('Working directory was removed ignoring...')

    def reload_monitor(self):
        """Replace the monitor with one watching the current working directory."""
        if self._monitor:
            self._stop_monitor()
        self._monitor = DirectoryMonitor(self.filemanager.workdir)
        self._monitor.watch()  # This runs on a separate thread

    def read(self, data):
        """Store the selected file's text (or an error notice) in ``data['content']``."""
        try:
            data['content'] = self.filemanager.file_contents
        except PermissionError:
            data['content'] = 'You have no permission to open that file :('
        except FileNotFoundError:
            data['content'] = 'This file was removed :('
        except Exception:  # pylint: disable=broad-except
            data['content'] = 'Failed to read file :('
        return data

    def write(self, data):
        """Write ``data['content']`` to the selected file and drop it from ``data``.

        :raises KeyError: the message carries no ``content`` entry
        """
        # Validate the message before touching the monitor: a missing
        # 'content' raises KeyError here and nothing else is changed.
        content = data['content']
        # NOTE(review): presumably tells the monitor to skip the change
        # event caused by our own write - confirm against DirectoryMonitor.
        self.monitor.ignore = self.monitor.ignore + 1
        try:
            self.filemanager.file_contents = content
        except Exception:  # pylint: disable=broad-except
            LOG.exception('Error writing file!')
        # The written text is not echoed back in the reply.
        del data['content']
        return data

    def select(self, data):
        """Select ``data['filename']``; a failure is logged, not raised."""
        try:
            self.filemanager.filename = data['filename']
        except EnvironmentError:
            LOG.exception('Failed to select file "%s"', data['filename'])
        return data

    def select_dir(self, data):
        """Switch to ``data['directory']`` and load its first file into ``data``."""
        try:
            self.filemanager.workdir = data['directory']
            self.reload_monitor()
            try:
                self.filemanager.filename = self.filemanager.files[0]
                self.read(data)
            except IndexError:
                data['content'] = 'No files in this directory :('
        except EnvironmentError as err:
            LOG.error('Failed to select directory "%s". Reason: %s', data['directory'], err)
        return data

    def exclude(self, data):
        """Replace the exclude patterns with ``data['exclude']``."""
        patterns = data['exclude']
        if isinstance(patterns, str):
            # list('*.pyc') would yield single characters, and the lone '*'
            # would then hide every file.
            patterns = [patterns]
        try:
            self.filemanager.exclude = list(patterns)
        except TypeError:
            LOG.error('Exclude must be Iterable!')
        return data

    def attach_fileinfo(self, data):
        """Add the selected file, the file list and the working directory to ``data``."""
        data['filename'] = self.filemanager.filename
        data['files'] = self.filemanager.files
        data['directory'] = self.filemanager.workdir

    def handle_event(self, key, message):
        """Dispatch ``message`` to the requested command.

        :return: the message with updated ``data``, or None when the message
                 is malformed (a KeyError was raised while handling it)
        """
        try:
            data = message['data']
            message['data'] = self.commands[data['command']](data)
            self.attach_fileinfo(data)
            return message
        except KeyError:
            LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
        return None

    def cleanup(self):
        """Stop the directory monitor."""
        if self._monitor is not None:
            self._stop_monitor()