mirror of
				https://github.com/avatao-content/baseimage-tutorial-framework
				synced 2025-10-26 07:02:55 +00:00 
			
		
		
		
	Use new f-strings where possible
This commit is contained in:
		| @@ -40,9 +40,9 @@ class FileManager:  # pylint: disable=too-many-instance-attributes | |||||||
|     @workdir.setter |     @workdir.setter | ||||||
|     def workdir(self, directory): |     def workdir(self, directory): | ||||||
|         if not exists(directory) or not isdir(directory): |         if not exists(directory) or not isdir(directory): | ||||||
|             raise EnvironmentError('"{}" is not a directory!'.format(directory)) |             raise EnvironmentError(f'"{directory}" is not a directory!') | ||||||
|         if not self._is_in_whitelisted_dir(directory): |         if not self._is_in_whitelisted_dir(directory): | ||||||
|             raise EnvironmentError('Directory "{}" is not in whitelist!'.format(directory)) |             raise EnvironmentError(f'Directory "{directory}" is not in whitelist!') | ||||||
|         self._workdir = directory |         self._workdir = directory | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|   | |||||||
| @@ -13,6 +13,6 @@ class LinearFSM(FSMBase): | |||||||
|     """ |     """ | ||||||
|     def __init__(self, number_of_steps): |     def __init__(self, number_of_steps): | ||||||
|         self.states = list(map(str, range(number_of_steps))) |         self.states = list(map(str, range(number_of_steps))) | ||||||
|         self.transitions = [{'trigger': 'step_{}'.format(int(index)+1), 'source': index, 'dest': str(int(index)+1)} |         self.transitions = [{'trigger': f'step_{int(index)+1}', 'source': index, 'dest': str(int(index)+1)} | ||||||
|                             for index in self.states[:-1]] |                             for index in self.states[:-1]] | ||||||
|         super(LinearFSM, self).__init__() |         super(LinearFSM, self).__init__() | ||||||
|   | |||||||
| @@ -12,7 +12,7 @@ class ControllerConnector(ZMQConnectorBase): | |||||||
|     def __init__(self, zmq_context=None): |     def __init__(self, zmq_context=None): | ||||||
|         super(ControllerConnector, self).__init__(zmq_context) |         super(ControllerConnector, self).__init__(zmq_context) | ||||||
|         self._zmq_rep_socket = self._zmq_context.socket(zmq.REP) |         self._zmq_rep_socket = self._zmq_context.socket(zmq.REP) | ||||||
|         self._zmq_rep_socket.connect('tcp://localhost:{}'.format(TFWENV.CONTROLLER_PORT)) |         self._zmq_rep_socket.connect(f'tcp://localhost:{TFWENV.CONTROLLER_PORT}') | ||||||
|         self._zmq_rep_stream = ZMQStream(self._zmq_rep_socket) |         self._zmq_rep_stream = ZMQStream(self._zmq_rep_socket) | ||||||
|  |  | ||||||
|         self.register_callback = self._zmq_rep_stream.on_recv_stream |         self.register_callback = self._zmq_rep_stream.on_recv_stream | ||||||
|   | |||||||
| @@ -15,7 +15,7 @@ class ServerDownlinkConnector(ZMQConnectorBase): | |||||||
|     def __init__(self, zmq_context=None): |     def __init__(self, zmq_context=None): | ||||||
|         super(ServerDownlinkConnector, self).__init__(zmq_context) |         super(ServerDownlinkConnector, self).__init__(zmq_context) | ||||||
|         self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) |         self._zmq_sub_socket = self._zmq_context.socket(zmq.SUB) | ||||||
|         self._zmq_sub_socket.connect('tcp://localhost:{}'.format(TFWENV.PUBLISHER_PORT)) |         self._zmq_sub_socket.connect(f'tcp://localhost:{TFWENV.PUBLISHER_PORT}') | ||||||
|         self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) |         self._zmq_sub_stream = ZMQStream(self._zmq_sub_socket) | ||||||
|  |  | ||||||
|         self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) |         self.subscribe = partial(self._zmq_sub_socket.setsockopt_string, zmq.SUBSCRIBE) | ||||||
| @@ -30,7 +30,7 @@ class ServerUplinkConnector(ZMQConnectorBase): | |||||||
|     def __init__(self, zmq_context=None): |     def __init__(self, zmq_context=None): | ||||||
|         super(ServerUplinkConnector, self).__init__(zmq_context) |         super(ServerUplinkConnector, self).__init__(zmq_context) | ||||||
|         self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) |         self._zmq_push_socket = self._zmq_context.socket(zmq.PUSH) | ||||||
|         self._zmq_push_socket.connect('tcp://localhost:{}'.format(TFWENV.RECEIVER_PORT)) |         self._zmq_push_socket.connect(f'tcp://localhost:{TFWENV.RECEIVER_PORT}') | ||||||
|  |  | ||||||
|     def send_to_eventhandler(self, message): |     def send_to_eventhandler(self, message): | ||||||
|         """ |         """ | ||||||
|   | |||||||
| @@ -16,7 +16,7 @@ class EventHandlerDownlinkConnector(ZMQConnectorBase): | |||||||
|         super(EventHandlerDownlinkConnector, self).__init__(zmq_context) |         super(EventHandlerDownlinkConnector, self).__init__(zmq_context) | ||||||
|         self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) |         self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) | ||||||
|         self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) |         self._zmq_pull_stream = ZMQStream(self._zmq_pull_socket) | ||||||
|         address = 'tcp://*:{}'.format(TFWENV.RECEIVER_PORT) |         address = f'tcp://*:{TFWENV.RECEIVER_PORT}' | ||||||
|         self._zmq_pull_socket.bind(address) |         self._zmq_pull_socket.bind(address) | ||||||
|         LOG.debug('Pull socket bound to %s', address) |         LOG.debug('Pull socket bound to %s', address) | ||||||
|  |  | ||||||
| @@ -25,7 +25,7 @@ class EventHandlerUplinkConnector(ZMQConnectorBase): | |||||||
|     def __init__(self, zmq_context=None): |     def __init__(self, zmq_context=None): | ||||||
|         super(EventHandlerUplinkConnector, self).__init__(zmq_context) |         super(EventHandlerUplinkConnector, self).__init__(zmq_context) | ||||||
|         self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) |         self._zmq_pub_socket = self._zmq_context.socket(zmq.PUB) | ||||||
|         address = 'tcp://*:{}'.format(TFWENV.PUBLISHER_PORT) |         address = f'tcp://*:{TFWENV.PUBLISHER_PORT}' | ||||||
|         self._zmq_pub_socket.bind(address) |         self._zmq_pub_socket.bind(address) | ||||||
|         LOG.debug('Pub socket bound to %s', address) |         LOG.debug('Pub socket bound to %s', address) | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user