Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/kombu/utils/functional.py : 1%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Functional Utilities."""
2from __future__ import absolute_import, unicode_literals
4import random
5import sys
6import threading
7import inspect
9from collections import OrderedDict
11try:
12 from collections.abc import Iterable, Mapping
13except ImportError:
14 from collections import Iterable, Mapping
16from itertools import count, repeat
17from time import sleep, time
19from vine.utils import wraps
21from kombu.five import (
22 UserDict, items, keys, python_2_unicode_compatible, string_t, PY3,
23)
25from .encoding import safe_repr as _safe_repr
# Names exported by a star-import of this module.
__all__ = (
    'LRUCache',
    'memoize',
    'lazy',
    'maybe_evaluate',
    'is_list',
    'maybe_list',
    'dictfilter',
)

# Unique sentinel that ``memoize`` places between the positional and the
# keyword part of its default cache key.
KEYWORD_MARK = object()
@python_2_unicode_compatible
class ChannelPromise(object):
    """Call a zero-argument ``contract`` once and remember its result.

    The first call invokes ``contract()`` and stores the return value on
    the instance; later calls return the stored value without invoking
    the contract again.
    """

    def __init__(self, contract):
        self.__contract__ = contract

    def __call__(self):
        try:
            resolved = self.__value__
        except AttributeError:
            # Not evaluated yet: run the contract and cache what it returns.
            resolved = self.__value__ = self.__contract__()
        return resolved

    def __repr__(self):
        # Show the cached value when there is one, otherwise a placeholder
        # identifying the pending contract.
        try:
            return repr(self.__value__)
        except AttributeError:
            contract_id = id(self.__contract__)
            return '<promise: 0x{0:x}>'.format(contract_id)
class LRUCache(UserDict):
    """LRU Cache implementation backed by an :class:`OrderedDict`.

    Entries are kept in usage order: the first key of ``self.data`` is the
    least recently used one, the last key the most recently used one.
    Reading a key through ``cache[key]`` or writing it through
    ``cache[key] = value`` moves it to the most-recently-used end.

    Arguments:
        limit (int): The maximum number of keys to keep in the cache.
            When a new key is inserted and the limit has been exceeded,
            the *Least Recently Used* key will be discarded from the
            cache.  A falsy limit (``None`` or ``0``) disables eviction.
    """

    def __init__(self, limit=None):
        self.limit = limit
        self.mutex = threading.RLock()
        self.data = OrderedDict()

    def __getitem__(self, key):
        """Return the value for ``key`` and mark it as most recently used.

        Raises:
            KeyError: if ``key`` is not in the cache.
        """
        with self.mutex:
            # pop + re-insert moves the key to the end of the ordering.
            value = self[key] = self.data.pop(key)
            return value

    def update(self, *args, **kwargs):
        """Bulk-insert items, then trim the oldest entries down to limit."""
        with self.mutex:
            data, limit = self.data, self.limit
            data.update(*args, **kwargs)
            if limit and len(data) > limit:
                # pop additional items in case limit exceeded
                for _ in range(len(data) - limit):
                    data.popitem(last=False)

    def popitem(self, last=True):
        """Remove and return a ``(key, value)`` pair.

        The newest pair is returned when ``last`` is true, the oldest
        otherwise.
        """
        with self.mutex:
            return self.data.popitem(last)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` as the most recently used entry.

        The least recently used entry is evicted only when ``key`` is new
        and the cache is already at its limit.
        """
        with self.mutex:
            data, limit = self.data, self.limit
            # Drop any previous entry for this key first: overwriting an
            # existing key must not evict an unrelated entry, and a write
            # counts as a use.
            data.pop(key, None)
            if limit and len(data) >= limit:
                # remove least recently used key.
                data.popitem(last=False)
            data[key] = value

    def __iter__(self):
        return iter(self.data)

    def _iterate_items(self):
        """Yield ``(key, value)`` pairs without touching the LRU order."""
        with self.mutex:
            for k in self:
                try:
                    yield (k, self.data[k])
                except KeyError:  # pragma: no cover
                    pass
    iteritems = _iterate_items

    def _iterate_values(self):
        """Yield values without touching the LRU order."""
        with self.mutex:
            for k in self:
                try:
                    yield self.data[k]
                except KeyError:  # pragma: no cover
                    pass

    itervalues = _iterate_values

    def _iterate_keys(self):
        # userdict.keys in py3k calls __getitem__
        with self.mutex:
            return keys(self.data)
    iterkeys = _iterate_keys

    def incr(self, key, delta=1):
        """Add ``delta`` to the integer stored under ``key``.

        The new value is stored as a string and returned as an integer.

        Raises:
            KeyError: if ``key`` is not in the cache.
            ValueError: if the stored value cannot be converted by
                ``int()``; the existing entry is left untouched.
        """
        with self.mutex:
            # this acts as memcached does- store as a string, but return a
            # integer as long as it exists and we can cast it.
            # Convert before modifying the cache so that a failed cast
            # does not drop the entry.
            newval = int(self.data[key]) + delta
            self[key] = str(newval)
            return newval

    def __getstate__(self):
        # The lock cannot be pickled; it is recreated in __setstate__.
        d = dict(vars(self))
        d.pop('mutex')
        return d

    def __setstate__(self, state):
        self.__dict__ = state
        self.mutex = threading.RLock()

    if sys.version_info[0] == 3:  # pragma: no cover
        keys = _iterate_keys
        values = _iterate_values
        items = _iterate_items
    else:  # noqa

        def keys(self):
            return list(self._iterate_keys())

        def values(self):
            return list(self._iterate_values())

        def items(self):
            return list(self._iterate_items())
