Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/utils/dispatch/signal.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Implementation of the Observer pattern."""
3from __future__ import absolute_import, unicode_literals
5import sys
6import threading
7import warnings
8import weakref
10from kombu.utils.functional import retry_over_time
12from celery.exceptions import CDeprecationWarning
13from celery.five import PY3, python_2_unicode_compatible, range, text_t
14from celery.local import PromiseProxy, Proxy
15from celery.utils.functional import fun_accepts_kwargs
16from celery.utils.log import get_logger
17from celery.utils.time import humanize_seconds
19try:
20 from weakref import WeakMethod
21except ImportError:
22 from .weakref_backports import WeakMethod # noqa
24__all__ = ('Signal',)
26logger = get_logger(__name__)
def _make_id(target):  # pragma: no cover
    """Return the identifier used to key ``target`` in receiver lookups.

    A :class:`Proxy` is resolved to the object it currently points at.
    Byte and text strings are returned unchanged (see Issue #2475).
    Objects exposing ``__func__`` are keyed on the ``id()`` of that
    underlying function; everything else is keyed on its own ``id()``.
    """
    obj = target._get_current_object() if isinstance(target, Proxy) else target
    if isinstance(obj, (bytes, text_t)):
        # Strings act as their own key rather than being keyed by identity.
        return obj
    keyed = obj.__func__ if hasattr(obj, '__func__') else obj
    return id(keyed)
def _boundmethod_safe_weakref(obj):
    """Get weakref constructor appropriate for `obj`.

    `obj` may be a bound method.  Bound method objects must be special-cased
    because they're usually garbage collected immediately, even if the
    instance they're bound to persists.

    Arguments:
        obj (Any): The receiver to build a weak reference for.

    Returns:
        Tuple: a ``(weakref constructor, main object)`` pair.
        ``weakref constructor`` is either :class:`weakref.ref` or
        :class:`weakref.WeakMethod`.  ``main object`` is the instance that
        `obj` is bound to if it is a bound method; otherwise ``main object``
        is simply `obj`.
    """
    try:
        # Only the attribute probes belong in the ``try``: an object is
        # treated as a bound method when it has both attributes.
        obj.__func__
        instance = obj.__self__
    except AttributeError:
        # Not a bound method
        return weakref.ref, obj
    # Bound method
    return WeakMethod, instance
def _make_lookup_key(receiver, sender, dispatch_uid):
    """Build the ``(receiver key, sender key)`` tuple identifying a connection.

    A truthy ``dispatch_uid`` is used directly as the receiver key; otherwise
    the key is derived from ``receiver`` with ``_make_id``.  The sender key
    is always derived from ``sender`` with ``_make_id``.
    """
    receiver_key = dispatch_uid if dispatch_uid else _make_id(receiver)
    return (receiver_key, _make_id(sender))
# Sender key produced for ``None``.  ``Signal._live_receivers`` treats a
# receiver registered under this key as matching every sender.
NONE_ID = _make_id(None)

# Sentinel stored in ``Signal.sender_receivers_cache`` to record that a sender
# currently has no receivers.
NO_RECEIVERS = object()

# Template for the error logged each time a receiver connected with
# ``retry=True`` fails; it is filled with ``receiver`` and ``when`` keys.
RECEIVER_RETRY_ERROR = """\
Could not process signal receiver %(receiver)s. Retrying %(when)s...\
"""
@python_2_unicode_compatible
class Signal(object):  # pragma: no cover
    """Create new signal.

    Keyword Arguments:
        providing_args (List): A list of the arguments this signal can pass
            along in a :meth:`send` call.
        use_caching (bool): Enable receiver cache.
        name (str): Name of signal, used for debugging purposes.
    """

    #: Holds a list of ``((receiver_key, sender_key), receiver)`` tuples,
    #: where ``receiver`` is a weak reference when connected with
    #: ``weak=True``.  Set to a fresh list per instance in ``__init__``.
    receivers = None

    def __init__(self, providing_args=None, use_caching=False, name=None):
        """Initialize an empty signal with no connected receivers."""
        self.receivers = []
        self.providing_args = set(
            providing_args if providing_args is not None else [])
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.name = name
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'.  The cache is cleaned when .connect() or
        # .disconnect() is called and populated on .send().
        self.sender_receivers_cache = (
            weakref.WeakKeyDictionary() if use_caching else {}
        )
        # Set by _remove_receiver() when a weakly referenced receiver dies;
        # the actual cleanup is deferred to _clear_dead_receivers().
        self._dead_receivers = False

    def _connect_proxy(self, fun, sender, weak, dispatch_uid):
        """Connect ``fun`` to the object that the proxy ``sender`` resolves to.

        Passed to ``sender.__then__`` by :meth:`_connect_signal` when the
        sender is a ``PromiseProxy``.
        """
        return self.connect(
            fun, sender=sender._get_current_object(),
            weak=weak, dispatch_uid=dispatch_uid,
        )

    def connect(self, *args, **kwargs):
        """Connect receiver to sender for signal.

        May be called directly with the receiver as the first positional
        argument, or without a receiver to obtain a decorator.

        Arguments:
            receiver (Callable): A function or an instance method which is to
                receive signals.  Receivers must be hashable objects.

                if weak is :const:`True`, then receiver must be
                weak-referenceable.

                Receivers must be able to accept keyword arguments.

                If receivers have a `dispatch_uid` attribute, the receiver will
                not be added if another receiver already exists with that
                `dispatch_uid`.

            sender (Any): The sender to which the receiver should respond.
                Must either be a Python object, or :const:`None` to
                receive events from any sender.

            weak (bool): Whether to use weak references to the receiver.
                By default, the module will attempt to use weak references to
                the receiver objects.  If this parameter is false, then strong
                references will be used.

            dispatch_uid (Hashable): An identifier used to uniquely identify a
                particular instance of a receiver.  This will usually be a
                string, though it may be anything hashable.

            retry (bool): If the signal receiver raises an exception
                (e.g. ConnectionError), the receiver will be retried until it
                runs successfully.  A strong ref to the receiver will be stored
                and the `weak` option will be ignored.

        Returns:
            Callable: the connected receiver when one was passed as the first
            positional argument (the retrying wrapper when ``retry`` is
            true); otherwise a decorator that connects the function it is
            applied to and returns it.
        """
        def _handle_options(sender=None, weak=True, dispatch_uid=None,
                            retry=False):

            def _connect_signal(fun):

                options = {'dispatch_uid': dispatch_uid,
                           'weak': weak}

                def _retry_receiver(retry_fun):

                    def _try_receiver_over_time(*args, **kwargs):
                        def on_error(exc, intervals, retries):
                            # Log the failure and report how long to wait
                            # before the next attempt.
                            interval = next(intervals)
                            err_msg = RECEIVER_RETRY_ERROR % \
                                {'receiver': retry_fun,
                                 'when': humanize_seconds(interval, 'in', ' ')}
                            logger.error(err_msg)
                            return interval

                        return retry_over_time(retry_fun, Exception, args,
                                               kwargs, on_error)

                    return _try_receiver_over_time

                if retry:
                    # The wrapper is only referenced by the signal, so it
                    # must be held strongly.
                    options['weak'] = False
                    if not dispatch_uid:
                        # if there's no dispatch_uid then we need to set the
                        # dispatch uid to the original func id so we can look
                        # it up later with the original func id
                        options['dispatch_uid'] = _make_id(fun)
                    fun = _retry_receiver(fun)

                self._connect_signal(fun, sender, options['weak'],
                                     options['dispatch_uid'])
                return fun

            return _connect_signal

        if args and callable(args[0]):
            # Direct call: connect(receiver, ...).
            return _handle_options(*args[1:], **kwargs)(args[0])
        # Decorator form: connect(sender=..., ...) returns the decorator.
        return _handle_options(*args, **kwargs)

    def _connect_signal(self, receiver, sender, weak, dispatch_uid):
        """Register ``receiver`` for ``sender`` and return the stored receiver.

        Connection is deferred through ``sender.__then__`` when ``sender`` is
        a ``PromiseProxy``.  A receiver whose lookup key is already
        registered is not added again.

        Raises:
            ValueError: if ``receiver`` does not accept keyword arguments.
        """
        assert callable(receiver), 'Signal receivers must be callable'
        if not fun_accepts_kwargs(receiver):
            raise ValueError(
                'Signal receiver must accept keyword arguments.')

        if isinstance(sender, PromiseProxy):
            sender.__then__(
                self._connect_proxy, receiver, sender, weak, dispatch_uid,
            )
            return receiver

        lookup_key = _make_lookup_key(receiver, sender, dispatch_uid)

        if weak:
            ref, receiver_object = _boundmethod_safe_weakref(receiver)
            if PY3:
                receiver = ref(receiver)
                weakref.finalize(receiver_object, self._remove_receiver)
            else:
                receiver = ref(receiver, self._remove_receiver)

        with self.lock:
            self._clear_dead_receivers()
            for r_key, _ in self.receivers:
                if r_key == lookup_key:
                    # Already connected under this key.
                    break
            else:
                self.receivers.append((lookup_key, receiver))
            self.sender_receivers_cache.clear()

        return receiver

    def disconnect(self, receiver=None, sender=None, weak=None,
                   dispatch_uid=None):
        """Disconnect receiver from sender for signal.

        If weak references are used, disconnect needn't be called.
        The receiver will be removed from dispatch automatically.

        Arguments:
            receiver (Callable): The registered receiver to disconnect.
                May be none if `dispatch_uid` is specified.

            sender (Any): The registered sender to disconnect.

            weak (bool): Deprecated; has no effect and triggers a
                :class:`CDeprecationWarning` when given.

            dispatch_uid (Hashable): The unique identifier of the receiver
                to disconnect.

        Returns:
            bool: :const:`True` if a matching receiver was removed.
        """
        if weak is not None:
            warnings.warn(
                'Passing `weak` to disconnect has no effect.',
                CDeprecationWarning, stacklevel=2)

        lookup_key = _make_lookup_key(receiver, sender, dispatch_uid)

        disconnected = False
        with self.lock:
            self._clear_dead_receivers()
            for index, (r_key, _) in enumerate(self.receivers):
                if r_key == lookup_key:
                    disconnected = True
                    # Safe to delete while iterating: we stop immediately.
                    del self.receivers[index]
                    break
            self.sender_receivers_cache.clear()
        return disconnected

    def has_listeners(self, sender=None):
        """Return :const:`True` if any live receiver would respond to sender."""
        return bool(self._live_receivers(sender))

    def send(self, sender, **named):
        """Send signal from sender to all connected receivers.

        An exception raised by a receiver does not stop the dispatch loop:
        it is logged, and the exception instance is returned in place of
        that receiver's response, so the remaining receivers are still
        called.

        Arguments:
            sender (Any): The sender of the signal.
                Either a specific object or :const:`None`.
            **named (Any): Named arguments which will be passed to receivers.

        Returns:
            List: of tuple pairs: `[(receiver, response), … ]`, where
            ``response`` is the exception instance for a receiver that
            raised.
        """
        responses = []
        if not self.receivers or \
                self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
            return responses

        for receiver in self._live_receivers(sender):
            try:
                response = receiver(signal=self, sender=sender, **named)
            except Exception as exc:  # pylint: disable=broad-except
                if not hasattr(exc, '__traceback__'):
                    exc.__traceback__ = sys.exc_info()[2]
                logger.exception(
                    'Signal handler %r raised: %r', receiver, exc)
                responses.append((receiver, exc))
            else:
                responses.append((receiver, response))
        return responses
    send_robust = send  # Compat with Django interface.

    def _clear_dead_receivers(self):
        """Drop entries whose weak reference no longer resolves.

        Does nothing unless :meth:`_remove_receiver` has flagged that dead
        receivers may exist.
        """
        # Warning: caller is assumed to hold self.lock
        if self._dead_receivers:
            self._dead_receivers = False
            self.receivers = [
                entry for entry in self.receivers
                if not (isinstance(entry[1], weakref.ReferenceType) and
                        entry[1]() is None)
            ]

    def _live_receivers(self, sender):
        """Filter sequence of receivers to get resolved, live receivers.

        This checks for weak references and resolves them, then returning only
        live receivers.
        """
        receivers = None
        if self.use_caching and not self._dead_receivers:
            receivers = self.sender_receivers_cache.get(sender)
            # We could end up here with NO_RECEIVERS even if we do check this
            # case in .send() prior to calling _live_receivers() due to
            # concurrent .send() call.
            if receivers is NO_RECEIVERS:
                return []
        if receivers is None:
            with self.lock:
                self._clear_dead_receivers()
                senderkey = _make_id(sender)
                receivers = []
                for (_, r_senderkey), receiver in self.receivers:
                    # NONE_ID marks a receiver connected for any sender.
                    if r_senderkey == NONE_ID or r_senderkey == senderkey:
                        receivers.append(receiver)
                if self.use_caching:
                    if not receivers:
                        self.sender_receivers_cache[sender] = NO_RECEIVERS
                    else:
                        # Note: we must cache the weakref versions.
                        self.sender_receivers_cache[sender] = receivers
        non_weak_receivers = []
        for receiver in receivers:
            if isinstance(receiver, weakref.ReferenceType):
                # Dereference the weak reference.
                receiver = receiver()
                if receiver is not None:
                    non_weak_receivers.append(receiver)
            else:
                non_weak_receivers.append(receiver)
        return non_weak_receivers

    def _remove_receiver(self, receiver=None):
        """Remove dead receivers from connections."""
        # Mark that self.receivers may contain dead weakrefs.  If so,
        # we will clean those up in connect, disconnect and _live_receivers
        # while holding self.lock.  Note that doing the cleanup here isn't a
        # good idea, _remove_receiver() will be called as a side effect of
        # garbage collection, and so the call can happen while we are already
        # holding self.lock.
        self._dead_receivers = True

    def __repr__(self):
        """``repr(signal)``."""
        return '<{0}: {1} providing_args={2!r}>'.format(
            type(self).__name__, self.name, self.providing_args)

    def __str__(self):
        """``str(signal)``."""
        return repr(self)