Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Functional Utilities.""" 

2from __future__ import absolute_import, unicode_literals 

3 

4import random 

5import sys 

6import threading 

7import inspect 

8 

9from collections import OrderedDict 

10 

11try: 

12 from collections.abc import Iterable, Mapping 

13except ImportError: 

14 from collections import Iterable, Mapping 

15 

16from itertools import count, repeat 

17from time import sleep, time 

18 

19from vine.utils import wraps 

20 

21from kombu.five import ( 

22 UserDict, items, keys, python_2_unicode_compatible, string_t, PY3, 

23) 

24 

25from .encoding import safe_repr as _safe_repr 

26 

27__all__ = ( 

28 'LRUCache', 'memoize', 'lazy', 'maybe_evaluate', 

29 'is_list', 'maybe_list', 'dictfilter', 

30) 

31 

32KEYWORD_MARK = object() 

33 

34 

35@python_2_unicode_compatible 

36class ChannelPromise(object): 

37 

38 def __init__(self, contract): 

39 self.__contract__ = contract 

40 

41 def __call__(self): 

42 try: 

43 return self.__value__ 

44 except AttributeError: 

45 value = self.__value__ = self.__contract__() 

46 return value 

47 

48 def __repr__(self): 

49 try: 

50 return repr(self.__value__) 

51 except AttributeError: 

52 return '<promise: 0x{0:x}>'.format(id(self.__contract__)) 

53 

54 

55class LRUCache(UserDict): 

56 """LRU Cache implementation using a doubly linked list to track access. 

57 

58 Arguments: 

59 limit (int): The maximum number of keys to keep in the cache. 

60 When a new key is inserted and the limit has been exceeded, 

61 the *Least Recently Used* key will be discarded from the 

62 cache. 

63 """ 

64 

65 def __init__(self, limit=None): 

66 self.limit = limit 

67 self.mutex = threading.RLock() 

68 self.data = OrderedDict() 

69 

70 def __getitem__(self, key): 

71 with self.mutex: 

72 value = self[key] = self.data.pop(key) 

73 return value 

74 

75 def update(self, *args, **kwargs): 

76 with self.mutex: 

77 data, limit = self.data, self.limit 

78 data.update(*args, **kwargs) 

79 if limit and len(data) > limit: 

80 # pop additional items in case limit exceeded 

81 for _ in range(len(data) - limit): 

82 data.popitem(last=False) 

83 

84 def popitem(self, last=True): 

85 with self.mutex: 

86 return self.data.popitem(last) 

87 

88 def __setitem__(self, key, value): 

89 # remove least recently used key. 

90 with self.mutex: 

91 if self.limit and len(self.data) >= self.limit: 

92 self.data.pop(next(iter(self.data))) 

93 self.data[key] = value 

94 

95 def __iter__(self): 

96 return iter(self.data) 

97 

98 def _iterate_items(self): 

99 with self.mutex: 

100 for k in self: 

101 try: 

102 yield (k, self.data[k]) 

103 except KeyError: # pragma: no cover 

104 pass 

105 iteritems = _iterate_items 

106 

107 def _iterate_values(self): 

108 with self.mutex: 

109 for k in self: 

110 try: 

111 yield self.data[k] 

112 except KeyError: # pragma: no cover 

113 pass 

114 

115 itervalues = _iterate_values 

116 

117 def _iterate_keys(self): 

118 # userdict.keys in py3k calls __getitem__ 

119 with self.mutex: 

120 return keys(self.data) 

121 iterkeys = _iterate_keys 

122 

123 def incr(self, key, delta=1): 

124 with self.mutex: 

125 # this acts as memcached does- store as a string, but return a 

126 # integer as long as it exists and we can cast it 

127 newval = int(self.data.pop(key)) + delta 

128 self[key] = str(newval) 

129 return newval 

130 

131 def __getstate__(self): 

132 d = dict(vars(self)) 

133 d.pop('mutex') 

134 return d 

135 

136 def __setstate__(self, state): 

137 self.__dict__ = state 

138 self.mutex = threading.RLock() 

139 

140 if sys.version_info[0] == 3: # pragma: no cover 

141 keys = _iterate_keys 

142 values = _iterate_values 

143 items = _iterate_items 

144 else: # noqa 

145 

146 def keys(self): 

147 return list(self._iterate_keys()) 

148 

149 def values(self): 

150 return list(self._iterate_values()) 

151 

152 def items(self): 

153 return list(self._iterate_items()) 

154 

155 

156def memoize(maxsize=None, keyfun=None, Cache=LRUCache): 

157 """Decorator to cache function return value.""" 

158 def _memoize(fun): 

159 mutex = threading.Lock() 

160 cache = Cache(limit=maxsize) 

161 

162 @wraps(fun) 

163 def _M(*args, **kwargs): 

164 if keyfun: 

165 key = keyfun(args, kwargs) 

166 else: 

167 key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items())) 

168 try: 

169 with mutex: 

170 value = cache[key] 

171 except KeyError: 

172 value = fun(*args, **kwargs) 

173 _M.misses += 1 

174 with mutex: 

175 cache[key] = value 

176 else: 

177 _M.hits += 1 

178 return value 

179 

180 def clear(): 

181 """Clear the cache and reset cache statistics.""" 

182 cache.clear() 

183 _M.hits = _M.misses = 0 

184 

185 _M.hits = _M.misses = 0 

186 _M.clear = clear 

187 _M.original_func = fun 

188 return _M 

189 

190 return _memoize 

191 

192 

193@python_2_unicode_compatible 

194class lazy(object): 

195 """Holds lazy evaluation. 

