baseimage-tutorial-framework/lib/tfw/components/source_code_event_handler.py

150 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from os.path import isfile, join, relpath, exists, isdir
from glob import glob
from fnmatch import fnmatchcase
from collections import Iterable
from tfw.event_handler_base import TriggerlessEventHandler
from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging
LOG = logging.getLogger(__name__)
class FileManager:
def __init__(self, working_directory, selected_file=None, exclude=None):
self._exclude, self.exclude = None, exclude
self._workdir, self.workdir = None, working_directory
self._filename, self.filename = None, selected_file or self.files[0]
@property
def exclude(self):
return self._exclude
@exclude.setter
def exclude(self, exclude):
if exclude is None: return
if not isinstance(exclude, Iterable): raise TypeError('Exclude must be Iterable!')
self._exclude = exclude
@property
def workdir(self):
return self._workdir
@workdir.setter
def workdir(self, directory):
if not exists(directory) or not isdir(directory):
raise EnvironmentError('"{}" is not a directory!'.format(directory))
self._workdir = directory
@property
def filename(self):
return self._filename
@filename.setter
def filename(self, filename):
if not filename in self.files:
raise EnvironmentError('No such file in workdir!')
self._filename = filename
@property
def files(self):
return [self._relpath(file) for file in glob(join(self._workdir, '**/*'), recursive=True)
if isfile(file) and
not any(fnmatchcase(file, blacklisted) for blacklisted in self.exclude)]
@property
def file_contents(self):
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
return ifile.read()
@file_contents.setter
def file_contents(self, value):
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
ofile.write(value)
def _filepath(self, filename):
return join(self._workdir, filename)
def _relpath(self, filename):
return relpath(self._filepath(filename), start=self._workdir)
class SourceCodeEventHandler(TriggerlessEventHandler):
def __init__(self, key, directory, selected_file=None, exclude=None):
super().__init__(key)
self.filemanager = FileManager(directory, selected_file=selected_file, exclude=exclude)
self.commands = {'read': self.read,
'write': self.write,
'select': self.select,
'selectdir': self.select_dir,
'exclude': self.exclude}
self._monitor = None
self.reload_monitor()
@property
def monitor(self):
return self._monitor
def reload_monitor(self):
if self._monitor:
try: self._monitor.stop()
except KeyError: logging.debug('Working directory was removed ignoring...')
self._monitor = DirectoryMonitor(self.filemanager.workdir)
self._monitor.watch() # This runs on a separate thread
def read(self, data):
try: data['content'] = self.filemanager.file_contents
except PermissionError: data['content'] = 'You have no permission to open that file :('
except FileNotFoundError: data['content'] = 'This file was removed :('
except Exception: data['content'] = 'Failed to read file :('
return data
def write(self, data):
self.monitor.ignore = self.monitor.ignore + 1
try: self.filemanager.file_contents = data['content']
except Exception: LOG.exception('Error writing file!')
del data['content']
return data
def select(self, data):
try: self.filemanager.filename = data['filename']
except EnvironmentError: LOG.exception('Failed to select file "{}"'.format(data['filename']))
return data
def select_dir(self, data):
try:
self.filemanager.workdir = data['directory']
self.reload_monitor()
try:
self.filemanager.filename = self.filemanager.files[0]
self.read(data)
except IndexError:
data['content'] = 'No files in this directory :('
except EnvironmentError:
LOG.error('Failed to select directory "{}"'.format(data['directory']))
return data
def exclude(self, data):
try: self.filemanager.exclude = list(data['exclude'])
except TypeError: LOG.error('Exclude must be Iterable!')
return data
def attach_fileinfo(self, data):
data['filename'] = self.filemanager.filename
data['files'] = self.filemanager.files
data['directory'] = self.filemanager.workdir
def handle_event(self, key, message):
try:
data = message['data']
message['data'] = self.commands[data['command']](data)
self.attach_fileinfo(data)
return message
except KeyError:
LOG.error('IGNORING MESSAGE: Invalid message received: {}'.format(message))
def cleanup(self):
self.monitor.stop()