# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. from functools import wraps, partial from time import time, sleep from tfw.decorators.lazy_property import lazy_property # TODO: add this to sphinx docs class RateLimiter: """ Decorator class for rate limiting. When applied to a function this decorator will apply rate limiting if the function is invoked more frequently than rate_per_seconds. By default rate limiting means sleeping until the next invocation time as per __init__ parameter rate_per_seconds. Note that this decorator BLOCKS THE THREAD it is being executed on, so it is only acceptable for stuff running on a separate thread. If this is no good for you please refer to AsyncRateLimiter in this module, which is designed not to block and use the IOLoop it is being called from, or redefine the action argument of __init__ (which defaults to time.sleep). """ def __init__(self, rate_per_second, action=sleep): """ :param rate_per_second: max frequency the decorated method should be invoked with :param action: what to do when rate limiting. defaults to time.sleep, receives the number of seconds until the next invocation should occour as an argument """ self.min_interval = 1 / float(rate_per_second) self.action = action self.fun = None self.last_call = time() def __call__(self, fun): @wraps(fun) def wrapper(*args, **kwargs): self.fun = partial(fun, *args, **kwargs) limit_seconds = self._limit_rate() if limit_seconds: self.action(limit_seconds) return self.fun() return wrapper def _limit_rate(self): seconds_since_last_call = time() - self.last_call seconds_to_next_call = self.min_interval - seconds_since_last_call self.last_call = time() if seconds_to_next_call > 0: return seconds_to_next_call return 0 class AsyncRateLimiter(RateLimiter): def __init__(self, rate_per_second, ioloop_factory): self._ioloop_factory = ioloop_factory self._ioloop = None self._last_callback = None super().__init__( rate_per_second=rate_per_second, action=self.async_action ) @lazy_property def ioloop(self): return self._ioloop_factory() def async_action(self, seconds_to_next_call): if self._last_callback: self.ioloop.remove_timeout(self._last_callback) self._last_callback = self.ioloop.call_later( seconds_to_next_call, self.fun )