196 

197 Evaluated when called or if the :meth:`evaluate` method is called. 

198 The function is re-evaluated on every call. 

199 

200 Overloaded operations that will evaluate the promise: 

201 :meth:`__str__`, :meth:`__repr__`, :meth:`__cmp__`. 

202 """ 

203 

204 def __init__(self, fun, *args, **kwargs): 

205 self._fun = fun 

206 self._args = args 

207 self._kwargs = kwargs 

208 

209 def __call__(self): 

210 return self.evaluate() 

211 

212 def evaluate(self): 

213 return self._fun(*self._args, **self._kwargs) 

214 

215 def __str__(self): 

216 return str(self()) 

217 

218 def __repr__(self): 

219 return repr(self()) 

220 

221 def __eq__(self, rhs): 

222 return self() == rhs 

223 

224 def __ne__(self, rhs): 

225 return self() != rhs 

226 

227 def __deepcopy__(self, memo): 

228 memo[id(self)] = self 

229 return self 

230 

231 def __reduce__(self): 

232 return (self.__class__, (self._fun,), {'_args': self._args, 

233 '_kwargs': self._kwargs}) 

234 

235 if sys.version_info[0] < 3: 

236 

237 def __cmp__(self, rhs): 

238 if isinstance(rhs, self.__class__): 

239 return -cmp(rhs, self()) 

240 return cmp(self(), rhs) 

241 

242 

243def maybe_evaluate(value): 

244 """Evaluate value only if value is a :class:`lazy` instance.""" 

245 if isinstance(value, lazy): 

246 return value.evaluate() 

247 return value 

248 

249 

250def is_list(obj, scalars=(Mapping, string_t), iters=(Iterable,)): 

251 """Return true if the object is iterable. 

252 

253 Note: 

254 Returns false if object is a mapping or string. 