def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
    """Decorator to cache function return value.

    Arguments:
        maxsize (int): Passed to ``Cache`` as its ``limit``.
        keyfun (Callable): Optional ``keyfun(args, kwargs)`` returning the
            cache key.  Without it the key is the positional arguments,
            a marker, and the sorted keyword items.
        Cache (type): Factory called as ``Cache(limit=maxsize)`` to build
            the backing store.

    The decorated function gains ``hits`` and ``misses`` counters, a
    ``clear()`` function and an ``original_func`` attribute.
    """
    def _memoize(fun):
        lock = threading.Lock()
        store = Cache(limit=maxsize)

        def _default_key(args, kwargs):
            return args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))

        @wraps(fun)
        def _M(*args, **kwargs):
            make_key = keyfun if keyfun else _default_key
            key = make_key(args, kwargs)
            try:
                with lock:
                    result = store[key]
            except KeyError:
                # Miss: compute outside the lock, then store the result.
                result = fun(*args, **kwargs)
                _M.misses += 1
                with lock:
                    store[key] = result
            else:
                _M.hits += 1
            return result

        def clear():
            """Clear the cache and reset cache statistics."""
            store.clear()
            _M.hits = _M.misses = 0

        _M.hits = _M.misses = 0
        _M.clear = clear
        _M.original_func = fun
        return _M

    return _memoize
@python_2_unicode_compatible
class lazy(object):
    """Holds lazy evaluation.

    Stores a callable together with its positional and keyword arguments.
    The callable runs when the instance is called or when
    :meth:`evaluate` is invoked, and it runs again on every such call;
    nothing is cached.

    Operations that evaluate the stored call:
        :meth:`__str__`, :meth:`__repr__`, :meth:`__eq__`, :meth:`__ne__`
        and, on Python 2, :meth:`__cmp__`.
    """

    def __init__(self, fun, *args, **kwargs):
        self._fun, self._args, self._kwargs = fun, args, kwargs

    def evaluate(self):
        return self._fun(*self._args, **self._kwargs)

    def __call__(self):
        return self.evaluate()

    def __repr__(self):
        return repr(self())

    def __str__(self):
        return str(self())

    def __eq__(self, rhs):
        return self() == rhs

    def __ne__(self, rhs):
        return self() != rhs

    def __deepcopy__(self, memo):
        # Deep copies share this very instance instead of duplicating it.
        memo[id(self)] = self
        return self

    def __reduce__(self):
        state = {'_args': self._args, '_kwargs': self._kwargs}
        return (self.__class__, (self._fun,), state)

    if sys.version_info[0] < 3:

        def __cmp__(self, rhs):
            if not isinstance(rhs, self.__class__):
                return cmp(self(), rhs)
            return -cmp(rhs, self())
def maybe_evaluate(value):
    """Return ``value.evaluate()`` for :class:`lazy` instances.

    Any other value is returned unchanged.
    """
    return value.evaluate() if isinstance(value, lazy) else value
def is_list(obj, scalars=(Mapping, string_t), iters=(Iterable,)):
    """Return true if the object is iterable.

    Note:
        Returns false if object is an instance of one of ``scalars``
        (by default a mapping or string).
    """
    if not isinstance(obj, iters):
        return False
    return not isinstance(obj, scalars or ())
def maybe_list(obj, scalars=(Mapping, string_t)):
    """Return list of one element if ``obj`` is a scalar.

    ``None`` and objects that :func:`is_list` accepts are returned as-is.
    """
    if obj is None or is_list(obj, scalars):
        return obj
    return [obj]
