from os.path import splitext, isfile, join, relpath
from glob import glob

from tfw.components.mixins import SupervisorMixin
from tfw.event_handler_base import EventHandlerBase
from tfw.components.directory_monitor import DirectoryMonitor
from tfw.config.logs import logging

log = logging.getLogger(__name__)


class FileManager:
    """Track and access the files below a single working directory.

    All file names exposed by this class (``filename``, ``files``) are paths
    relative to the working directory.
    """

    def __init__(self, working_directory, selected_file=None):
        """Create a manager rooted at *working_directory*.

        :param working_directory: directory whose files are managed
        :param selected_file: initially selected file, relative to the
                              working directory; when omitted (or falsy) the
                              first entry of ``files`` is selected
        :raises IndexError: if no file is given and the directory holds
                            no selectable files
        """
        # Path fragments; any file whose relative path contains one is hidden.
        self.exclude = ['__pycache__']
        self._workdir = working_directory
        self.filename = selected_file or self._relpath(self.files[0])
        self.language = map_file_extension_to_language(self.filename)

    def select_file(self, filename):
        """Make *filename* the current file and update ``language``.

        :param filename: path relative to the working directory
        :raises EnvironmentError: if *filename* is not listed in ``files``
        """
        if filename not in self.files:
            raise EnvironmentError('No such file in workdir!')
        self.filename = filename
        self.language = map_file_extension_to_language(self.filename)

    @property
    def files(self):
        """List of regular files below the working directory (recursive).

        Paths are relative to the working directory; files matching any
        fragment in ``exclude`` are omitted. The directory is re-scanned on
        every access.
        """
        pattern = join(self._workdir, '**/*')
        # glob() results already include the working directory, so they are
        # made relative directly. Joining the workdir on again would yield
        # wrong names whenever the working directory is a relative path.
        relative_paths = (
            relpath(path, start=self._workdir)
            for path in glob(pattern, recursive=True)
            if isfile(path)
        )
        return [path for path in relative_paths
                if not self._is_excluded(path)]

    @property
    def file_contents(self):
        """Text content of the currently selected file.

        Both reading and writing use ``errors='surrogateescape'`` so that
        bytes which cannot be decoded do not raise and can be written back.
        """
        with open(self._filepath(self.filename), 'r',
                  errors='surrogateescape') as ifile:
            return ifile.read()

    @file_contents.setter
    def file_contents(self, value):
        with open(self._filepath(self.filename), 'w',
                  errors='surrogateescape') as ofile:
            ofile.write(value)

    def _is_excluded(self, relative_path):
        """Return True if *relative_path* contains an excluded fragment."""
        # Matched against the workdir-relative path so that the location of
        # the working directory itself can never hide every file.
        return any(word in relative_path for word in self.exclude)

    def _filepath(self, filename):
        """Return *filename* joined onto the working directory."""
        return join(self._workdir, filename)

    def _relpath(self, filename):
        """Return the workdir-relative form of a workdir-relative name."""
        return relpath(self._filepath(filename), start=self._workdir)


class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin):
    """Event handler serving 'read', 'write' and 'select' file commands."""

    def __init__(self, anchor, directory, process_name, selected_file=None):
        """Set up file management and directory monitoring.

        :param anchor: passed through to ``EventHandlerBase.__init__``
        :param directory: directory whose files are served and monitored
        :param process_name: stored on the instance
                             (NOTE(review): presumably read by
                             SupervisorMixin when restarting - confirm)
        :param selected_file: initially selected file, see ``FileManager``
        """
        super().__init__(anchor)
        self.filemanager = FileManager(directory, selected_file=selected_file)
        self.process_name = process_name

        # Dispatch table: value of data['command'] -> handler method.
        self.commands = {
            'read': self.read,
            'write': self.write,
            'select': self.select
        }

        self.monitor = DirectoryMonitor(directory)
        self.monitor.watch()  # This runs on a separate thread

    def read(self, data):
        """Store the selected file's content in ``data['content']``.

        On failure a human-readable message is stored instead of raising.

        :return: the same *data* dict
        """
        try:
            data['content'] = self.filemanager.file_contents
        except PermissionError:
            data['content'] = 'You have no permission to open that file :('
        except FileNotFoundError:
            data['content'] = 'This file was removed :('
        except Exception:
            data['content'] = 'Failed to read file :('
        return data

    def write(self, data):
        """Write ``data['content']`` to the selected file, then restart.

        Write errors are logged, not raised; ``restart_process`` is called
        whether or not the write succeeded.
        (NOTE(review): ``restart_process`` is not defined in this file,
        presumably it comes from SupervisorMixin - confirm.)

        :return: the same *data* dict
        """
        try:
            self.filemanager.file_contents = data['content']
        except Exception:
            log.exception('Error writing file!')
        self.restart_process()
        return data

    def select(self, data):
        """Select the file named by ``data['filename']``.

        An unknown file name is logged and the selection is left unchanged.

        :return: the same *data* dict
        """
        try:
            self.filemanager.select_file(data['filename'])
        except EnvironmentError:
            log.exception('Failed to select file "{}"'.format(data['filename']))
        return data

    def attach_fileinfo(self, data):
        """Add the current 'filename', 'language' and 'files' to *data*."""
        data['filename'] = self.filemanager.filename
        data['language'] = self.filemanager.language
        data['files'] = self.filemanager.files

    def handle_event(self, anchor, data_json):
        """Run the command named in ``data_json['data']['command']``.

        The command's result replaces ``data_json['data']`` and the current
        file information is attached to it.

        :raises KeyError: if 'command' is missing or is not one of the
                          keys of ``self.commands``
        :return: the updated *data_json*
        """
        data = data_json['data']
        data_json['data'] = self.commands[data['command']](data)
        # Every handler returns the dict it was given, so annotating `data`
        # annotates the response as well.
        self.attach_fileinfo(data)
        return data_json

    def cleanup(self):
        """Stop the directory monitor."""
        self.monitor.stop()


# Language identifiers keyed by lower-case file extension.
_LANGUAGE_BY_EXTENSION = {
    '.py': 'python',
    '.js': 'javascript',
    '.html': 'html',
    '.css': 'css',
    '.java': 'java',
    '.cpp': 'c_cpp',
    '.hpp': 'c_cpp',
    '.c': 'c_cpp',
    '.h': 'c_cpp',
    '.cs': 'csharp',
}


def map_file_extension_to_language(filename):
    """Return the language identifier for *filename*'s extension.

    The extension is matched case-insensitively; unknown or missing
    extensions map to ``'text'``.
    """
    _, extension = splitext(filename)
    return _LANGUAGE_BY_EXTENSION.get(extension.lower(), 'text')