mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-10-25 04:12:54 +00:00 
			
		
		
		
	Remove dead code from project
This commit is contained in:
		| @@ -1,26 +0,0 @@ | ||||
| import ast | ||||
| import inspect | ||||
| import re | ||||
| from io import StringIO | ||||
|  | ||||
|  | ||||
| def find_local_variable_value(func, local_variable_name): | ||||
|     func_src = inspect.getsource(func) | ||||
|     func_ast = ast.parse(func_src) | ||||
|     for node in ast.walk(func_ast): | ||||
|         if isinstance(node, ast.Assign): | ||||
|             for target in node.targets: | ||||
|                 if isinstance(target, ast.Name): | ||||
|                     if target.id == local_variable_name: | ||||
|                         return node.value.s | ||||
|  | ||||
|  | ||||
| def get_source_code(func, strip_comments=False): | ||||
|     source = inspect.getsource(func) | ||||
|     if strip_comments: | ||||
|         # TODO: less fragile way to do this (tokenizer sadly inserts whitespaces all around) | ||||
|         comment_pattern = re.compile('^(\s.*)#.*$') | ||||
|         source = ''.join( | ||||
|             line for line in StringIO(source).readlines() if re.match(comment_pattern, line) is None | ||||
|         ) | ||||
|     return source | ||||
| @@ -1,16 +0,0 @@ | ||||
| from fsm_base import FSMBase | ||||
|  | ||||
|  | ||||
| class Buttons(FSMBase): | ||||
|     states = ['A', 'B', 'C'] | ||||
|     transitions = [ | ||||
|         {'trigger': 'anchor_a', 'source': 'A', 'dest': 'B'}, | ||||
|         {'trigger': 'anchor_b', 'source': 'B', 'dest': 'C'}, | ||||
|         {'trigger': 'anchor_c', 'source': 'C', 'dest': 'A'}, | ||||
|     ] | ||||
|  | ||||
|     def __init__(self): | ||||
|         super().__init__('A') | ||||
|  | ||||
|  | ||||
| fsm = Buttons() | ||||
| @@ -1,92 +1,9 @@ | ||||
| import codecs | ||||
| import sqlite3 | ||||
| from functools import partial | ||||
|  | ||||
| import source_code | ||||
| from event_handler_base import EventHandlerBase | ||||
| from source_code_event_handler import SourceCodeEventHandler | ||||
| from terminado_event_handler import TerminadoEventHandler | ||||
| from tornado.ioloop import IOLoop | ||||
|  | ||||
| from login_component import authorize_login | ||||
| from util import create_source_code_response_data | ||||
|  | ||||
| login_component_py_response = partial( | ||||
|     create_source_code_response_data, filename='login_component.py', language='python' | ||||
| ) | ||||
|  | ||||
|  | ||||
| class EchoHandler(EventHandlerBase): | ||||
|     def handle_event(self, anchor, data_json): | ||||
|         return data_json | ||||
|  | ||||
|  | ||||
| class Rot13Handler(EventHandlerBase): | ||||
|     def handle_event(self, anchor, data_json): | ||||
|         data_json['data'] = codecs.encode(data_json['data'], 'rot13') | ||||
|         return data_json | ||||
|  | ||||
|  | ||||
| class ChangeCaseHandler(EventHandlerBase): | ||||
|     def handle_event(self, anchor, data_json): | ||||
|         data_json['data'] = data_json['data'].upper() if data_json['data'].islower() else data_json['data'].lower() | ||||
|         return data_json | ||||
|  | ||||
|  | ||||
| class ReverseHandler(EventHandlerBase): | ||||
|     def handle_event(self, anchor, data_json): | ||||
|         data_json['data'] = data_json['data'][::-1] | ||||
|         return data_json | ||||
|  | ||||
|  | ||||
| class LoginHandler(EventHandlerBase): | ||||
|     def handle_event(self, anchor, data_json): | ||||
|         email, password = data_json['data']['email'], data_json['data']['password'] | ||||
|         try: | ||||
|             sql_statement = source_code.find_local_variable_value(authorize_login, 'sql_statement') | ||||
|             yield ( | ||||
|                 'anchor_logger', | ||||
|                 'The SQL statement executed by the server will look like this:\n `{}`'.format(sql_statement) | ||||
|             ) | ||||
|  | ||||
|             yield ('anchor_webide', | ||||
|                    login_component_py_response(content=source_code.get_source_code(authorize_login, strip_comments=False))) | ||||
|  | ||||
|             sql_statement_with_values = sql_statement.format(email, password) | ||||
|             yield ( | ||||
|                 'anchor_logger', | ||||
|                 'After the submitted parameters are substituted it looks like this:\n `{}`'.format( | ||||
|                     sql_statement_with_values | ||||
|                 ) | ||||
|             ) | ||||
|  | ||||
|             logged_in_email, is_admin = authorize_login(email, password) | ||||
|  | ||||
|             yield ( | ||||
|                 'anchor_logger', | ||||
|                 'After the query is executed, it returns _{}_ as email address, and _{}_ for is_admin'.format( | ||||
|                     logged_in_email, is_admin | ||||
|                 ) | ||||
|             ) | ||||
|  | ||||
|             if logged_in_email is not None: | ||||
|                 response = 'Logged in as _{}_. You __{}have__ admin privileges.'.format( | ||||
|                     logged_in_email, | ||||
|                     '' if is_admin else 'don\'t ' | ||||
|                 ) | ||||
|             else: | ||||
|                 response = 'Bad username/password!' | ||||
|         except sqlite3.Warning: | ||||
|             response = 'Invalid request!' | ||||
|  | ||||
|         yield ('anchor_login', '# Login page\n' + response) | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     anchor_a = ChangeCaseHandler('anchor_a') | ||||
|     anchor_b = Rot13Handler('anchor_b') | ||||
|     anchor_c = ReverseHandler('anchor_c') | ||||
|     anchor_webide = SourceCodeEventHandler('anchor_webide', 'login_component.py', 'login') | ||||
|     anchor_terminado = TerminadoEventHandler('anchor_terminado', 'terminado') | ||||
|     IOLoop.instance().start() | ||||
|  | ||||
|   | ||||
| @@ -1,43 +0,0 @@ | ||||
| import json | ||||
|  | ||||
| from event_handler_base import EventHandlerBase | ||||
|  | ||||
|  | ||||
| class StatefulEventHandler(EventHandlerBase): | ||||
|     def __init__(self, anchor, event_handler_function): | ||||
|         super().__init__(anchor) | ||||
|         self.event_handler_function = event_handler_function | ||||
|         self.generator = None | ||||
|         self.subscribe('reset') | ||||
|  | ||||
|         def event_handler_callback(msg_parts): | ||||
|             anchor, message = msg_parts | ||||
|             if anchor == b'reset': | ||||
|                 self.reset() | ||||
|                 return | ||||
|             data_json = json.loads(message) | ||||
|             if self.generator is None: | ||||
|                 self.generator = self.event_handler_function(data_json) | ||||
|             try: | ||||
|                 response_anchor, response_data = next(self.generator) | ||||
|             except StopIteration: | ||||
|                 self.reset() | ||||
|                 return | ||||
|             self.subscribe(response_anchor) | ||||
|             response_data = json.dumps({ | ||||
|                 'anchor': response_anchor, | ||||
|                 'data': response_data, | ||||
|             }) | ||||
|             response = [r.encode('utf-8') for r in (response_anchor, response_data)] | ||||
|             self.server_connector.send(*response) | ||||
|  | ||||
|         self.server_connector.register_callback(event_handler_callback) | ||||
|  | ||||
|     def unsubscribe_all(self): | ||||
|         super().unsubscribe_all() | ||||
|         self.subscribe('reset') | ||||
|  | ||||
|     def reset(self): | ||||
|         self.generator = None | ||||
|         self.unsubscribe_all() | ||||
|         self.subscribe(self.anchor) | ||||
										
											Binary file not shown.
										
									
								
							
		Reference in New Issue
	
	Block a user