mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-11-04 08:32:55 +00:00 
			
		
		
		
	Refactor handling of files in SourceCodeEventHandler
This commit is contained in:
		@@ -1,58 +1,81 @@
 | 
				
			|||||||
from shutil import copy, rmtree, copytree
 | 
					from shutil import copy, rmtree, copytree
 | 
				
			||||||
from os.path import splitext
 | 
					from os.path import splitext, isfile, join
 | 
				
			||||||
 | 
					from glob import glob
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from tfw.util import SupervisorMixin
 | 
					from tfw.util import SupervisorMixin
 | 
				
			||||||
from tfw.config import LOGIN_APP_DIR
 | 
					from tfw.config import LOGIN_APP_DIR
 | 
				
			||||||
from tfw.event_handler_base import EventHandlerBase
 | 
					from tfw.event_handler_base import EventHandlerBase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from tfw.config.logs import logging
 | 
				
			||||||
 | 
					log = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class FileManager:
 | 
				
			||||||
 | 
					    def __init__(self, source_directory, working_directory, selected_file=None):
 | 
				
			||||||
 | 
					        self._sourcedir = source_directory
 | 
				
			||||||
 | 
					        self._workdir = working_directory
 | 
				
			||||||
 | 
					        self._reload_files()
 | 
				
			||||||
 | 
					        self.filename = selected_file or self.files[0]
 | 
				
			||||||
 | 
					        self.language = map_file_extension_to_language(self.filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def select_file(self, filename):
 | 
				
			||||||
 | 
					        if not filename in self.files:
 | 
				
			||||||
 | 
					            raise EnvironmentError('No such file in workdir!')
 | 
				
			||||||
 | 
					        self.filename = filename
 | 
				
			||||||
 | 
					        self.language = map_file_extension_to_language(self.filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def files(self):
 | 
				
			||||||
 | 
					        return [file for file in glob(self._workdir) if isfile(file)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def file_contents(self):
 | 
				
			||||||
 | 
					        with open(self._filepath(self.filename), 'r') as ifile:
 | 
				
			||||||
 | 
					            return ifile.read()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @file_contents.setter
 | 
				
			||||||
 | 
					    def file_contents(self, value):
 | 
				
			||||||
 | 
					        with open(self._filepath(self.filename), 'w') as ofile:
 | 
				
			||||||
 | 
					            ofile.write(value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _filepath(self, filename):
 | 
				
			||||||
 | 
					        return join(self._workdir, filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _reload_files(self):
 | 
				
			||||||
 | 
					        rmtree(self._workdir, ignore_errors=True)
 | 
				
			||||||
 | 
					        copytree(self._sourcedir, self._workdir)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin):
 | 
					class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin):
 | 
				
			||||||
    def __init__(self, anchor, filename, process_name=None):
 | 
					    def __init__(self, anchor, filename, process_name=None):
 | 
				
			||||||
        super().__init__(anchor)
 | 
					        super().__init__(anchor)
 | 
				
			||||||
        self.working_directory = LOGIN_APP_DIR
 | 
					        self.filemanager = FileManager('login_projectdir', LOGIN_APP_DIR, selected_file=filename)
 | 
				
			||||||
        self.filename = filename
 | 
					 | 
				
			||||||
        self.language = map_file_extension_to_language(filename)
 | 
					 | 
				
			||||||
        self.process_name = process_name or splitext(filename)[0]
 | 
					        self.process_name = process_name or splitext(filename)[0]
 | 
				
			||||||
        self.commands = {
 | 
					        self.commands = {
 | 
				
			||||||
            'read': self.read_file,
 | 
					            'read': self.read,
 | 
				
			||||||
            'write': self.write_file
 | 
					            'write': self.write
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.file = self.create_initial_state()
 | 
					        # Supervisor needs these to run the login program
 | 
				
			||||||
 | 
					        copy('source_code_server/server.py', LOGIN_APP_DIR)
 | 
				
			||||||
 | 
					        copy('source_code_server/users.db', LOGIN_APP_DIR)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def read(self, data_json):
 | 
				
			||||||
 | 
					        data_json['data'] = {
 | 
				
			||||||
 | 
					            'filename': self.filemanager.filename,
 | 
				
			||||||
 | 
					            'content': self.filemanager.file_contents,
 | 
				
			||||||
 | 
					            'language': self.filemanager.language
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def write(self, data_json):
 | 
				
			||||||
 | 
					        self.filemanager.file_contents = data_json['data']['content']
 | 
				
			||||||
 | 
					        self.restart_process()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def handle_event(self, anchor, data_json):
 | 
					    def handle_event(self, anchor, data_json):
 | 
				
			||||||
        data = data_json['data']
 | 
					        data = data_json['data']
 | 
				
			||||||
        self.commands[data['command']](data_json)
 | 
					        self.commands[data['command']](data_json)
 | 
				
			||||||
        return data_json
 | 
					        return data_json
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def handle_reset(self, data_json):
 | 
					 | 
				
			||||||
        self.create_initial_state()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def read_file(self, data_json):
 | 
					 | 
				
			||||||
        with open(self.file, 'r') as ifile:
 | 
					 | 
				
			||||||
            content = ifile.read()
 | 
					 | 
				
			||||||
        data_json['data'] = {
 | 
					 | 
				
			||||||
            'filename': self.filename,
 | 
					 | 
				
			||||||
            'content': content,
 | 
					 | 
				
			||||||
            'language': self.language
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def write_file(self, data_json):
 | 
					 | 
				
			||||||
        data = data_json['data']
 | 
					 | 
				
			||||||
        with open(self.file, 'w') as ofile:
 | 
					 | 
				
			||||||
            ofile.write(data['content'])
 | 
					 | 
				
			||||||
        self.restart_process()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def create_initial_state(self):
 | 
					 | 
				
			||||||
        self.stop_process()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        rmtree(self.working_directory, ignore_errors=True)
 | 
					 | 
				
			||||||
        copytree('source_code_server/', self.working_directory)
 | 
					 | 
				
			||||||
        file = copy(self.filename, self.working_directory)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        self.start_process()
 | 
					 | 
				
			||||||
        return file
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
def map_file_extension_to_language(filename):
 | 
					def map_file_extension_to_language(filename):
 | 
				
			||||||
    language_map = {
 | 
					    language_map = {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user