def dictfilter(d=None, **kw):
    """Return a new dict without the entries whose value is :const:`None`.

    The entries come from ``d`` with the keyword arguments merged on top,
    or from the keyword arguments alone when ``d`` is :const:`None`.
    """
    if d is None:
        d = kw
    elif kw:
        d = dict(d, **kw)
    return {key: val for key, val in items(d) if val is not None}
def shufflecycle(it):
    """Yield an endless stream of items drawn from ``it``.

    A private copy of ``it`` is reshuffled before every yield and its
    first element is produced.
    """
    pool = list(it)  # don't modify callers list
    reshuffle = random.shuffle
    while True:
        reshuffle(pool)
        yield pool[0]
def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
    """Generate floats starting at ``start`` and advancing by ``step``.

    Arguments:
        start (float): First value; it is converted to float.
        stop (float): Inclusive bound.  Values are yielded while the
            current value is ``<= stop``.  :const:`None` means there is
            no bound and the generator never ends.
        step (float): Amount added after each yielded value.
        repeatlast (bool): When true, the generator does not finish once
            the bound is passed; it keeps yielding the current value
            minus ``step`` instead.

    Yields:
        float: The next value in the sequence.
    """
    cur = start * 1.0
    while 1:
        # Test for None explicitly so that a stop of 0 is honoured as a
        # real bound instead of being treated as "no bound".
        if stop is None or cur <= stop:
            yield cur
            cur += step
        else:
            if not repeatlast:
                break
            yield cur - step
def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
    """Generate stepped floats until a running total reaches ``max``.

    The sequence starts at ``start`` and grows by ``step`` after every
    yield, capped at ``stop`` when ``stop`` is truthy.  Each newly
    computed value is added to a running total, and generation ends as
    soon as that total is ``>= max``.
    """
    total = 0
    cur = start * 1.0
    while total < max:
        yield cur
        cur = min(cur + step, stop) if stop else cur + step
        total += cur
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.  Its return value is used
            as the number of seconds to sleep.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        callback (Callable): Called without arguments after each caught
            failure, and again before every one-second slice of the
            sleep that follows.
        timeout (int): Maximum seconds waiting before we give up.

    Returns:
        Any: Whatever ``fun`` returns on its first successful call.

    Raises:
        The caught exception is re-raised once ``max_retries`` or
        ``timeout`` has been reached.
    """
    if not kwargs:
        kwargs = {}
    if not args:
        args = []
    interval_range = fxrange(interval_start,
                             interval_max + interval_start,
                             interval_step, repeatlast=True)
    deadline = (time() + timeout) if timeout else None
    for retries in count():
        try:
            return fun(*args, **kwargs)
        except catch as exc:
            out_of_retries = (
                max_retries is not None and retries >= max_retries
            )
            if out_of_retries:
                raise
            if deadline and time() > deadline:
                raise
            if callback:
                callback()
            if errback:
                tts = float(errback(exc, interval_range, retries))
            else:
                tts = float(next(interval_range))
            if tts:
                whole_seconds = int(tts)
                # Sleep in one-second slices so callback keeps firing.
                for _ in range(whole_seconds):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(whole_seconds - tts))
def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
    """Render a mapping as ``key=repr(value)`` pairs joined by ``sep``.

    Each pair is produced by ``fmt.format(key, safe_repr(value))``.
    """
    rendered = [
        fmt.format(name, _safe_repr(value)) for name, value in items(kwargs)
    ]
    return sep.join(rendered)
def reprcall(name, args=(), kwargs=None, sep=', '):
    """Render a call as ``name(arg, ..., key=value, ...)``.

    Positional arguments are rendered with the safe repr helper and
    keyword arguments with :func:`reprkwargs`; ``sep`` separates the
    items and the two groups.
    """
    if not kwargs:
        kwargs = {}
    positional = sep.join(map(_safe_repr, args or ()))
    # A separator between the two groups is only needed when both exist.
    joiner = (sep or '') if (args and kwargs) else ''
    keyword = reprkwargs(kwargs, sep)
    return '{0}({1}{2}{3})'.format(name, positional, joiner, keyword)
def accepts_argument(func, argument_name):
    """Return true if ``func`` declares a parameter named ``argument_name``.

    On Python 3 both regular and keyword-only parameters are checked; on
    Python 2 only the regular parameter names are.
    """
    if not PY3:
        return argument_name in inspect.getargspec(func).args

    spec = inspect.getfullargspec(func)
    return argument_name in spec.args or argument_name in spec.kwonlyargs
# Backward-compatible aliases for the names used before kombu 3.0.
promise = lazy
maybe_promise = maybe_evaluate