255 """ 

256 return isinstance(obj, iters) and not isinstance(obj, scalars or ()) 

257 

258 

259def maybe_list(obj, scalars=(Mapping, string_t)): 

260 """Return list of one element if ``l`` is a scalar.""" 

261 return obj if obj is None or is_list(obj, scalars) else [obj] 

262 

263 

264def dictfilter(d=None, **kw): 

265 """Remove all keys from dict ``d`` whose value is :const:`None`.""" 

266 d = kw if d is None else (dict(d, **kw) if kw else d) 

267 return {k: v for k, v in items(d) if v is not None} 

268 

269 

270def shufflecycle(it): 

271 it = list(it) # don't modify callers list 

272 shuffle = random.shuffle 

273 for _ in repeat(None): 

274 shuffle(it) 

275 yield it[0] 

276 

277 

278def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False): 

279 cur = start * 1.0 

280 while 1: 

281 if not stop or cur <= stop: 

282 yield cur 

283 cur += step 

284 else: 

285 if not repeatlast: 

286 break 

287 yield cur - step 

288 

289 

290def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): 

291 sum_, cur = 0, start * 1.0 

292 while 1: 

293 if sum_ >= max: 

294 break 

295 yield cur 

296 if stop: 

297 cur = min(cur + step, stop) 

298 else: 

299 cur += step 

300 sum_ += cur 

301 

302 

303def retry_over_time(fun, catch, args=None, kwargs=None, errback=None, 

304 max_retries=None, interval_start=2, interval_step=2, 

305 interval_max=30, callback=None, timeout=None): 

306 """Retry the function over and over until max retries is exceeded. 

307 

308 For each retry we sleep a for a while before we try again, this interval 

309 is increased for every retry until the max seconds is reached. 

310 

311 Arguments: 

312 fun (Callable): The function to try 

313 catch (Tuple[BaseException]): Exceptions to catch, can be either 

314 tuple or a single exception class. 

315 

316 Keyword Arguments: 

317 args (Tuple): Positional arguments passed on to the function. 

318 kwargs (Dict): Keyword arguments passed on to the function. 

319 errback (Callable): Callback for when an exception in ``catch`` 

320 is raised. The callback must take three arguments: 

321 ``exc``, ``interval_range`` and ``retries``, where ``exc`` 

322 is the exception instance, ``interval_range`` is an iterator 

323 which return the time in seconds to sleep next, and ``retries`` 

324 is the number of previous retries. 

325 max_retries (int): Maximum number of retries before we give up. 

326 If neither of this and timeout is set, we will retry forever. 

327 If one of this and timeout is reached, stop. 

328 interval_start (float): How long (in seconds) we start sleeping 

329 between retries. 

330 interval_step (float): By how much the interval is increased for 

331 each retry. 

332 interval_max (float): Maximum number of seconds to sleep 

333 between retries. 

334 timeout (int): Maximum seconds waiting before we give up. 

335 """ 

336 kwargs = {} if not kwargs else kwargs 

337 args = [] if not args else args 

338 interval_range = fxrange(interval_start, 

339 interval_max + interval_start, 

340 interval_step, repeatlast=True) 

341 end = time() + timeout if timeout else None 

342 for retries in count(): 

343 try: 

344 return fun(*args, **kwargs) 

345 except catch as exc: 

346 if max_retries is not None and retries >= max_retries: 

347 raise 

348 if end and time() > end: 

349 raise 

350 if callback: 

351 callback() 

352 tts = float(errback(exc, interval_range, retries) if errback 

353 else next(interval_range)) 

354 if tts: 

355 for _ in range(int(tts)): 

356 if callback: 

357 callback() 

358 sleep(1.0) 

359 # sleep remainder after int truncation above. 

360 sleep(abs(int(tts) - tts)) 

361 

362 

363def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'): 

364 return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs)) 

365 

366 

367def reprcall(name, args=(), kwargs=None, sep=', '): 

368 kwargs = {} if not kwargs else kwargs 

369 return '{0}({1}{2}{3})'.format( 

370 name, sep.join(map(_safe_repr, args or ())), 

371 (args and kwargs) and sep or '', 

372 reprkwargs(kwargs, sep), 

373 ) 

374 

375 

376def accepts_argument(func, argument_name): 

377 if PY3: 

378 argument_spec = inspect.getfullargspec(func) 

379 return ( 

380 argument_name in argument_spec.args or 

381 argument_name in argument_spec.kwonlyargs 

382 ) 

383 

384 argument_spec = inspect.getargspec(func) 

385 argument_names = argument_spec.args 

386 return argument_name in argument_names 

387 

388 

389# Compat names (before kombu 3.0) 

390promise = lazy 

391maybe_promise = maybe_evaluate