baseimage-tutorial-framework/lib/tfw/util.py
2018-02-13 16:29:45 +01:00

55 lines
1.4 KiB
Python

import xmlrpc.client, zmq
from contextlib import suppress
from xmlrpc.client import Fault as SupervisorFault
from time import time, sleep
from functools import wraps
from tfw.config import tfwenv
def create_source_code_response_data(filename, content, language):
return {
'filename': filename,
'content': content,
'language': language
}
class SupervisorMixin:
supervisor = xmlrpc.client.ServerProxy(tfwenv.SUPERVISOR_HTTP_URI).supervisor
def stop_process(self):
with suppress(SupervisorFault):
self.supervisor.stopProcess(self.process_name)
def start_process(self):
self.supervisor.startProcess(self.process_name)
def restart_process(self):
self.stop_process()
self.start_process()
class ZMQConnectorBase:
def __init__(self, zmq_context=None):
self._zmq_context = zmq_context or zmq.Context.instance()
class RateLimiter:
def __init__(self, rate_per_second):
self.min_interval = 1 / float(rate_per_second)
self.last_call = time()
def __call__(self, fun):
@wraps(fun)
def wrapper(*args, **kwargs):
self._limit_rate()
fun(*args, **kwargs)
return wrapper
def _limit_rate(self):
since_last_call = time() - self.last_call
to_next_call = self.min_interval - since_last_call
self.last_call = time()
if to_next_call > 0:
sleep(to_next_call)