Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/utils/threads.py : 7%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Threading primitives and utilities."""
3from __future__ import absolute_import, print_function, unicode_literals
5import os
6import socket
7import sys
8import threading
9import traceback
10from contextlib import contextmanager
12from celery.five import THREAD_TIMEOUT_MAX, items, python_2_unicode_compatible
13from celery.local import Proxy
15try:
16 from greenlet import getcurrent as get_ident
17except ImportError: # pragma: no cover
18 try:
19 from _thread import get_ident # noqa
20 except ImportError:
21 try:
22 from thread import get_ident # noqa
23 except ImportError: # pragma: no cover
24 try:
25 from _dummy_thread import get_ident # noqa
26 except ImportError:
27 from dummy_thread import get_ident # noqa
30__all__ = (
31 'bgThread', 'Local', 'LocalStack', 'LocalManager',
32 'get_ident', 'default_socket_timeout',
33)
35USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
38@contextmanager
39def default_socket_timeout(timeout):
40 """Context temporarily setting the default socket timeout."""
41 prev = socket.getdefaulttimeout()
42 socket.setdefaulttimeout(timeout)
43 yield
44 socket.setdefaulttimeout(prev)
47class bgThread(threading.Thread):
48 """Background service thread."""
50 def __init__(self, name=None, **kwargs):
51 super(bgThread, self).__init__()
52 self._is_shutdown = threading.Event()
53 self._is_stopped = threading.Event()
54 self.daemon = True
55 self.name = name or self.__class__.__name__
57 def body(self):
58 raise NotImplementedError()
60 def on_crash(self, msg, *fmt, **kwargs):
61 print(msg.format(*fmt), file=sys.stderr)
62 traceback.print_exc(None, sys.stderr)
64 def run(self):
65 body = self.body
66 shutdown_set = self._is_shutdown.is_set
67 try:
68 while not shutdown_set():
69 try:
70 body()
71 except Exception as exc: # pylint: disable=broad-except
72 try:
73 self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
74 self._set_stopped()
75 finally:
76 sys.stderr.flush()
77 os._exit(1) # exiting by normal means won't work
78 finally:
79 self._set_stopped()
81 def _set_stopped(self):
82 try:
83 self._is_stopped.set()
84 except TypeError: # pragma: no cover
85 # we lost the race at interpreter shutdown,
86 # so gc collected built-in modules.
87 pass
89 def stop(self):
90 """Graceful shutdown."""
91 self._is_shutdown.set()
92 self._is_stopped.wait()
93 if self.is_alive():
94 self.join(THREAD_TIMEOUT_MAX)
97def release_local(local):
98 """Release the contents of the local for the current context.
100 This makes it possible to use locals without a manager.
102 With this function one can release :class:`Local` objects as well as
103 :class:`StackLocal` objects. However it's not possible to
104 release data held by proxies that way, one always has to retain
105 a reference to the underlying local object in order to be able
106 to release it.
108 Example:
109 >>> loc = Local()
110 >>> loc.foo = 42
111 >>> release_local(loc)
112 >>> hasattr(loc, 'foo')
113 False
114 """
115 local.__release_local__()
118class Local(object):
119 """Local object."""
121 __slots__ = ('__storage__', '__ident_func__')
123 def __init__(self):
124 object.__setattr__(self, '__storage__', {})
125 object.__setattr__(self, '__ident_func__', get_ident)
127 def __iter__(self):
128 return iter(items(self.__storage__))
130 def __call__(self, proxy):
131 """Create a proxy for a name."""
132 return Proxy(self, proxy)
134 def __release_local__(self):
135 self.__storage__.pop(self.__ident_func__(), None)
137 def __getattr__(self, name):
138 try:
139 return self.__storage__[self.__ident_func__()][name]
140 except KeyError:
141 raise AttributeError(name)
143 def __setattr__(self, name, value):
144 ident = self.__ident_func__()
145 storage = self.__storage__
146 try:
147 storage[ident][name] = value
148 except KeyError:
149 storage[ident] = {name: value}
151 def __delattr__(self, name):
152 try:
153 del self.__storage__[self.__ident_func__()][name]
154 except KeyError:
155 raise AttributeError(name)
158class _LocalStack(object):
159 """Local stack.
161 This class works similar to a :class:`Local` but keeps a stack
162 of objects instead. This is best explained with an example::
164 >>> ls = LocalStack()
165 >>> ls.push(42)
166 >>> ls.top
167 42
168 >>> ls.push(23)
169 >>> ls.top
170 23
171 >>> ls.pop()
172 23
173 >>> ls.top
174 42
176 They can be force released by using a :class:`LocalManager` or with
177 the :func:`release_local` function but the correct way is to pop the
178 item from the stack after using. When the stack is empty it will
179 no longer be bound to the current context (and as such released).
181 By calling the stack without arguments it will return a proxy that
182 resolves to the topmost item on the stack.
183 """
185 def __init__(self):
186 self._local = Local()
188 def __release_local__(self):
189 self._local.__release_local__()
191 def _get__ident_func__(self):
192 return self._local.__ident_func__
194 def _set__ident_func__(self, value):
195 object.__setattr__(self._local, '__ident_func__', value)
196 __ident_func__ = property(_get__ident_func__, _set__ident_func__)
197 del _get__ident_func__, _set__ident_func__
199 def __call__(self):
200 def _lookup():
201 rv = self.top
202 if rv is None:
203 raise RuntimeError('object unbound')
204 return rv
205 return Proxy(_lookup)
207 def push(self, obj):
208 """Push a new item to the stack."""
209 rv = getattr(self._local, 'stack', None)
210 if rv is None:
211 # pylint: disable=assigning-non-slot
212 # This attribute is defined now.
213 self._local.stack = rv = []
214 rv.append(obj)
215 return rv
217 def pop(self):
218 """Remove the topmost item from the stack.
220 Note:
221 Will return the old value or `None` if the stack was already empty.
222 """
223 stack = getattr(self._local, 'stack', None)
224 if stack is None:
225 return None
226 elif len(stack) == 1:
227 release_local(self._local)
228 return stack[-1]
229 else:
230 return stack.pop()
232 def __len__(self):
233 stack = getattr(self._local, 'stack', None)
234 return len(stack) if stack else 0
236 @property
237 def stack(self):
238 # get_current_worker_task uses this to find
239 # the original task that was executed by the worker.
240 stack = getattr(self._local, 'stack', None)
241 if stack is not None:
242 return stack
243 return []
245 @property
246 def top(self):
247 """The topmost item on the stack.
249 Note:
250 If the stack is empty, :const:`None` is returned.
251 """
252 try:
253 return self._local.stack[-1]
254 except (AttributeError, IndexError):
255 return None
258@python_2_unicode_compatible
259class LocalManager(object):
260 """Local objects cannot manage themselves.
262 For that you need a local manager.
263 You can pass a local manager multiple locals or add them
264 later by appending them to ``manager.locals``. Every time the manager
265 cleans up, it will clean up all the data left in the locals for this
266 context.
268 The ``ident_func`` parameter can be added to override the default ident
269 function for the wrapped locals.
270 """
272 def __init__(self, locals=None, ident_func=None):
273 if locals is None:
274 self.locals = []
275 elif isinstance(locals, Local):
276 self.locals = [locals]
277 else:
278 self.locals = list(locals)
279 if ident_func is not None:
280 self.ident_func = ident_func
281 for local in self.locals:
282 object.__setattr__(local, '__ident_func__', ident_func)
283 else:
284 self.ident_func = get_ident
286 def get_ident(self):
287 """Return context identifier.
289 This is the indentifer the local objects use internally
290 for this context. You cannot override this method to change the
291 behavior but use it to link other context local objects (such as
292 SQLAlchemy's scoped sessions) to the Werkzeug locals.
293 """
294 return self.ident_func()
296 def cleanup(self):
297 """Manually clean up the data in the locals for this context.
299 Call this at the end of the request or use ``make_middleware()``.
300 """
301 for local in self.locals:
302 release_local(local)
304 def __repr__(self):
305 return '<{0} storages: {1}>'.format(
306 self.__class__.__name__, len(self.locals))
309class _FastLocalStack(threading.local):
311 def __init__(self):
312 self.stack = []
313 self.push = self.stack.append
314 self.pop = self.stack.pop
315 super(_FastLocalStack, self).__init__()
317 @property
318 def top(self):
319 try:
320 return self.stack[-1]
321 except (AttributeError, IndexError):
322 return None
324 def __len__(self):
325 return len(self.stack)
328if USE_FAST_LOCALS: # pragma: no cover
329 LocalStack = _FastLocalStack
330else: # pragma: no cover
331 # - See #706
332 # since each thread has its own greenlet we can just use those as
333 # identifiers for the context. If greenlets aren't available we
334 # fall back to the current thread ident.
335 LocalStack = _LocalStack # noqa