Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/app/utils.py : 12%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""App utilities: Compat settings, bug-report tool, pickling apps."""
3from __future__ import absolute_import, unicode_literals
5import os
6import platform as _platform
7import re
8from collections import namedtuple
9from copy import deepcopy
10from types import ModuleType
12from kombu.utils.url import maybe_sanitize_url
14from celery.exceptions import ImproperlyConfigured
15from celery.five import items, keys, string_t, values
16from celery.platforms import pyimplementation
17from celery.utils.collections import ConfigurationView
18from celery.utils.imports import import_from_cwd, qualname, symbol_by_name
19from celery.utils.text import pretty
21from .defaults import (_OLD_DEFAULTS, _OLD_SETTING_KEYS, _TO_NEW_KEY,
22 _TO_OLD_KEY, DEFAULTS, SETTING_KEYS, find)
24try:
25 from collections.abc import Mapping
26except ImportError:
27 # TODO: Remove this when we drop Python 2.7 support
28 from collections import Mapping
# Public names of this module (what a star-import exposes).
__all__ = (
    'Settings',
    'appstr',
    'bugreport',
    'filter_hidden_settings',
    'find_app',
)
#: Format used to generate bug-report information.
#: NOTE(review): the column alignment inside this template was rebuilt from a
#: whitespace-collapsed listing -- confirm the exact spacing against upstream.
BUGREPORT_INFO = """
software -> celery:{celery_v} kombu:{kombu_v} py:{py_v}
            billiard:{billiard_v} {driver_v}
platform -> system:{system} arch:{arch}
            kernel version:{kernel_version} imp:{py_i}
loader   -> {loader}
settings -> transport:{transport} results:{results}

{human_settings}
"""

#: Case-insensitive pattern; a setting whose key matches it is masked by
#: ``filter_hidden_settings``.
HIDDEN_SETTINGS = re.compile(
    '|'.join((
        'API', 'TOKEN', 'KEY', 'SECRET', 'PASS',
        'PROFANITIES_LIST', 'SIGNATURE', 'DATABASE',
    )),
    re.IGNORECASE,
)

#: Error text used when old-style keys show up in a new-style configuration.
E_MIX_OLD_INTO_NEW = """

Cannot mix new and old setting keys, please rename the
following settings to the new format:

{renames}

"""

#: Error text used when new-style keys show up in an old-style configuration.
E_MIX_NEW_INTO_OLD = """

Cannot mix new setting names with old setting names, please
rename the following settings to use the old format:

{renames}

Or change all of the settings to use the new format :)

