Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2"""Configuration introspection and defaults.""" 

3from __future__ import absolute_import, unicode_literals 

4 

5import sys 

6from collections import deque, namedtuple 

7from datetime import timedelta 

8 

9from celery.five import items, keys, python_2_unicode_compatible 

10from celery.utils.functional import memoize 

11from celery.utils.serialization import strtobool 

12 

13__all__ = ('Option', 'NAMESPACES', 'flatten', 'find') 

14 

# Interpreter detection used to choose the default worker pool.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')

# Jython and PyPy releases older than 1.5.0 default to the 'solo' pool;
# every other interpreter defaults to 'prefork'.
if is_jython or (is_pypy and sys.pypy_version_info[0:3] < (1, 5, 0)):
    DEFAULT_POOL = 'solo'
else:
    DEFAULT_POOL = 'prefork'

DEFAULT_ACCEPT_CONTENT = ['json']

# Default log line formats (the surrounding whitespace is stripped from
# the process format; the task format is a single line joined by the
# backslash continuation).
DEFAULT_PROCESS_LOG_FMT = """
    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
""".strip()
DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
%(task_name)s[%(task_id)s]: %(message)s"""

DEFAULT_SECURITY_DIGEST = 'sha256'

# Format templates for old-style setting names; ``{0}`` is replaced by the
# option key (see ``Namespace``).
OLD_NS = {'celery_{0}'}
OLD_NS_BEAT = {'celerybeat_{0}'}
OLD_NS_WORKER = {'celeryd_{0}'}

# Result record returned by ``find``.
searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))

42 

43 

def Namespace(__old__=None, **options):
    """Return ``options`` as a namespace mapping, filling in old names.

    Arguments:
        __old__: Optional iterable of format templates.  When given, each
            option whose ``old`` attribute is empty gets ``old`` set to
            the templates formatted with the option's own key.
        **options: The entries of the namespace, keyed by name.

    Returns:
        dict: The ``options`` mapping itself.
    """
    if __old__ is None:
        return options
    for name, option in items(options):
        if option.old:
            # Explicitly declared old names take precedence.
            continue
        option.old = {template.format(name) for template in __old__}
    return options

50 

51 

def old_ns(ns):
    """Return a one-element set holding the template ``'<ns>_{0}'``."""
    # The doubled braces survive formatting as a literal ``{0}`` placeholder.
    template = '{0}_{{0}}'.format(ns)
    return {template}

54 

55 

@python_2_unicode_compatible
class Option(object):
    """Describes a Celery configuration option.

    Arguments:
        default: Default value of the option.
        *args: Accepted and ignored.
        **kwargs: Stored verbatim as instance attributes (for example
            ``type``, ``old``, ``alt``, ``deprecate_by``, ``remove_by``).
    """

    # Text describing the replacement, used in deprecation warnings.
    alt = None
    # Deprecation/removal markers checked by ``find_deprecated_settings``.
    deprecate_by = None
    remove_by = None
    # Old-style names (or name templates) of this option.
    old = set()
    # Maps a type name to the callable used by ``to_python``.
    # NOTE(review): there is no 'list' entry, so ``to_python`` raises
    # KeyError for options declared with ``type='list'``.
    typemap = {'string': str, 'int': int, 'float': float, 'any': lambda v: v,
               'bool': strtobool, 'dict': dict, 'tuple': tuple}

    def __init__(self, default=None, *args, **kwargs):
        self.default = default
        for attr, value in items(kwargs):
            setattr(self, attr, value)
        # Resolve the type last: assigning it before the loop let an
        # explicit falsy ``type`` keyword overwrite the 'string' fallback.
        self.type = kwargs.get('type') or 'string'

    def to_python(self, value):
        """Convert ``value`` using the converter registered for ``self.type``.

        Raises:
            KeyError: If ``self.type`` is not a key of ``typemap``.
        """
        return self.typemap[self.type](value)

    def __repr__(self):
        return '<Option: type->{0} default->{1!r}>'.format(self.type,
                                                           self.default)

79 

80 

# Registry of every known setting: top-level options plus one nested
# ``Namespace`` per setting group.  ``flatten`` joins nested keys with '_'
# (e.g. ``broker`` + ``url`` -> ``broker_url``).
#
# NOTE(review): several options use ``type='list'``, which has no entry in
# ``Option.typemap``; ``to_python`` raises KeyError for them.
NAMESPACES = Namespace(
    accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
    result_accept_content=Option(None, type='list'),
    enable_utc=Option(True, type='bool'),
    imports=Option((), type='tuple', old=OLD_NS),
    include=Option((), type='tuple', old=OLD_NS),
    timezone=Option(type='string', old=OLD_NS),
    beat=Namespace(
        __old__=OLD_NS_BEAT,

        max_loop_interval=Option(0, type='float'),
        schedule=Option({}, type='dict'),
        scheduler=Option('celery.beat:PersistentScheduler'),
        schedule_filename=Option('celerybeat-schedule'),
        sync_every=Option(0, type='int'),
    ),
    broker=Namespace(
        url=Option(None, type='string'),
        read_url=Option(None, type='string'),
        write_url=Option(None, type='string'),
        transport=Option(type='string'),
        transport_options=Option({}, type='dict'),
        connection_timeout=Option(4, type='float'),
        connection_retry=Option(True, type='bool'),
        connection_max_retries=Option(100, type='int'),
        failover_strategy=Option(None, type='string'),
        heartbeat=Option(120, type='int'),
        # NOTE(review): float default declared with type 'int' -- confirm.
        heartbeat_checkrate=Option(3.0, type='int'),
        login_method=Option(None, type='string'),
        pool_limit=Option(10, type='int'),
        use_ssl=Option(False, type='bool'),

        host=Option(type='string'),
        port=Option(type='int'),
        user=Option(type='string'),
        password=Option(type='string'),
        vhost=Option(type='string'),
    ),
    cache=Namespace(
        __old__=old_ns('celery_cache'),

        backend=Option(),
        backend_options=Option({}, type='dict'),
    ),
    cassandra=Namespace(
        entry_ttl=Option(type='float'),
        keyspace=Option(type='string'),
        port=Option(type='string'),
        read_consistency=Option(type='string'),
        servers=Option(type='list'),
        table=Option(type='string'),
        write_consistency=Option(type='string'),
        auth_provider=Option(type='string'),
        auth_kwargs=Option(type='string'),
        options=Option({}, type='dict'),
    ),
    s3=Namespace(
        access_key_id=Option(type='string'),
        secret_access_key=Option(type='string'),
        bucket=Option(type='string'),
        base_path=Option(type='string'),
        endpoint_url=Option(type='string'),
        region=Option(type='string'),
    ),
    azureblockblob=Namespace(
        container_name=Option('celery', type='string'),
        retry_initial_backoff_sec=Option(2, type='int'),
        retry_increment_base=Option(2, type='int'),
        retry_max_attempts=Option(3, type='int'),
    ),
    control=Namespace(
        queue_ttl=Option(300.0, type='float'),
        queue_expires=Option(10.0, type='float'),
        exchange=Option('celery', type='string'),
    ),
    couchbase=Namespace(
        __old__=old_ns('celery_couchbase'),

        backend_settings=Option(None, type='dict'),
    ),
    arangodb=Namespace(
        __old__=old_ns('celery_arangodb'),
        backend_settings=Option(None, type='dict')
    ),
    mongodb=Namespace(
        __old__=old_ns('celery_mongodb'),

        backend_settings=Option(type='dict'),
    ),
    cosmosdbsql=Namespace(
        database_name=Option('celerydb', type='string'),
        collection_name=Option('celerycol', type='string'),
        consistency_level=Option('Session', type='string'),
        max_retry_attempts=Option(9, type='int'),
        max_retry_wait_time=Option(30, type='int'),
    ),
    event=Namespace(
        __old__=old_ns('celery_event'),

        queue_expires=Option(60.0, type='float'),
        queue_ttl=Option(5.0, type='float'),
        queue_prefix=Option('celeryev'),
        serializer=Option('json'),
        exchange=Option('celeryev', type='string'),
    ),
    redis=Namespace(
        __old__=old_ns('celery_redis'),

        backend_use_ssl=Option(type='dict'),
        db=Option(type='int'),
        host=Option(type='string'),
        max_connections=Option(type='int'),
        password=Option(type='string'),
        port=Option(type='int'),
        socket_timeout=Option(120.0, type='float'),
        socket_connect_timeout=Option(None, type='float'),
        retry_on_timeout=Option(False, type='bool'),
        socket_keepalive=Option(False, type='bool'),
    ),
    result=Namespace(
        __old__=old_ns('celery_result'),

        backend=Option(type='string'),
        cache_max=Option(
            -1,
            type='int', old={'celery_max_cached_results'},
        ),
        # Was ``type='str'``, which is not a key of ``Option.typemap``;
        # 'string' matches ``task.compression`` below.
        compression=Option(type='string'),
        exchange=Option('celeryresults'),
        exchange_type=Option('direct'),
        expires=Option(
            timedelta(days=1),
            type='float', old={'celery_task_result_expires'},
        ),
        persistent=Option(None, type='bool'),
        extended=Option(False, type='bool'),
        serializer=Option('json'),
        backend_transport_options=Option({}, type='dict'),
        chord_retry_interval=Option(1.0, type='float'),
        chord_join_timeout=Option(3.0, type='float'),
        backend_max_sleep_between_retries_ms=Option(10000, type='int'),
        backend_max_retries=Option(float("inf"), type='float'),
        backend_base_sleep_between_retries_ms=Option(10, type='int'),
        backend_always_retry=Option(False, type='bool'),
    ),
    elasticsearch=Namespace(
        __old__=old_ns('celery_elasticsearch'),

        retry_on_timeout=Option(type='bool'),
        max_retries=Option(type='int'),
        timeout=Option(type='float'),
        save_meta_as_text=Option(True, type='bool'),
    ),
    riak=Namespace(
        __old__=old_ns('celery_riak'),

        backend_settings=Option(type='dict'),
    ),
    security=Namespace(
        __old__=old_ns('celery_security'),

        certificate=Option(type='string'),
        cert_store=Option(type='string'),
        key=Option(type='string'),
        digest=Option(DEFAULT_SECURITY_DIGEST, type='string'),
    ),
    database=Namespace(
        url=Option(old={'celery_result_dburi'}),
        engine_options=Option(
            type='dict', old={'celery_result_engine_options'},
        ),
        short_lived_sessions=Option(
            False, type='bool', old={'celery_result_db_short_lived_sessions'},
        ),
        table_schemas=Option(type='dict'),
        table_names=Option(type='dict', old={'celery_result_db_tablenames'}),
    ),
    task=Namespace(
        __old__=OLD_NS,
        acks_late=Option(False, type='bool'),
        acks_on_failure_or_timeout=Option(True, type='bool'),
        always_eager=Option(False, type='bool'),
        annotations=Option(type='any'),
        compression=Option(type='string', old={'celery_message_compression'}),
        create_missing_queues=Option(True, type='bool'),
        inherit_parent_priority=Option(False, type='bool'),
        default_delivery_mode=Option(2, type='string'),
        default_queue=Option('celery'),
        default_exchange=Option(None, type='string'),  # taken from queue
        default_exchange_type=Option('direct'),
        default_routing_key=Option(None, type='string'),  # taken from queue
        default_rate_limit=Option(type='string'),
        default_priority=Option(None, type='string'),
        eager_propagates=Option(
            False, type='bool', old={'celery_eager_propagates_exceptions'},
        ),
        ignore_result=Option(False, type='bool'),
        protocol=Option(2, type='int', old={'celery_task_protocol'}),
        publish_retry=Option(
            True, type='bool', old={'celery_task_publish_retry'},
        ),
        publish_retry_policy=Option(
            {'max_retries': 3,
             'interval_start': 0,
             'interval_max': 1,
             'interval_step': 0.2},
            type='dict', old={'celery_task_publish_retry_policy'},
        ),
        queues=Option(type='dict'),
        queue_ha_policy=Option(None, type='string'),
        queue_max_priority=Option(None, type='int'),
        reject_on_worker_lost=Option(type='bool'),
        remote_tracebacks=Option(False, type='bool'),
        routes=Option(type='any'),
        send_sent_event=Option(
            False, type='bool', old={'celery_send_task_sent_event'},
        ),
        serializer=Option('json', old={'celery_task_serializer'}),
        soft_time_limit=Option(
            type='float', old={'celeryd_task_soft_time_limit'},
        ),
        time_limit=Option(
            type='float', old={'celeryd_task_time_limit'},
        ),
        store_errors_even_if_ignored=Option(False, type='bool'),
        track_started=Option(False, type='bool'),
    ),
    worker=Namespace(
        __old__=OLD_NS_WORKER,
        agent=Option(None, type='string'),
        autoscaler=Option('celery.worker.autoscale:Autoscaler'),
        concurrency=Option(0, type='int'),
        consumer=Option('celery.worker.consumer:Consumer', type='string'),
        direct=Option(False, type='bool', old={'celery_worker_direct'}),
        disable_rate_limits=Option(
            False, type='bool', old={'celery_disable_rate_limits'},
        ),
        enable_remote_control=Option(
            True, type='bool', old={'celery_enable_remote_control'},
        ),
        hijack_root_logger=Option(True, type='bool'),
        log_color=Option(type='bool'),
        log_format=Option(DEFAULT_PROCESS_LOG_FMT),
        lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}),
        max_memory_per_child=Option(type='int'),
        max_tasks_per_child=Option(type='int'),
        pool=Option(DEFAULT_POOL),
        pool_putlocks=Option(True, type='bool'),
        pool_restarts=Option(False, type='bool'),
        proc_alive_timeout=Option(4.0, type='float'),
        prefetch_multiplier=Option(4, type='int'),
        redirect_stdouts=Option(
            True, type='bool', old={'celery_redirect_stdouts'},
        ),
        redirect_stdouts_level=Option(
            'WARNING', old={'celery_redirect_stdouts_level'},
        ),
        send_task_events=Option(
            False, type='bool', old={'celery_send_events'},
        ),
        state_db=Option(),
        task_log_format=Option(DEFAULT_TASK_LOG_FMT),
        timer=Option(type='string'),
        timer_precision=Option(1.0, type='float'),
    ),
)

347 

348 

def _flatten_keys(ns, key, opt):
    """Default ``flatten`` key filter: pair the qualified name with ``opt``."""
    qualified_name = ns + key
    return [(qualified_name, opt)]

351 

352 

def _to_compat(ns, key, opt):
    """``flatten`` key filter yielding ``(old_key, new_key, opt)`` triples.

    One triple is produced per template in ``opt.old`` (formatted with
    ``key`` and upper-cased).  An option without old names maps to the
    upper-cased form of its new name.
    """
    new_key = ns + key
    if not opt.old:
        return [(new_key.upper(), new_key, opt)]
    return [
        (template.format(key).upper(), new_key, opt)
        for template in opt.old
    ]

360 

361 

def flatten(d, root='', keyfilter=_flatten_keys):
    """Flatten settings.

    Walks the nested mapping ``d`` breadth-first.  Each nested dict extends
    the name prefix with ``<key>_``; every other value is handed to
    ``keyfilter(prefix, key, value)`` and the entries it returns are
    yielded one by one.
    """
    pending = deque()
    pending.append((root, d))
    while pending:
        prefix, mapping = pending.popleft()
        for name, value in items(mapping):
            if not isinstance(value, dict):
                for entry in keyfilter(prefix, name, value):
                    yield entry
                continue
            # Nested namespace: visit it later under the extended prefix.
            pending.append((prefix + name + '_', value))

373 

374 

# Flat mapping of every new-style setting name to its default value.
DEFAULTS = dict(
    (name, option.default) for name, option in flatten(NAMESPACES)
)

# Translation tables between old-style and new-style setting names, built
# from the (old_key, new_key, option) triples produced by ``_to_compat``.
__compat = list(flatten(NAMESPACES, keyfilter=_to_compat))
_OLD_DEFAULTS = dict((old, option.default) for old, _, option in __compat)
_TO_OLD_KEY = dict((new, old) for old, new, _ in __compat)
_TO_NEW_KEY = dict((old, new) for old, new, _ in __compat)
__compat = None  # drop the temporary list once the tables are built

SETTING_KEYS = set(DEFAULTS)
_OLD_SETTING_KEYS = set(_TO_NEW_KEY)

386 

387 

def find_deprecated_settings(source):  # pragma: no cover
    """Warn about deprecated settings that have a truthy value on ``source``.

    An option counts as deprecated when its ``deprecate_by`` or
    ``remove_by`` attribute is set.  Returns ``source`` unchanged.
    """
    from celery.utils import deprecated
    for name, opt in flatten(NAMESPACES):
        marked = opt.deprecate_by or opt.remove_by
        if not marked or not getattr(source, name, None):
            continue
        description = 'The {0!r} setting'.format(name)
        alternative = 'Use the {0.alt} instead'.format(opt)
        deprecated.warn(description=description,
                        deprecation=opt.deprecate_by,
                        removal=opt.remove_by,
                        alternative=alternative)
    return source

397 

398 

@memoize(maxsize=None)
def find(name, namespace='celery'):
    """Find setting by name.

    Lookup order: the given namespace, then every entry of ``NAMESPACES``
    (matching either the entry's own name or a key inside a nested
    namespace), and finally the flat ``DEFAULTS`` table.

    Returns:
        searchresult: ``(namespace, key, type)`` for the first match.

    Raises:
        KeyError: If the name is not found anywhere.
    """
    namespace = namespace.lower()
    lowered = name.lower()

    # - Try specified name-space first.
    try:
        option = NAMESPACES[namespace][lowered]
    except KeyError:
        pass
    else:
        return searchresult(namespace, lowered, option)

    # - Try all the other namespaces.
    for ns, opts in items(NAMESPACES):
        if ns.lower() == lowered:
            return searchresult(None, ns, opts)
        if isinstance(opts, dict) and lowered in opts:
            return searchresult(ns, lowered, opts[lowered])

    # - See if name is a qualname last.
    return searchresult(None, lowered, DEFAULTS[lowered])