mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-10-31 21:02:55 +00:00 
			
		
		
		
	Move ZMQ connection logic to a separate class
This commit is contained in:
		| @@ -1,25 +1,13 @@ | |||||||
| import json | import json | ||||||
| import zmq |  | ||||||
| from zmq.eventloop import ioloop |  | ||||||
| from zmq.eventloop.zmqstream import ZMQStream |  | ||||||
|  |  | ||||||
| from config import PUBLISHER_PORT, RECEIVER_PORT |  | ||||||
|  |  | ||||||
| ioloop.install() |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class EventHandlerBase: | class EventHandlerBase: | ||||||
|     def __init__(self, anchor, zmq_context=None): |     def __init__(self, messaging, anchor): | ||||||
|  |         self.messaging = messaging | ||||||
|         self.anchor = anchor |         self.anchor = anchor | ||||||
|         self.zmq_context = zmq_context or zmq.Context.instance() |         self.messaging.subscribe(self.anchor) | ||||||
|         self.zmq_sub_socket = self.zmq_context.socket(zmq.SUB) |  | ||||||
|         self.subscriptions = {self.anchor} |         self.subscriptions = {self.anchor} | ||||||
|         self.zmq_sub_socket.setsockopt_string(zmq.SUBSCRIBE, self.anchor) |         self.messaging.register_callback(self.event_handler_callback) | ||||||
|         self.zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) |  | ||||||
|         self.zmq_sub_stream = ZMQStream(self.zmq_sub_socket) |  | ||||||
|         self.zmq_push_socket = self.zmq_context.socket(zmq.PUSH) |  | ||||||
|         self.zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) |  | ||||||
|         self.zmq_sub_stream.on_recv(self.event_handler_callback) |  | ||||||
|  |  | ||||||
|     def event_handler_callback(self, msg_parts): |     def event_handler_callback(self, msg_parts): | ||||||
|         anchor, message = msg_parts |         anchor, message = msg_parts | ||||||
| @@ -27,7 +15,7 @@ class EventHandlerBase: | |||||||
|         response = self.handle_event(anchor, data_json) if anchor != b'reset' else self.handle_reset(data_json) |         response = self.handle_event(anchor, data_json) if anchor != b'reset' else self.handle_reset(data_json) | ||||||
|         if response is None: return |         if response is None: return | ||||||
|         encoded_response = json.dumps(response).encode('utf-8') |         encoded_response = json.dumps(response).encode('utf-8') | ||||||
|         self.zmq_push_socket.send_multipart([anchor, encoded_response]) |         self.messaging.send(anchor, encoded_response) | ||||||
|  |  | ||||||
|     def handle_event(self, anchor, data_json): |     def handle_event(self, anchor, data_json): | ||||||
|         raise NotImplementedError |         raise NotImplementedError | ||||||
| @@ -42,23 +30,21 @@ class EventHandlerBase: | |||||||
|             'data': data |             'data': data | ||||||
|         } |         } | ||||||
|         encoded_message = json.dumps(message).encode('utf-8') |         encoded_message = json.dumps(message).encode('utf-8') | ||||||
|         self.zmq_push_socket.send_multipart([encoded_anchor, encoded_message]) |         self.messaging.send(encoded_anchor, encoded_message) | ||||||
|  |  | ||||||
|     def subscribe(self, anchor): |     def subscribe(self, anchor): | ||||||
|         if anchor not in self.subscriptions: |         if anchor not in self.subscriptions: | ||||||
|             self.subscriptions.add(anchor) |             self.subscriptions.add(anchor) | ||||||
|             self.zmq_sub_socket.setsockopt_string( |             self.messaging.subscribe(anchor) | ||||||
|                 zmq.SUBSCRIBE, anchor |  | ||||||
|             ) |  | ||||||
|  |  | ||||||
|     def unsubscribe(self, anchor): |     def unsubscribe(self, anchor): | ||||||
|         try: |         try: | ||||||
|             self.subscriptions.remove(anchor) |             self.subscriptions.remove(anchor) | ||||||
|             self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, anchor) |             self.messaging.unsubscribe(anchor) | ||||||
|         except KeyError: |         except KeyError: | ||||||
|             pass |             pass | ||||||
|  |  | ||||||
|     def unsubscribe_all(self): |     def unsubscribe_all(self): | ||||||
|         for sub in self.subscriptions: |         for sub in self.subscriptions: | ||||||
|             self.zmq_sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, sub) |             self.messaging.unsubscribe(anchor=sub) | ||||||
|         self.subscriptions.clear() |         self.subscriptions.clear() | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ from functools import partial | |||||||
|  |  | ||||||
| import source_code | import source_code | ||||||
| from event_handler_base import EventHandlerBase | from event_handler_base import EventHandlerBase | ||||||
|  | from messaging import Messaging | ||||||
| from source_code_event_handler import SourceCodeEventHandler | from source_code_event_handler import SourceCodeEventHandler | ||||||
| from terminado_event_handler import TerminadoEventHandler | from terminado_event_handler import TerminadoEventHandler | ||||||
| from tornado.ioloop import IOLoop | from tornado.ioloop import IOLoop | ||||||
| @@ -83,10 +84,11 @@ class LoginHandler(EventHandlerBase): | |||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|     anchor_a = ChangeCaseHandler('anchor_a') |     messaging = Messaging() | ||||||
|     anchor_b = Rot13Handler('anchor_b') |     anchor_a = ChangeCaseHandler(messaging, 'anchor_a') | ||||||
|     anchor_c = ReverseHandler('anchor_c') |     anchor_b = Rot13Handler(messaging, 'anchor_b') | ||||||
|     anchor_webide = SourceCodeEventHandler('anchor_webide', 'login_component.py', 'login') |     anchor_c = ReverseHandler(messaging, 'anchor_c') | ||||||
|     anchor_terminado = TerminadoEventHandler('anchor_terminado', 'terminado') |     anchor_webide = SourceCodeEventHandler(messaging, 'anchor_webide', 'login_component.py', 'login') | ||||||
|  |     anchor_terminado = TerminadoEventHandler(messaging, 'anchor_terminado', 'terminado') | ||||||
|     IOLoop.instance().start() |     IOLoop.instance().start() | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										25
									
								
								src/event_handlers/messaging.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								src/event_handlers/messaging.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,25 @@ | |||||||