"""

#: One rename suggestion line: the offending key padded to 36 columns,
#: followed by the key that should replace it.
FMT_REPLACE_SETTING = '{replace:<36} -> {with_}'
def appstr(app):
    """Return the string used in ``__repr__`` etc. to identify an app.

    Arguments:
        app: Object with a ``main`` attribute.

    Returns:
        str: ``'<main> at <hex id>'``; ``'__main__'`` stands in for a
        falsy ``app.main``.
    """
    label = app.main or '__main__'
    return '{0} at {1:#x}'.format(label, id(app))
class Settings(ConfigurationView):
    """Celery settings object.

    .. seealso::

        :ref:`configuration` for a full list of configuration keys.

    """

    @property
    def broker_read_url(self):
        """Broker URL used for reading.

        The ``CELERY_BROKER_READ_URL`` environment variable wins, then the
        ``broker_read_url`` setting, then :attr:`broker_url`.  A falsy value
        at any step falls through to the next one.
        """
        env_url = os.environ.get('CELERY_BROKER_READ_URL')
        return env_url or self.get('broker_read_url') or self.broker_url

    @property
    def broker_write_url(self):
        """Broker URL used for writing.

        The ``CELERY_BROKER_WRITE_URL`` environment variable wins, then the
        ``broker_write_url`` setting, then :attr:`broker_url`.
        """
        env_url = os.environ.get('CELERY_BROKER_WRITE_URL')
        return env_url or self.get('broker_write_url') or self.broker_url

    @property
    def broker_url(self):
        """Broker URL.

        The ``CELERY_BROKER_URL`` environment variable wins; otherwise the
        result of ``self.first('broker_url', 'broker_host')``.
        """
        env_url = os.environ.get('CELERY_BROKER_URL')
        return env_url or self.first('broker_url', 'broker_host')

    @property
    def result_backend(self):
        """Result backend.

        The ``CELERY_RESULT_BACKEND`` environment variable wins; otherwise
        the result of looking up ``result_backend`` and then the old-style
        ``CELERY_RESULT_BACKEND`` key via ``self.first``.
        """
        env_backend = os.environ.get('CELERY_RESULT_BACKEND')
        return env_backend or self.first(
            'result_backend', 'CELERY_RESULT_BACKEND',
        )

    @property
    def task_default_exchange(self):
        """``task_default_exchange``, with ``task_default_queue`` as backup key."""
        lookup_order = ('task_default_exchange', 'task_default_queue')
        return self.first(*lookup_order)

    @property
    def task_default_routing_key(self):
        """``task_default_routing_key``, with ``task_default_queue`` as backup key."""
        lookup_order = ('task_default_routing_key', 'task_default_queue')
        return self.first(*lookup_order)

    @property
    def timezone(self):
        """Configured time zone."""
        # ``time_zone`` is looked up as well so that django's time zone
        # setting is supported.
        return self.first('timezone', 'time_zone')

    def without_defaults(self):
        """Return the current configuration, but without defaults."""
        # The last map in the stack holds the default settings: drop it.
        user_maps = self.maps[:-1]
        return Settings({}, user_maps)

    def value_set_for(self, key):
        """Return whether ``key`` is present once the defaults are left out."""
        non_default = self.without_defaults()
        return key in non_default

    def find_option(self, name, namespace=''):
        """Search for option by name.

        Example:
            >>> from proj.celery import app
            >>> app.conf.find_option('disable_rate_limits')
            ('worker', 'disable_rate_limits',
             <Option: type->bool default->False>)

        Arguments:
            name (str): Name of option, cannot be partial.
            namespace (str): Preferred name-space (empty string by default).

        Returns:
            Tuple: of ``(namespace, key, type)``.
        """
        return find(name, namespace)

    def find_value_for_key(self, name, namespace='celery'):
        """Shortcut to ``get_by_parts(*find_option(name, namespace)[:-1])``."""
        # Everything except the trailing option descriptor forms the path.
        path = self.find_option(name, namespace)[:-1]
        return self.get_by_parts(*path)

    def get_by_parts(self, *parts):
        """Return the current value for setting specified as a path.

        Falsy parts are skipped; the remaining ones are joined with ``_``
        to form the key that is looked up.

        Example:
            >>> from proj.celery import app
            >>> app.conf.get_by_parts('worker', 'disable_rate_limits')
            False
        """
        key = '_'.join(filter(None, parts))
        return self[key]

    def finalize(self):
        """Trigger a first lookup so the configuration is read; return self."""
        # See PendingConfiguration in celery/app/base.py:
        # the first access will read the actual configuration.
        try:
            self['__bogus__']
        except KeyError:
            pass
        return self

    def table(self, with_defaults=False, censored=True):
        """Return the configuration as a plain dict.

        Keys starting with ``_`` and keys that collide with the name of a
        ``dict`` attribute are left out.

        Arguments:
            with_defaults (bool): Include default values as well.
            censored (bool): Pass the result through
                ``filter_hidden_settings``.
        """
        dict_members = frozenset(dir(dict))
        self.finalize()
        source = self if with_defaults else self.without_defaults()
        visible = {
            name: setting for name, setting in items(source)
            if not name.startswith('_') and name not in dict_members
        }
        if censored:
            return filter_hidden_settings(visible)
        return visible

    def humanize(self, with_defaults=False, censored=True):
        """Return a human readable text showing configuration changes."""
        rows = self.table(with_defaults, censored)
        return '\n'.join(
            '{0}: {1}'.format(name, pretty(setting, width=50))
            for name, setting in items(rows))
def _new_key_to_old(key, convert=_TO_OLD_KEY.get):
    """Map ``key`` through ``_TO_OLD_KEY``; unmapped keys come back unchanged."""
    fallback = key
    return convert(key, fallback)
def _old_key_to_new(key, convert=_TO_NEW_KEY.get):
    """Map ``key`` through ``_TO_NEW_KEY``; unmapped keys come back unchanged."""
    fallback = key
    return convert(key, fallback)
# Bundle describing one key-naming scheme: its defaults, the mapping used to
# translate keys of the other scheme, the key translation function and the
# error template raised when the two schemes are mixed.
_settings_info_t = namedtuple('settings_info_t', (
    'defaults',
    'convert',
    'key_t',
    'mix_error',
))

# New-style naming: stray old keys are reported with their new name.
_settings_info = _settings_info_t(
    defaults=DEFAULTS,
    convert=_TO_NEW_KEY,
    key_t=_old_key_to_new,
    mix_error=E_MIX_OLD_INTO_NEW,
)

# Old-style naming: stray new keys are reported with their old name.
_old_settings_info = _settings_info_t(
    defaults=_OLD_DEFAULTS,
    convert=_TO_OLD_KEY,
    key_t=_new_key_to_old,
    mix_error=E_MIX_NEW_INTO_OLD,
)
def detect_settings(conf, preconf=None, ignore_keys=None, prefix=None,
                    all_keys=None, old_keys=None):
    """Build a :class:`Settings` object, detecting old vs. new key names.

    Arguments:
        conf (Mapping): Configuration to inspect.  When ``None`` the keys
            of ``preconf`` are inspected instead and an empty dict is used
            as the configuration map.
        preconf (Mapping): Settings layered on top of the defaults and
            passed to :class:`Settings` as pending changes.
        ignore_keys (Set): Keys left out of the old/new detection.
        prefix (str): When truthy the new format is always used and no
            mixing error is raised.
        all_keys (Set): Known new-style keys (``SETTING_KEYS`` if falsy).
        old_keys (Set): Known old-style keys (``_OLD_SETTING_KEYS`` if
            falsy).

    Returns:
        Settings: View over ``[conf, defaults]``.

    Raises:
        ImproperlyConfigured: If keys of the minority naming scheme are
            present without their counterpart in the chosen scheme.
    """
    preconf = preconf or {}
    ignore_keys = ignore_keys or set()
    all_keys = all_keys or SETTING_KEYS
    old_keys = old_keys or _OLD_SETTING_KEYS

    if conf is None:
        source, conf = preconf, {}
    else:
        source = conf
    have = set(keys(source)) - ignore_keys
    new_present = have.intersection(all_keys)
    old_present = have.intersection(old_keys)

    if prefix:
        # always use new format if prefix is used.
        info, left = _settings_info, set()
    elif len(old_present) > len(new_present):
        # only old setting names, or a strict majority of them are old.
        info, left = _old_settings_info, new_present
    else:
        # no settings at all, only new names, a tie, or a majority of
        # new names: use the new format.
        info, left = _settings_info, old_present

    # only raise error for keys that the user didn't provide two keys
    # for (e.g., both ``result_expires`` and ``CELERY_TASK_RESULT_EXPIRES``).
    really_left = {key for key in left if info.convert[key] not in have}
    if really_left:
        # user is mixing old/new, or new/old settings, give renaming
        # suggestions.
        renames = '\n'.join(
            FMT_REPLACE_SETTING.format(replace=key, with_=info.convert[key])
            for key in sorted(really_left)
        )
        raise ImproperlyConfigured(info.mix_error.format(renames=renames))

    # Translate the pending settings into the chosen naming scheme before
    # layering them over a private copy of that scheme's defaults.
    preconf = {info.convert.get(k, k): v for k, v in items(preconf)}
    defaults = dict(deepcopy(info.defaults), **preconf)
    key_transforms = (_old_key_to_new, _new_key_to_old)
    return Settings(preconf, [conf, defaults], key_transforms, prefix=prefix)
class AppPickler(object):
    """Old application pickler/unpickler (< 3.1)."""

    def __call__(self, cls, *args):
        """Rebuild an app: build kwargs, construct it, then apply changes."""
        options = self.build_kwargs(*args)
        instance = self.construct(cls, **options)
        self.prepare(instance, **options)
        return instance

    def prepare(self, app, **kwargs):
        """Apply the pickled ``changes`` to the configuration of ``app``."""
        changes = kwargs['changes']
        app.conf.update(changes)

    def build_kwargs(self, *args):
        """Turn the positional pickle arguments into constructor kwargs."""
        return self.build_standard_kwargs(*args)

    def build_standard_kwargs(self, main, changes, loader, backend, amqp,
                              events, log, control, accept_magic_kwargs,
                              config_source=None):
        """Return the keyword arguments used to construct the app.

        ``accept_magic_kwargs`` is accepted but not used, and
        ``set_as_current`` is always ``False``.
        """
        options = {
            'main': main, 'loader': loader, 'backend': backend,
            'amqp': amqp, 'changes': changes, 'events': events,
            'log': log, 'control': control, 'set_as_current': False,
            'config_source': config_source,
        }
        return options

    def construct(self, cls, **kwargs):
        """Instantiate ``cls`` with ``kwargs``."""
        return cls(**kwargs)
def _unpickle_app(cls, pickler, *args):
    """Rebuild app for versions 2.5+.

    ``pickler`` is instantiated without arguments and the resulting object
    is called with ``cls`` followed by ``args``.
    """
    rebuild = pickler()
    return rebuild(cls, *args)
def _unpickle_app_v2(cls, kwargs):
    """Rebuild app for versions 3.1+.

    Arguments:
        cls (type): Callable used to construct the app.
        kwargs (Mapping): Keyword arguments for ``cls``.

    Returns:
        The object returned by ``cls``, called with ``kwargs`` plus
        ``set_as_current=False`` (overriding any value given in ``kwargs``).
    """
    # Build a fresh dict instead of writing into ``kwargs`` so the mapping
    # owned by the caller is left untouched.
    return cls(**dict(kwargs, set_as_current=False))
def filter_hidden_settings(conf):
    """Filter sensitive settings.

    Arguments:
        conf (Mapping): Settings to filter; it is not modified.

    Returns:
        dict: New dict with the same keys.  Values that are mappings are
        filtered recursively, values whose key matches ``HIDDEN_SETTINGS``
        are replaced by a mask, and values whose key contains
        ``broker_url`` or ``backend`` are passed through URL sanitizing.
    """
    def censor(key, value, mask='*' * 8):
        # Nested mappings are filtered as a whole, whatever their key is.
        if isinstance(value, Mapping):
            return filter_hidden_settings(value)
        # Only string keys can be matched against the patterns below.
        if not isinstance(key, string_t):
            return value
        if HIDDEN_SETTINGS.search(key):
            return mask
        lowered = key.lower()
        if 'broker_url' in lowered:
            from kombu import Connection
            return Connection(value).as_uri(mask=mask)
        if 'backend' in lowered:
            return maybe_sanitize_url(value, mask=mask)
        return value

    return {name: censor(name, setting) for name, setting in items(conf)}
def bugreport(app):
    """Return a string containing information useful in bug-reports.

    Arguments:
        app: App providing ``connection()``, ``conf`` and ``loader``.

    Returns:
        str: ``BUGREPORT_INFO`` filled in with version, platform, loader,
        transport and settings details.
    """
    import billiard
    import celery
    import kombu

    try:
        conn = app.connection()
        driver_v = '{0}:{1}'.format(conn.transport.driver_name,
                                    conn.transport.driver_version())
        transport = conn.transport_cls
    except Exception:  # pylint: disable=broad-except
        # Best effort: the report is still produced when the transport
        # details cannot be determined.
        transport = driver_v = ''

    fields = dict(
        system=_platform.system(),
        arch=', '.join(filter(None, _platform.architecture())),
        kernel_version=_platform.release(),
        py_i=pyimplementation(),
        celery_v=celery.VERSION_BANNER,
        kombu_v=kombu.__version__,
        billiard_v=billiard.__version__,
        py_v=_platform.python_version(),
        driver_v=driver_v,
        transport=transport,
        results=maybe_sanitize_url(app.conf.result_backend or 'disabled'),
        human_settings=app.conf.humanize(),
        loader=qualname(app.loader.__class__),
    )
    return BUGREPORT_INFO.format(**fields)
def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
    """Find app by name.

    Arguments:
        app (str): Name to resolve.
        symbol_by_name (Callable): Resolver called as
            ``symbol_by_name(app, imp=imp)``.
        imp (Callable): Importer, also used directly when the resolver
            raises :exc:`AttributeError`.

    Returns:
        The resolved symbol.  When that symbol is a module and ``app``
        contains no ``:``, the module is searched instead: its ``app``
        attribute, then its ``celery`` attribute, then (for packages) the
        ``<app>.celery`` submodule, then any module attribute that is a
        ``Celery`` instance.

    Raises:
        AttributeError: If the module search finds nothing.
    """
    from .base import Celery

    try:
        sym = symbol_by_name(app, imp=imp)
    except AttributeError:
        # last part was not an attribute, but a module
        sym = imp(app)

    # Anything that is not a plain module name is returned as resolved.
    if not isinstance(sym, ModuleType) or ':' in app:
        return sym

    try:
        candidate = sym.app
        if isinstance(candidate, ModuleType):
            # a submodule called ``app`` is not an app instance.
            raise AttributeError()
    except AttributeError:
        try:
            candidate = sym.celery
            if isinstance(candidate, ModuleType):
                raise AttributeError(
                    "attribute 'celery' is the celery module "
                    "not the instance of celery")
        except AttributeError:
            if getattr(sym, '__path__', None):
                # ``sym`` is a package: try its ``celery`` submodule.
                try:
                    return find_app(
                        '{0}.celery'.format(app),
                        symbol_by_name=symbol_by_name, imp=imp,
                    )
                except ImportError:
                    pass
            # Last resort: any Celery instance defined in the module.
            for member in values(vars(sym)):
                if isinstance(member, Celery):
                    return member
            raise
        else:
            return candidate
    else:
        return candidate