mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-09 17:07:17 +00:00
164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
from os.path import isfile, join, relpath, exists, isdir
|
||
from glob import glob
|
||
from fnmatch import fnmatchcase
|
||
from collections import Iterable
|
||
|
||
from tfw.event_handler_base import TriggerlessEventHandler
|
||
from tfw.components.directory_monitor import DirectoryMonitor
|
||
from tfw.config.logs import logging
|
||
|
||
LOG = logging.getLogger(__name__)
|
||
|
||
|
||
class FileManager:
|
||
def __init__(self, working_directory, selected_file=None, exclude=None):
|
||
self._exclude, self.exclude = None, exclude
|
||
self._workdir, self.workdir = None, working_directory
|
||
self._filename, self.filename = None, selected_file or self.files[0]
|
||
|
||
@property
|
||
def exclude(self):
|
||
return self._exclude
|
||
|
||
@exclude.setter
|
||
def exclude(self, exclude):
|
||
if exclude is None:
|
||
return
|
||
if not isinstance(exclude, Iterable):
|
||
raise TypeError('Exclude must be Iterable!')
|
||
self._exclude = exclude
|
||
|
||
@property
|
||
def workdir(self):
|
||
return self._workdir
|
||
|
||
@workdir.setter
|
||
def workdir(self, directory):
|
||
if not exists(directory) or not isdir(directory):
|
||
raise EnvironmentError('"{}" is not a directory!'.format(directory))
|
||
self._workdir = directory
|
||
|
||
@property
|
||
def filename(self):
|
||
return self._filename
|
||
|
||
@filename.setter
|
||
def filename(self, filename):
|
||
if not filename in self.files:
|
||
raise EnvironmentError('No such file in workdir!')
|
||
self._filename = filename
|
||
|
||
@property
|
||
def files(self):
|
||
return [self._relpath(file) for file in glob(join(self._workdir, '**/*'), recursive=True)
|
||
if isfile(file) and
|
||
not any(fnmatchcase(file, blacklisted) for blacklisted in self.exclude)]
|
||
|
||
@property
|
||
def file_contents(self):
|
||
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
||
return ifile.read()
|
||
|
||
@file_contents.setter
|
||
def file_contents(self, value):
|
||
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
||
ofile.write(value)
|
||
|
||
def _filepath(self, filename):
|
||
return join(self._workdir, filename)
|
||
|
||
def _relpath(self, filename):
|
||
return relpath(self._filepath(filename), start=self._workdir)
|
||
|
||
|
||
class SourceCodeEventHandler(TriggerlessEventHandler):
|
||
def __init__(self, key, directory, selected_file=None, exclude=None):
|
||
super().__init__(key)
|
||
self.filemanager = FileManager(directory, selected_file=selected_file, exclude=exclude)
|
||
|
||
self.commands = {'read': self.read,
|
||
'write': self.write,
|
||
'select': self.select,
|
||
'selectdir': self.select_dir,
|
||
'exclude': self.exclude}
|
||
|
||
self._monitor = None
|
||
self.reload_monitor()
|
||
|
||
@property
|
||
def monitor(self):
|
||
return self._monitor
|
||
|
||
def reload_monitor(self):
|
||
if self._monitor:
|
||
try:
|
||
self._monitor.stop()
|
||
except KeyError:
|
||
logging.debug('Working directory was removed – ignoring...')
|
||
self._monitor = DirectoryMonitor(self.filemanager.workdir)
|
||
self._monitor.watch() # This runs on a separate thread
|
||
|
||
def read(self, data):
|
||
try:
|
||
data['content'] = self.filemanager.file_contents
|
||
except PermissionError:
|
||
data['content'] = 'You have no permission to open that file :('
|
||
except FileNotFoundError:
|
||
data['content'] = 'This file was removed :('
|
||
except Exception:
|
||
data['content'] = 'Failed to read file :('
|
||
return data
|
||
|
||
def write(self, data):
|
||
self.monitor.ignore = self.monitor.ignore + 1
|
||
try:
|
||
self.filemanager.file_contents = data['content']
|
||
except Exception:
|
||
LOG.exception('Error writing file!')
|
||
del data['content']
|
||
return data
|
||
|
||
def select(self, data):
|
||
try:
|
||
self.filemanager.filename = data['filename']
|
||
except EnvironmentError:
|
||
LOG.exception('Failed to select file "%s"', data['filename'])
|
||
return data
|
||
|
||
def select_dir(self, data):
|
||
try:
|
||
self.filemanager.workdir = data['directory']
|
||
self.reload_monitor()
|
||
try:
|
||
self.filemanager.filename = self.filemanager.files[0]
|
||
self.read(data)
|
||
except IndexError:
|
||
data['content'] = 'No files in this directory :('
|
||
except EnvironmentError:
|
||
LOG.error('Failed to select directory "%s"', data['directory'])
|
||
return data
|
||
|
||
def exclude(self, data):
|
||
try:
|
||
self.filemanager.exclude = list(data['exclude'])
|
||
except TypeError:
|
||
LOG.error('Exclude must be Iterable!')
|
||
return data
|
||
|
||
def attach_fileinfo(self, data):
|
||
data['filename'] = self.filemanager.filename
|
||
data['files'] = self.filemanager.files
|
||
data['directory'] = self.filemanager.workdir
|
||
|
||
def handle_event(self, key, message):
|
||
try:
|
||
data = message['data']
|
||
message['data'] = self.commands[data['command']](data)
|
||
self.attach_fileinfo(data)
|
||
return message
|
||
except KeyError:
|
||
LOG.error('IGNORING MESSAGE: Invalid message received: %s', message)
|
||
|
||
def cleanup(self):
|
||
self.monitor.stop()
|