|  | from functools import partial | ||||||
|  |  | ||||||
|  | import zmq | ||||||
|  | from zmq.eventloop import ioloop | ||||||
|  | from zmq.eventloop.zmqstream import ZMQStream | ||||||
|  |  | ||||||
|  | from config import PUBLISHER_PORT, RECEIVER_PORT | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Messaging: | ||||||
|  |     def __init__(self): | ||||||
|  |         ioloop.install() | ||||||
|  |         self._zmq_context = zmq.Context.instance() | ||||||
|  |         self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) | ||||||
|  |         self._zmq_sub_socket.connect('tcp://localhost:{}'.format(PUBLISHER_PORT)) | ||||||
|  |         self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) | ||||||
|  |         self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) | ||||||
|  |         self._zmq_push_socket.connect('tcp://localhost:{}'.format(RECEIVER_PORT)) | ||||||
|  |  | ||||||
|  |         self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) | ||||||
|  |         self.unsubscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.UNSUBSCRIBE) | ||||||
|  |         self.register_callback = self._zmq_sub_stream.on_recv | ||||||
|  |      | ||||||
|  |     def send(self, anchor, response): | ||||||
|  |         self._zmq_push_socket.send_multipart([anchor, response]) | ||||||
| @@ -9,8 +9,8 @@ from event_handler_base import EventHandlerBase | |||||||
|  |  | ||||||
|  |  | ||||||
| class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin): | class SourceCodeEventHandler(EventHandlerBase, SupervisorMixin): | ||||||
|     def __init__(self, anchor, filename, process_name=None, zmq_context=None): |     def __init__(self, messaging, anchor, filename, process_name=None): | ||||||
|         super().__init__(anchor, zmq_context) |         super().__init__(messaging, anchor) | ||||||
|         self.working_directory = LOGIN_APP_DIR |         self.working_directory = LOGIN_APP_DIR | ||||||
|         self.filename = filename |         self.filename = filename | ||||||
|         self.language = map_file_extension_to_language(filename) |         self.language = map_file_extension_to_language(filename) | ||||||
|   | |||||||
| @@ -4,8 +4,8 @@ from event_handler_base import EventHandlerBase | |||||||
|  |  | ||||||
|  |  | ||||||
| class StatefulEventHandler(EventHandlerBase): | class StatefulEventHandler(EventHandlerBase): | ||||||
|     def __init__(self, anchor, event_handler_function, zmq_context=None): |     def __init__(self, messaging, anchor, event_handler_function): | ||||||
|         super().__init__(anchor, zmq_context) |         super().__init__(messaging, anchor) | ||||||
|         self.event_handler_function = event_handler_function |         self.event_handler_function = event_handler_function | ||||||
|         self.generator = None |         self.generator = None | ||||||
|         self.subscribe('reset') |         self.subscribe('reset') | ||||||
| @@ -29,9 +29,9 @@ class StatefulEventHandler(EventHandlerBase): | |||||||
|                 'data': response_data, |                 'data': response_data, | ||||||
|             }) |             }) | ||||||
|             response = [r.encode('utf-8') for r in (response_anchor, response_data)] |             response = [r.encode('utf-8') for r in (response_anchor, response_data)] | ||||||
|             self.zmq_push_socket.send_multipart(response) |             self.messaging.send(*response) | ||||||
|  |  | ||||||
|         self.zmq_sub_stream.on_recv(event_handler_callback) |         self.messaging.register_callback(event_handler_callback) | ||||||
|  |  | ||||||
|     def unsubscribe_all(self): |     def unsubscribe_all(self): | ||||||
|         super().unsubscribe_all() |         super().unsubscribe_all() | ||||||
|   | |||||||
| @@ -9,8 +9,8 @@ from config import TERMINADO_DIR | |||||||
|  |  | ||||||
|  |  | ||||||
| class TerminadoEventHandler(EventHandlerBase, SupervisorMixin): | class TerminadoEventHandler(EventHandlerBase, SupervisorMixin): | ||||||
|     def __init__(self, anchor, process_name, zmq_context=None): |     def __init__(self, messaging, anchor, process_name): | ||||||
|         super().__init__(anchor, zmq_context) |         super().__init__(messaging, anchor) | ||||||
|         self.working_directory = TERMINADO_DIR |         self.working_directory = TERMINADO_DIR | ||||||
|         self.process_name = process_name |         self.process_name = process_name | ||||||
|         self.setup_terminado_server() |         self.setup_terminado_server() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user