mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-06 02:41:20 +00:00
92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
from os.path import isfile, join, relpath
|
|
from glob import glob
|
|
|
|
from tfw.components.mixins import SupervisorMixin
|
|
from tfw.event_handler_base import TriggerlessEventHandler
|
|
from tfw.components.directory_monitor import DirectoryMonitor
|
|
|
|
from tfw.config.logs import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class FileManager:
|
|
def __init__(self, working_directory, selected_file=None):
|
|
self.exclude = ['__pycache__']
|
|
self._workdir = working_directory
|
|
self.filename = selected_file or self.files[0]
|
|
|
|
def select_file(self, filename):
|
|
if not filename in self.files:
|
|
raise EnvironmentError('No such file in workdir!')
|
|
self.filename = filename
|
|
|
|
@property
|
|
def files(self):
|
|
return [self._relpath(file) for file in glob(join(self._workdir, '**/*'), recursive=True)
|
|
if isfile(file) and
|
|
not any(word in file for word in self.exclude)]
|
|
|
|
@property
|
|
def file_contents(self):
|
|
with open(self._filepath(self.filename), 'r', errors='surrogateescape') as ifile:
|
|
return ifile.read()
|
|
|
|
@file_contents.setter
|
|
def file_contents(self, value):
|
|
with open(self._filepath(self.filename), 'w', errors='surrogateescape') as ofile:
|
|
ofile.write(value)
|
|
|
|
def _filepath(self, filename):
|
|
return join(self._workdir, filename)
|
|
|
|
def _relpath(self, filename):
|
|
return relpath(self._filepath(filename), start=self._workdir)
|
|
|
|
|
|
class SourceCodeEventHandler(TriggerlessEventHandler, SupervisorMixin):
|
|
def __init__(self, key, directory, process_name, selected_file=None):
|
|
super().__init__(key)
|
|
self.filemanager = FileManager(directory, selected_file=selected_file)
|
|
self.process_name = process_name
|
|
|
|
self.commands = {
|
|
'read': self.read,
|
|
'write': self.write,
|
|
'select': self.select
|
|
}
|
|
|
|
self.monitor = DirectoryMonitor(directory)
|
|
self.monitor.watch() # This runs on a separate thread
|
|
|
|
def read(self, data):
|
|
try: data['content'] = self.filemanager.file_contents
|
|
except PermissionError: data['content'] = 'You have no permission to open that file :('
|
|
except FileNotFoundError: data['content'] = 'This file was removed :('
|
|
except Exception: data['content'] = 'Failed to read file :('
|
|
return data
|
|
|
|
def write(self, data):
|
|
with self.monitor.pauser:
|
|
try: self.filemanager.file_contents = data['content']
|
|
except Exception: log.exception('Error writing file!')
|
|
self.restart_process()
|
|
return data
|
|
|
|
def select(self, data):
|
|
try: self.filemanager.select_file(data['filename'])
|
|
except EnvironmentError: log.exception('Failed to select file "{}"'.format(data['filename']))
|
|
return data
|
|
|
|
def attach_fileinfo(self, data):
|
|
data['filename'] = self.filemanager.filename
|
|
data['files'] = self.filemanager.files
|
|
|
|
def handle_event(self, key, data_json):
|
|
data = data_json['data']
|
|
data_json['data'] = self.commands[data['command']](data)
|
|
self.attach_fileinfo(data)
|
|
return data_json
|
|
|
|
def cleanup(self):
|
|
self.monitor.stop()
|