baseimage-tutorial-framework/lib/tfw/decorators/rate_limiter.py
2018-07-31 09:14:33 +02:00

91 lines
2.9 KiB
Python

# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from functools import wraps, partial
from time import time, sleep
from tfw.decorators.lazy_property import lazy_property
# TODO: add this to sphinx docs
class RateLimiter:
    """
    Decorator class for rate limiting.

    When applied to a function this decorator will apply rate limiting
    if the function is invoked more frequently than rate_per_second.

    By default rate limiting means sleeping until the next invocation time
    (as per __init__ parameter rate_per_second) and invoking the decorated
    function afterwards.

    Note that this decorator BLOCKS THE THREAD it is being executed on,
    so it is only acceptable for stuff running on a separate thread.

    If this is no good for you please refer to AsyncRateLimiter in this module,
    which is designed not to block and use the IOLoop it is being called from,
    or redefine the action argument of __init__ (which defaults to time.sleep).

    A custom action takes over responsibility for the throttled call: the
    wrapper does not invoke the function itself in that case and returns None.
    The pending call (already bound to its arguments) is available to the
    action as the fun attribute of the limiter.
    """

    def __init__(self, rate_per_second, action=sleep):
        """
        :param rate_per_second: max frequency the decorated method should be
                                invoked with
        :param action: what to do when rate limiting. defaults to time.sleep,
                       receives the number of seconds until the next invocation
                       should occur as an argument
        :raises ZeroDivisionError: if rate_per_second is 0
        """
        self.min_interval = 1 / float(rate_per_second)
        self.action = action
        # Most recent call of the decorated function, bound to its arguments.
        self.fun = None
        # The clock starts at construction, so a call arriving sooner than
        # min_interval after the limiter was created is throttled as well.
        self.last_call = time()

    def __call__(self, fun):
        """
        Wrap fun so that its invocations are rate limited.

        :param fun: the callable to decorate
        :return: the rate limited wrapper; it returns whatever fun returns,
                 or None when a custom action took over a throttled call
        """
        @wraps(fun)
        def wrapper(*args, **kwargs):
            pending_call = partial(fun, *args, **kwargs)
            # Published on the instance so custom actions can run it later.
            self.fun = pending_call
            limit_seconds = self._limit_rate()
            if limit_seconds:
                self.action(limit_seconds)
                if self.action is not sleep:
                    # Custom actions own the deferred invocation, calling the
                    # function here as well would run it twice.
                    return None
                # The default action merely blocked until the call became
                # due: restart the interval from the actual invocation time
                # and fall through to invoke instead of dropping the call.
                self.last_call = time()
            return pending_call()
        return wrapper

    def _limit_rate(self):
        """
        Register an invocation attempt and tell how long it has to wait.

        :return: seconds remaining until the next invocation is allowed,
                 0 if the invocation may happen right away
        """
        # Read the clock once so the decision and the bookkeeping agree.
        now = time()
        seconds_to_next_call = self.min_interval - (now - self.last_call)
        self.last_call = now
        return seconds_to_next_call if seconds_to_next_call > 0 else 0
# TODO document this
class AsyncRateLimiter(RateLimiter):
    """
    Non-blocking flavour of RateLimiter.

    Instead of sleeping, a throttled call is handed over to an IOLoop: it is
    scheduled with call_later to run once the remaining interval has passed.
    The previously scheduled call (if one is tracked) is removed from the
    loop first, so only the most recent throttled call stays pending.

    NOTE(review): the loop is presumably a tornado IOLoop (add_callback,
    call_later and remove_timeout are used on it) - confirm against callers.
    """

    def __init__(self, rate_per_second, ioloop_factory):
        """
        :param rate_per_second: max frequency the decorated method should be
                                invoked with
        :param ioloop_factory: callable without arguments returning the loop
                               throttled calls get scheduled on
        """
        self._ioloop_factory = ioloop_factory
        self._ioloop = None
        self._last_callback = None

        # Has to run before delegating to the base class, so the action
        # handed over below is already the loop-dispatching version.
        self._make_action_thread_safe()
        super().__init__(
            rate_per_second=rate_per_second,
            action=self.async_action,
        )

    def _make_action_thread_safe(self):
        """
        Shadow async_action on the instance with a callable that only
        enqueues the real method on the loop through add_callback.

        Reading self.ioloop here already resolves the loop.
        """
        enqueue_on_loop = self.ioloop.add_callback
        bound_action = self.async_action
        self.async_action = partial(enqueue_on_loop, bound_action)

    @lazy_property
    def ioloop(self):
        """The loop produced by the factory passed to __init__."""
        return self._ioloop_factory()

    def async_action(self, seconds_to_next_call):
        """
        Schedule the pending call, replacing the previously scheduled one.

        :param seconds_to_next_call: delay after which the call stored in
                                     self.fun should run on the loop
        """
        previous_handle = self._last_callback
        if previous_handle:
            self.ioloop.remove_timeout(previous_handle)
        loop = self.ioloop
        self._last_callback = loop.call_later(seconds_to_next_call, self.fun)