Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/app/defaults.py : 10%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Configuration introspection and defaults."""
3from __future__ import absolute_import, unicode_literals
5import sys
6from collections import deque, namedtuple
7from datetime import timedelta
9from celery.five import items, keys, python_2_unicode_compatible
10from celery.utils.functional import memoize
11from celery.utils.serialization import strtobool
13__all__ = ('Option', 'NAMESPACES', 'flatten', 'find')
15is_jython = sys.platform.startswith('java')
16is_pypy = hasattr(sys, 'pypy_version_info')
18DEFAULT_POOL = 'prefork'
19if is_jython:
20 DEFAULT_POOL = 'solo'
21elif is_pypy:
22 if sys.pypy_version_info[0:3] < (1, 5, 0):
23 DEFAULT_POOL = 'solo'
24 else:
25 DEFAULT_POOL = 'prefork'
27DEFAULT_ACCEPT_CONTENT = ['json']
28DEFAULT_PROCESS_LOG_FMT = """
29 [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
30""".strip()
31DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
32%(task_name)s[%(task_id)s]: %(message)s"""
34DEFAULT_SECURITY_DIGEST = 'sha256'
37OLD_NS = {'celery_{0}'}
38OLD_NS_BEAT = {'celerybeat_{0}'}
39OLD_NS_WORKER = {'celeryd_{0}'}
41searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))
44def Namespace(__old__=None, **options):
45 if __old__ is not None:
46 for key, opt in items(options):
47 if not opt.old:
48 opt.old = {o.format(key) for o in __old__}
49 return options
52def old_ns(ns):
53 return {'{0}_{{0}}'.format(ns)}
56@python_2_unicode_compatible
57class Option(object):
58 """Describes a Celery configuration option."""
60 alt = None
61 deprecate_by = None
62 remove_by = None
63 old = set()
64 typemap = {'string': str, 'int': int, 'float': float, 'any': lambda v: v,
65 'bool': strtobool, 'dict': dict, 'tuple': tuple}
67 def __init__(self, default=None, *args, **kwargs):
68 self.default = default
69 self.type = kwargs.get('type') or 'string'
70 for attr, value in items(kwargs):
71 setattr(self, attr, value)
73 def to_python(self, value):
74 return self.typemap[self.type](value)
76 def __repr__(self):
77 return '<Option: type->{0} default->{1!r}>'.format(self.type,
78 self.default)
81NAMESPACES = Namespace(
82 accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
83 result_accept_content=Option(None, type='list'),
84 enable_utc=Option(True, type='bool'),
85 imports=Option((), type='tuple', old=OLD_NS),
86 include=Option((), type='tuple', old=OLD_NS),
87 timezone=Option(type='string', old=OLD_NS),
88 beat=Namespace(
89 __old__=OLD_NS_BEAT,
91 max_loop_interval=Option(0, type='float'),
92 schedule=Option({}, type='dict'),
93 scheduler=Option('celery.beat:PersistentScheduler'),
94 schedule_filename=Option('celerybeat-schedule'),
95 sync_every=Option(0, type='int'),
96 ),
97 broker=Namespace(
98 url=Option(None, type='string'),
99 read_url=Option(None, type='string'),
100 write_url=Option(None, type='string'),
101 transport=Option(type='string'),
102 transport_options=Option({}, type='dict'),
103 connection_timeout=Option(4, type='float'),
104 connection_retry=Option(True, type='bool'),
105 connection_max_retries=Option(100, type='int'),
106 failover_strategy=Option(None, type='string'),
107 heartbeat=Option(120, type='int'),
108 heartbeat_checkrate=Option(3.0, type='int'),
109 login_method=Option(None, type='string'),
110 pool_limit=Option(10, type='int'),
111 use_ssl=Option(False, type='bool'),
113 host=Option(type='string'),
114 port=Option(type='int'),
115 user=Option(type='string'),
116 password=Option(type='string'),
117 vhost=Option(type='string'),
118 ),
119 cache=Namespace(
120 __old__=old_ns('celery_cache'),
122 backend=Option(),
123 backend_options=Option({}, type='dict'),
124 ),
125 cassandra=Namespace(
126 entry_ttl=Option(type='float'),
127 keyspace=Option(type='string'),
128 port=Option(type='string'),
129 read_consistency=Option(type='string'),
130 servers=Option(type='list'),
131 table=Option(type='string'),
132 write_consistency=Option(type='string'),
133 auth_provider=Option(type='string'),
134 auth_kwargs=Option(type='string'),
135 options=Option({}, type='dict'),
136 ),
137 s3=Namespace(
138 access_key_id=Option(type='string'),
139 secret_access_key=Option(type='string'),
140 bucket=Option(type='string'),
141 base_path=Option(type='string'),
142 endpoint_url=Option(type='string'),
143 region=Option(type='string'),
144 ),
145 azureblockblob=Namespace(
146 container_name=Option('celery', type='string'),
147 retry_initial_backoff_sec=Option(2, type='int'),
148 retry_increment_base=Option(2, type='int'),
149 retry_max_attempts=Option(3, type='int'),
150 ),
151 control=Namespace(
152 queue_ttl=Option(300.0, type='float'),
153 queue_expires=Option(10.0, type='float'),
154 exchange=Option('celery', type='string'),
155 ),
156 couchbase=Namespace(
157 __old__=old_ns('celery_couchbase'),
159 backend_settings=Option(None, type='dict'),
160 ),
161 arangodb=Namespace(
162 __old__=old_ns('celery_arangodb'),
163 backend_settings=Option(None, type='dict')
164 ),
165 mongodb=Namespace(
166 __old__=old_ns('celery_mongodb'),
168 backend_settings=Option(type='dict'),
169 ),
170 cosmosdbsql=Namespace(
171 database_name=Option('celerydb', type='string'),
172 collection_name=Option('celerycol', type='string'),
173 consistency_level=Option('Session', type='string'),
174 max_retry_attempts=Option(9, type='int'),
175 max_retry_wait_time=Option(30, type='int'),
176 ),
177 event=Namespace(
178 __old__=old_ns('celery_event'),
180 queue_expires=Option(60.0, type='float'),
181 queue_ttl=Option(5.0, type='float'),
182 queue_prefix=Option('celeryev'),
183 serializer=Option('json'),
184 exchange=Option('celeryev', type='string'),
185 ),
186 redis=Namespace(
187 __old__=old_ns('celery_redis'),
189 backend_use_ssl=Option(type='dict'),
190 db=Option(type='int'),
191 host=Option(type='string'),
192 max_connections=Option(type='int'),
193 password=Option(type='string'),
194 port=Option(type='int'),
195 socket_timeout=Option(120.0, type='float'),
196 socket_connect_timeout=Option(None, type='float'),
197 retry_on_timeout=Option(False, type='bool'),
198 socket_keepalive=Option(False, type='bool'),
199 ),
200 result=Namespace(
201 __old__=old_ns('celery_result'),
203 backend=Option(type='string'),
204 cache_max=Option(
205 -1,
206 type='int', old={'celery_max_cached_results'},
207 ),
208 compression=Option(type='str'),
209 exchange=Option('celeryresults'),
210 exchange_type=Option('direct'),
211 expires=Option(
212 timedelta(days=1),
213 type='float', old={'celery_task_result_expires'},
214 ),
215 persistent=Option(None, type='bool'),
216 extended=Option(False, type='bool'),
217 serializer=Option('json'),
218 backend_transport_options=Option({}, type='dict'),
219 chord_retry_interval=Option(1.0, type='float'),
220 chord_join_timeout=Option(3.0, type='float'),
221 backend_max_sleep_between_retries_ms=Option(10000, type='int'),
222 backend_max_retries=Option(float("inf"), type='float'),
223 backend_base_sleep_between_retries_ms=Option(10, type='int'),
224 backend_always_retry=Option(False, type='bool'),
225 ),
226 elasticsearch=Namespace(
227 __old__=old_ns('celery_elasticsearch'),
229 retry_on_timeout=Option(type='bool'),
230 max_retries=Option(type='int'),
231 timeout=Option(type='float'),
232 save_meta_as_text=Option(True, type='bool'),
233 ),
234 riak=Namespace(
235 __old__=old_ns('celery_riak'),
237 backend_settings=Option(type='dict'),
238 ),
239 security=Namespace(
240 __old__=old_ns('celery_security'),
242 certificate=Option(type='string'),
243 cert_store=Option(type='string'),
244 key=Option(type='string'),
245 digest=Option(DEFAULT_SECURITY_DIGEST, type='string'),
246 ),
247 database=Namespace(
248 url=Option(old={'celery_result_dburi'}),
249 engine_options=Option(
250 type='dict', old={'celery_result_engine_options'},
251 ),
252 short_lived_sessions=Option(
253 False, type='bool', old={'celery_result_db_short_lived_sessions'},
254 ),
255 table_schemas=Option(type='dict'),
256 table_names=Option(type='dict', old={'celery_result_db_tablenames'}),
257 ),
258 task=Namespace(
259 __old__=OLD_NS,
260 acks_late=Option(False, type='bool'),
261 acks_on_failure_or_timeout=Option(True, type='bool'),
262 always_eager=Option(False, type='bool'),
263 annotations=Option(type='any'),
264 compression=Option(type='string', old={'celery_message_compression'}),
265 create_missing_queues=Option(True, type='bool'),
266 inherit_parent_priority=Option(False, type='bool'),
267 default_delivery_mode=Option(2, type='string'),
268 default_queue=Option('celery'),
269 default_exchange=Option(None, type='string'), # taken from queue
270 default_exchange_type=Option('direct'),
271 default_routing_key=Option(None, type='string'), # taken from queue
272 default_rate_limit=Option(type='string'),
273 default_priority=Option(None, type='string'),
274 eager_propagates=Option(
275 False, type='bool', old={'celery_eager_propagates_exceptions'},
276 ),
277 ignore_result=Option(False, type='bool'),
278 protocol=Option(2, type='int', old={'celery_task_protocol'}),
279 publish_retry=Option(
280 True, type='bool', old={'celery_task_publish_retry'},
281 ),
282 publish_retry_policy=Option(
283 {'max_retries': 3,
284 'interval_start': 0,
285 'interval_max': 1,
286 'interval_step': 0.2},
287 type='dict', old={'celery_task_publish_retry_policy'},
288 ),
289 queues=Option(type='dict'),
290 queue_ha_policy=Option(None, type='string'),
291 queue_max_priority=Option(None, type='int'),
292 reject_on_worker_lost=Option(type='bool'),
293 remote_tracebacks=Option(False, type='bool'),
294 routes=Option(type='any'),
295 send_sent_event=Option(
296 False, type='bool', old={'celery_send_task_sent_event'},
297 ),
298 serializer=Option('json', old={'celery_task_serializer'}),
299 soft_time_limit=Option(
300 type='float', old={'celeryd_task_soft_time_limit'},
301 ),
302 time_limit=Option(
303 type='float', old={'celeryd_task_time_limit'},
304 ),
305 store_errors_even_if_ignored=Option(False, type='bool'),
306 track_started=Option(False, type='bool'),
307 ),
308 worker=Namespace(
309 __old__=OLD_NS_WORKER,
310 agent=Option(None, type='string'),
311 autoscaler=Option('celery.worker.autoscale:Autoscaler'),
312 concurrency=Option(0, type='int'),
313 consumer=Option('celery.worker.consumer:Consumer', type='string'),
314 direct=Option(False, type='bool', old={'celery_worker_direct'}),
315 disable_rate_limits=Option(
316 False, type='bool', old={'celery_disable_rate_limits'},
317 ),
318 enable_remote_control=Option(
319 True, type='bool', old={'celery_enable_remote_control'},
320 ),
321 hijack_root_logger=Option(True, type='bool'),
322 log_color=Option(type='bool'),
323 log_format=Option(DEFAULT_PROCESS_LOG_FMT),
324 lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}),
325 max_memory_per_child=Option(type='int'),
326 max_tasks_per_child=Option(type='int'),
327 pool=Option(DEFAULT_POOL),
328 pool_putlocks=Option(True, type='bool'),
329 pool_restarts=Option(False, type='bool'),
330 proc_alive_timeout=Option(4.0, type='float'),
331 prefetch_multiplier=Option(4, type='int'),
332 redirect_stdouts=Option(
333 True, type='bool', old={'celery_redirect_stdouts'},
334 ),
335 redirect_stdouts_level=Option(
336 'WARNING', old={'celery_redirect_stdouts_level'},
337 ),
338 send_task_events=Option(
339 False, type='bool', old={'celery_send_events'},
340 ),
341 state_db=Option(),
342 task_log_format=Option(DEFAULT_TASK_LOG_FMT),
343 timer=Option(type='string'),
344 timer_precision=Option(1.0, type='float'),
345 ),
346)
349def _flatten_keys(ns, key, opt):
350 return [(ns + key, opt)]
353def _to_compat(ns, key, opt):
354 if opt.old:
355 return [
356 (oldkey.format(key).upper(), ns + key, opt)
357 for oldkey in opt.old
358 ]
359 return [((ns + key).upper(), ns + key, opt)]
362def flatten(d, root='', keyfilter=_flatten_keys):
363 """Flatten settings."""
364 stack = deque([(root, d)])
365 while stack:
366 ns, options = stack.popleft()
367 for key, opt in items(options):
368 if isinstance(opt, dict):
369 stack.append((ns + key + '_', opt))
370 else:
371 for ret in keyfilter(ns, key, opt):
372 yield ret
375DEFAULTS = {
376 key: opt.default for key, opt in flatten(NAMESPACES)
377}
378__compat = list(flatten(NAMESPACES, keyfilter=_to_compat))
379_OLD_DEFAULTS = {old_key: opt.default for old_key, _, opt in __compat}
380_TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat}
381_TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat}
382__compat = None
384SETTING_KEYS = set(keys(DEFAULTS))
385_OLD_SETTING_KEYS = set(keys(_TO_NEW_KEY))
388def find_deprecated_settings(source): # pragma: no cover
389 from celery.utils import deprecated
390 for name, opt in flatten(NAMESPACES):
391 if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
392 deprecated.warn(description='The {0!r} setting'.format(name),
393 deprecation=opt.deprecate_by,
394 removal=opt.remove_by,
395 alternative='Use the {0.alt} instead'.format(opt))
396 return source
399@memoize(maxsize=None)
400def find(name, namespace='celery'):
401 """Find setting by name."""
402 # - Try specified name-space first.
403 namespace = namespace.lower()
404 try:
405 return searchresult(
406 namespace, name.lower(), NAMESPACES[namespace][name.lower()],
407 )
408 except KeyError:
409 # - Try all the other namespaces.
410 for ns, opts in items(NAMESPACES):
411 if ns.lower() == name.lower():
412 return searchresult(None, ns, opts)
413 elif isinstance(opts, dict):
414 try:
415 return searchresult(ns, name.lower(), opts[name.lower()])
416 except KeyError:
417 pass
418 # - See if name is a qualname last.
419 return searchresult(None, name.lower(), DEFAULTS[name.lower()])