Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/app/base.py : 25%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Actual App instance implementation."""
3from __future__ import absolute_import, unicode_literals
5import os
6import threading
7import warnings
8from collections import defaultdict, deque
9from datetime import datetime
10from operator import attrgetter
12from kombu import pools
13from kombu.clocks import LamportClock
14from kombu.common import oid_from
15from kombu.utils.compat import register_after_fork
16from kombu.utils.objects import cached_property
17from kombu.utils.uuid import uuid
18from vine import starpromise
19from vine.utils import wraps
21from celery import platforms, signals
22from celery._state import (_announce_app_finalized, _deregister_app,
23 _register_app, _set_current_app, _task_stack,
24 connect_on_app_finalize, get_current_app,
25 get_current_worker_task, set_default_app)
26from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured, Ignore, Retry
27from celery.five import (UserDict, bytes_if_py2, python_2_unicode_compatible,
28 values)
29from celery.loaders import get_loader_cls
30from celery.local import PromiseProxy, maybe_evaluate
31from celery.utils import abstract
32from celery.utils.collections import AttributeDictMixin
33from celery.utils.dispatch import Signal
34from celery.utils.functional import first, head_from_fun, maybe_list
35from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
36from celery.utils.log import get_logger
37from celery.utils.objects import FallbackContext, mro_lookup
38from celery.utils.time import (get_exponential_backoff_interval, timezone,
39 to_utc)
41# Load all builtin tasks
42from . import builtins # noqa
43from . import backends
44from .annotations import prepare as prepare_annotations
45from .defaults import DEFAULT_SECURITY_DIGEST, find_deprecated_settings
46from .registry import TaskRegistry
47from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new,
48 _unpickle_app, _unpickle_app_v2, appstr, bugreport,
49 detect_settings)
# Public names exported by this module.
__all__ = ('Celery',)

logger = get_logger(__name__)

# Fix-ups used by an app when no explicit ``fixups`` argument is given.
BUILTIN_FIXUPS = {
    'celery.fixups.django:fixup',
}

# Value of the FORKED_BY_MULTIPROCESSING environment variable
# (None when unset); `Celery.task` consults it.
USING_EXECV = os.getenv('FORKED_BY_MULTIPROCESSING')

# Message raised by `Celery.config_from_envvar`; ``{0}`` is the
# name of the environment variable.
ERR_ENVVAR_NOT_SET = """
The environment variable {0!r} is not set,
and as such the configuration could not be loaded.

Please set this variable and make sure it points to
a valid configuration module.

Example:
    {0}="proj.celeryconfig"
"""
def app_has_custom(app, attr):
    """Report whether the class of ``app`` customizes attribute `attr`.

    The lookup walks the MRO of ``app.__class__`` via ``mro_lookup``,
    stopping at :class:`Celery` and :class:`object`.

    Note:
        This is used for optimizations in cases where we know
        how the default behavior works, but need to account
        for someone using inheritance to override a method/property.
    """
    lookup_options = {
        'stop': {Celery, object},
        'monkey_patched': [__name__],
    }
    return mro_lookup(app.__class__, attr, **lookup_options)
def _unpickle_appattr(reverse_name, args):
    """Rebuild a pickled app attribute.

    Resolves ``reverse_name`` on the current app through ``_rgetattr``
    and calls the result with the positional arguments in ``args``.
    """
    restore = get_current_app()._rgetattr(reverse_name)
    return restore(*args)
def _after_fork_cleanup_app(app):
    """Call ``app._after_fork()`` and log, rather than raise, any failure.

    Lives at module level because it is the callback handed to
    ``multiprocessing.register_after_fork``.
    """
    try:
        app._after_fork()
    except Exception as error:  # pylint: disable=broad-except
        logger.info('after forker raised exception: %r', error, exc_info=1)
class PendingConfiguration(UserDict, AttributeDictMixin):
    """Placeholder for ``app.conf`` until the app is explicitly configured.

    Item assignment, ``update``, ``setdefault``, ``clear`` and membership
    tests work on the pending dict passed to the constructor, so the app
    keeps any configuration set directly on ``app.conf`` before the
    ``app.config_from_object`` call.

    Accessing :attr:`data` (which ``len()`` and ``repr()`` do) runs the
    callback; per the original notes this finalizes the configuration,
    replacing ``app.conf`` with a concrete settings object.
    """

    callback = None
    _data = None

    def __init__(self, conf, callback):
        # Stored through ``object.__setattr__`` instead of plain
        # assignment -- presumably to bypass the mixin's attribute
        # handling (AttributeDictMixin is defined elsewhere; confirm).
        for attr_name, attr_value in (('_data', conf),
                                      ('callback', callback)):
            object.__setattr__(self, attr_name, attr_value)

    @cached_property
    def data(self):
        # Computed once: the result of the finalizing callback.
        return self.callback()

    def __setitem__(self, key, value):
        self._data[key] = value

    def __contains__(self, key):
        # XXX will not show finalized configuration
        # setdefault will cause `key in d` to happen,
        # so for setdefault to be lazy, so does contains.
        return key in self._data

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        return repr(self.data)

    def setdefault(self, *args, **kwargs):
        return self._data.setdefault(*args, **kwargs)

    def update(self, *args, **kwargs):
        self._data.update(*args, **kwargs)

    def clear(self):
        self._data.clear()
144@python_2_unicode_compatible
145class Celery(object):
146 """Celery application.
148 Arguments:
149 main (str): Name of the main module if running as `__main__`.
150 This is used as the prefix for auto-generated task names.
152 Keyword Arguments:
153 broker (str): URL of the default broker used.
154 backend (Union[str, Type[celery.backends.base.Backend]]):
155 The result store backend class, or the name of the backend
156 class to use.
158 Default is the value of the :setting:`result_backend` setting.
159 autofinalize (bool): If set to False a :exc:`RuntimeError`
160 will be raised if the task registry or tasks are used before
161 the app is finalized.
162 set_as_current (bool): Make this the global current app.
163 include (List[str]): List of modules every worker should import.
165 amqp (Union[str, Type[AMQP]]): AMQP object or class name.
166 events (Union[str, Type[celery.app.events.Events]]): Events object or
167 class name.
168 log (Union[str, Type[Logging]]): Log object or class name.
169 control (Union[str, Type[celery.app.control.Control]]): Control object
170 or class name.
171 tasks (Union[str, Type[TaskRegistry]]): A task registry, or the name of
172 a registry class.
173 fixups (List[str]): List of fix-up plug-ins (e.g., see
174 :mod:`celery.fixups.django`).
175 config_source (Union[str, class]): Take configuration from a class,
176 or object. Attributes may include any settings described in
177 the documentation.
178 task_cls (Union[str, Type[celery.app.task.Task]]): base task class to
179 use. See :ref:`this section <custom-task-cls-app-wide>` for usage.
180 """
182 #: This is deprecated, use :meth:`reduce_keys` instead
183 Pickler = AppPickler
185 SYSTEM = platforms.SYSTEM
186 IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS
188 #: Name of the `__main__` module. Required for standalone scripts.
189 #:
190 #: If set this will be used instead of `__main__` when automatically
191 #: generating task names.
192 main = None
194 #: Custom options for command-line programs.
195 #: See :ref:`extending-commandoptions`
196 user_options = None
198 #: Custom bootsteps to extend and modify the worker.
199 #: See :ref:`extending-bootsteps`.
200 steps = None
202 builtin_fixups = BUILTIN_FIXUPS
204 amqp_cls = 'celery.app.amqp:AMQP'
205 backend_cls = None
206 events_cls = 'celery.app.events:Events'
207 loader_cls = None
208 log_cls = 'celery.app.log:Logging'
209 control_cls = 'celery.app.control:Control'
210 task_cls = 'celery.app.task:Task'
211 registry_cls = 'celery.app.registry:TaskRegistry'
213 _fixups = None
214 _pool = None
215 _conf = None
216 _after_fork_registered = False
218 #: Signal sent when app is loading configuration.
219 on_configure = None
221 #: Signal sent after app has prepared the configuration.
222 on_after_configure = None
224 #: Signal sent after app has been finalized.
225 on_after_finalize = None
227 #: Signal sent by every new process after fork.
228 on_after_fork = None
def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
             set_as_current=True, tasks=None, broker=None, include=None,
             changes=None, config_source=None, fixups=None, task_cls=None,
             autofinalize=True, namespace=None, strict_typing=True,
             **kwargs):
    """Initialize app state without loading the configuration.

    Explicit arguments override the class-level ``*_cls`` defaults.
    ``broker``, ``backend``, ``include`` and the ``broker_use_ssl`` /
    ``redis_backend_use_ssl`` keyword arguments are recorded as pending
    configuration keys; ``self._conf`` starts out as a ``Settings``
    wrapping a :class:`PendingConfiguration`.
    """
    self.clock = LamportClock()
    self.main = main

    # Component classes: explicit argument first, class default second.
    self.amqp_cls = amqp or self.amqp_cls
    self.events_cls = events or self.events_cls
    self.loader_cls = loader or self._get_default_loader()
    self.log_cls = log or self.log_cls
    self.control_cls = control or self.control_cls
    self.task_cls = task_cls or self.task_cls
    self.set_as_current = set_as_current
    self.registry_cls = symbol_by_name(self.registry_cls)

    self.user_options = defaultdict(set)
    self.steps = defaultdict(set)
    self.autofinalize = autofinalize
    self.namespace = namespace
    self.strict_typing = strict_typing

    # Configuration bookkeeping.
    self.configured = False
    self._config_source = config_source
    self._pending_defaults = deque()
    self._pending_periodic_tasks = deque()

    # Finalization bookkeeping.
    self.finalized = False
    self._finalize_mutex = threading.Lock()
    self._pending = deque()
    if isinstance(tasks, TaskRegistry):
        self._tasks = tasks
    else:
        self._tasks = self.registry_cls(tasks or {})

    # If the class defines a custom __reduce_args__ we need to use
    # the old way of pickling apps: pickling a list of
    # args instead of the new way that pickles a dict of keywords.
    self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

    # these options are moved to the config to
    # simplify pickling of the app object.
    self._preconf = changes or {}
    self._preconf_set_by_auto = set()
    auto_settings = (
        ('broker_url', broker),
        ('result_backend', backend),
        ('include', include),
        ('broker_use_ssl', kwargs.get('broker_use_ssl')),
        ('redis_backend_use_ssl', kwargs.get('redis_backend_use_ssl')),
    )
    for setting_name, setting_value in auto_settings:
        self.__autoset(setting_name, setting_value)

    pending_conf = PendingConfiguration(
        self._preconf, self._finalize_pending_conf)
    self._conf = Settings(
        pending_conf,
        prefix=self.namespace,
        keys=(_old_key_to_new, _new_key_to_old),
    )

    # - Apply fix-ups.
    if fixups is None:
        self.fixups = set(self.builtin_fixups)
    else:
        self.fixups = fixups
    # ...store fixup instances in _fixups to keep weakrefs alive.
    self._fixups = [symbol_by_name(path)(self) for path in self.fixups]

    if self.set_as_current:
        self.set_current()

    # Signals
    if self.on_configure is None:
        # used to be a method pre 4.0
        self.on_configure = Signal(name='app.on_configure')
    self.on_after_configure = Signal(
        name='app.on_after_configure',
        providing_args={'source'},
    )
    self.on_after_finalize = Signal(name='app.on_after_finalize')
    self.on_after_fork = Signal(name='app.on_after_fork')

    self.on_init()
    _register_app(self)
def _get_default_loader(self):
    """Pick the loader to use when none was passed to the constructor.

    Order of preference: the ``CELERY_LOADER`` environment variable,
    the ``loader_cls`` attribute, then ``'celery.loaders.app:AppLoader'``.
    """
    # the --loader command-line argument sets the environment variable.
    from_environment = os.environ.get('CELERY_LOADER')
    if from_environment:
        return from_environment
    if self.loader_cls:
        return self.loader_cls
    return 'celery.loaders.app:AppLoader'
def on_init(self):
    """Hook invoked at the end of ``__init__``; does nothing by default."""
    return None
def __autoset(self, key, value):
    """Record a truthy constructor argument as a pending setting.

    Falsy values are ignored; otherwise the value is stored in
    ``_preconf`` and the key is remembered in ``_preconf_set_by_auto``.
    """
    if not value:
        return
    self._preconf[key] = value
    self._preconf_set_by_auto.add(key)
def set_current(self):
    """Install this app as the current app for the calling thread."""
    _set_current_app(self)
def set_default(self):
    """Install this app as the default app shared by all threads."""
    set_default_app(self)
def _ensure_after_fork(self):
    """Register the after-fork cleanup callback for this app, once.

    The flag is set even when ``register_after_fork`` is unavailable
    (``None``), so the check is not repeated.
    """
    if self._after_fork_registered:
        return
    self._after_fork_registered = True
    if register_after_fork is None:
        return
    register_after_fork(self, _after_fork_cleanup_app)
def close(self):
    """Release the app's resources and deregister it.

    Drops the connection pool reference and removes the app from the
    registry of apps.  Only necessary for dynamically created apps, and
    you should probably use the :keyword:`with` statement instead.

    Example:
        >>> with Celery(set_as_current=False) as app:
        ...     with app.connection_for_write() as conn:
        ...         pass
    """
    self._pool = None
    _deregister_app(self)
def start(self, argv=None):
    """Run the :program:`celery` command with the arguments in `argv`.

    Uses :data:`sys.argv` if `argv` is not specified.
    """
    command = instantiate('celery.bin.celery:CeleryCommand', app=self)
    return command.execute_from_commandline(argv)
def worker_main(self, argv=None):
    """Run the :program:`celery worker` command with the arguments in `argv`.

    Uses :data:`sys.argv` if `argv` is not specified.
    """
    command = instantiate('celery.bin.worker:worker', app=self)
    return command.execute_from_commandline(argv)
def task(self, *args, **opts):
    """Decorator to create a task class out of any callable.

    See :ref:`Task options<task-options>` for a list of the
    arguments that can be passed to this decorator.

    Examples:
        .. code-block:: python

            @app.task
            def refresh_feed(url):
                store_feed(feedparser.parse(url))

        with setting extra options:

        .. code-block:: python

            @app.task(exchange='feeds')
            def refresh_feed(url):
                return store_feed(feedparser.parse(url))

    Note:
        App Binding: For custom apps the task decorator will return
        a proxy object, so that the act of creating the task is not
        performed until the task is used or the task registry is accessed.

        If you're depending on binding to be deferred, then you must
        not access any attributes on the returned object until the
        application is fully set up (finalized).

    Raises:
        TypeError: If more than one positional argument is given, or the
            single positional argument is not callable.
    """
    if USING_EXECV and opts.get('lazy', True):
        # When using execv the task in the original module will point to a
        # different app, so doing things like 'add.request' will point to
        # a different task instance.  This makes sure it will always use
        # the task instance from the current app.
        # Really need a better solution for this :(
        from . import shared_task
        # Merge instead of passing ``lazy=False, **opts``: an explicit
        # ``lazy=True`` from the caller would otherwise be a duplicate
        # keyword argument and raise TypeError.
        return shared_task(*args, **dict(opts, lazy=False))

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter

        def _create_task_cls(fun):
            if shared:
                # Re-create the task on every app that gets finalized.
                def cons(app):
                    return app._task_from_fun(fun, **opts)
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts)
            else:
                # return a proxy object that evaluates on first use
                ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                   __doc__=fun.__doc__)
                self._pending.append(ret)
            if _filt:
                return _filt(ret)
            return ret

        return _create_task_cls

    if len(args) == 1:
        if callable(args[0]):
            return inner_create_task_cls(**opts)(*args)
        raise TypeError('argument 1 to @task() must be a callable')
    if args:
        raise TypeError(
            '@task() takes exactly 1 argument ({0} given)'.format(
                sum([len(args), len(opts)])))
    return inner_create_task_cls(**opts)
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    """Return the task registered under ``name``, creating it from ``fun``.

    Arguments:
        fun (Callable): Function that becomes the task's ``run``.
        name (str): Task name; generated with :meth:`gen_task_name` from
            the function's name and module when omitted.
        base (type): Base task class; defaults to ``self.Task``.
        bind (bool): If true ``fun`` is used as-is (it receives the task
            as first argument), otherwise it is wrapped in
            ``staticmethod``.
        **options: Extra attributes for the generated task class.  The
            keys ``autoretry_for``, ``retry_kwargs``, ``retry_backoff``,
            ``retry_backoff_max`` and ``retry_jitter`` also configure the
            automatic-retry wrapper.

    Raises:
        RuntimeError: If the app is neither finalized nor auto-finalizing.
    """
    if not self.finalized and not self.autofinalize:
        raise RuntimeError('Contract breach: app not finalized')
    name = name or self.gen_task_name(fun.__name__, fun.__module__)
    base = base or self.Task

    if name not in self._tasks:
        run = fun if bind else staticmethod(fun)
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        # for some reason __qualname__ cannot be set in type()
        # so we have to set it here.
        try:
            task.__qualname__ = fun.__qualname__
        except AttributeError:
            pass
        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app

        autoretry_for = tuple(
            options.get('autoretry_for',
                        getattr(task, 'autoretry_for', ()))
        )
        retry_kwargs = options.get(
            'retry_kwargs', getattr(task, 'retry_kwargs', {})
        )
        retry_backoff = int(
            options.get('retry_backoff',
                        getattr(task, 'retry_backoff', False))
        )
        retry_backoff_max = int(
            options.get('retry_backoff_max',
                        getattr(task, 'retry_backoff_max', 600))
        )
        retry_jitter = options.get(
            'retry_jitter', getattr(task, 'retry_jitter', True)
        )

        if autoretry_for and not hasattr(task, '_orig_run'):

            @wraps(task.run)
            def run(*args, **kwargs):
                try:
                    return task._orig_run(*args, **kwargs)
                except Ignore:
                    # If an Ignore signal occurs the task shouldn't be
                    # retried, even if it matches the autoretry_for list.
                    raise
                except Retry:
                    raise
                except autoretry_for as exc:
                    # Work on a per-call copy: ``retry_kwargs`` may be a
                    # class attribute or the caller's dict, so writing
                    # ``countdown`` into it would leak into other tasks
                    # and later calls.
                    call_kwargs = dict(retry_kwargs)
                    if retry_backoff:
                        call_kwargs['countdown'] = \
                            get_exponential_backoff_interval(
                                factor=retry_backoff,
                                retries=task.request.retries,
                                maximum=retry_backoff_max,
                                full_jitter=retry_jitter)
                    raise task.retry(exc=exc, **call_kwargs)

            task._orig_run, task.run = task.run, run
    else:
        task = self._tasks[name]
    return task
def register_task(self, task):
    """Register a task instance with this app and bind it.

    A task without a name gets one generated from its class name and
    module.  Returns the same task object.

    Note:
        This is here for compatibility with old Celery 1.0
        style task classes, you should not need to use this for
        new projects.
    """
    if not task.name:
        cls = type(task)
        task.name = self.gen_task_name(cls.__name__, cls.__module__)
    registry = self.tasks
    registry[task.name] = task
    task._app = self
    task.bind(self)
    return task
def gen_task_name(self, name, module):
    """Build a task name from ``name`` and ``module`` for this app.

    Delegates to the module-level ``gen_task_name`` helper.
    """
    return gen_task_name(self, name, module)
def finalize(self, auto=False):
    """Finalize the app.

    This loads built-in tasks, evaluates pending task decorators,
    reads configuration, etc.  Runs under ``_finalize_mutex`` and is a
    no-op when the app is already finalized.

    Raises:
        RuntimeError: If ``auto`` is true but the app was created with
            ``autofinalize=False``.
    """
    with self._finalize_mutex:
        if self.finalized:
            return
        if auto and not self.autofinalize:
            raise RuntimeError('Contract breach: app not finalized')
        self.finalized = True
        _announce_app_finalized(self)

        # Evaluate the proxies queued by the ``task`` decorator.
        queued = self._pending
        while queued:
            maybe_evaluate(queued.popleft())

        for registered in values(self._tasks):
            registered.bind(self)

        self.on_after_finalize.send(sender=self)
def add_defaults(self, fun):
    """Add default configuration from a dict, or a callable returning one.

    If the argument is a callable function then it will be regarded
    as a promise, and it won't be loaded until the configuration is
    actually needed.  When the app is already configured the defaults
    are applied immediately.

    This method can be compared to:

    .. code-block:: pycon

        >>> celery.conf.update(d)

    with a difference that 1) no copy will be made and 2) the dict will
    not be transferred when the worker spawns child processes, so
    it's important that the same configuration happens at import time
    when pickle restores the object on the other side.
    """
    if callable(fun):
        promise = fun
    else:
        defaults = fun

        def promise():
            return defaults

    if not self.configured:
        self._pending_defaults.append(promise)
        return None
    return self._conf.add_defaults(promise())
def config_from_object(self, obj,
                       silent=False, force=False, namespace=None):
    """Read configuration from object.

    Object is either an actual object or the name of a module to import.

    Example:
        >>> celery.config_from_object('myapp.celeryconfig')

        >>> from myapp import celeryconfig
        >>> celery.config_from_object(celeryconfig)

    Arguments:
        silent (bool): If true then import errors will be ignored.
        force (bool): Force reading configuration immediately.
            By default the configuration will be read only when required.
        namespace (str): Replaces ``self.namespace`` when given.

    Returns:
        The app configuration when it was (re)loaded successfully,
        otherwise ``None``.
    """
    self._config_source = obj
    self.namespace = namespace or self.namespace
    if not (force or self.configured):
        # Lazy path: the source is only remembered for later.
        return None
    self._conf = None
    if self.loader.config_from_object(obj, silent=silent):
        return self.conf
    return None
def config_from_envvar(self, variable_name, silent=False, force=False):
    """Read configuration from environment variable.

    The value of the environment variable must be the name
    of a module to import.

    Example:
        >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
        >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')

    Returns:
        ``False`` when the variable is unset or empty and ``silent`` is
        true, otherwise the result of :meth:`config_from_object`.

    Raises:
        ImproperlyConfigured: If the variable is unset or empty and
            ``silent`` is false.
    """
    module_name = os.environ.get(variable_name)
    if module_name:
        return self.config_from_object(
            module_name, silent=silent, force=force)
    if not silent:
        raise ImproperlyConfigured(
            ERR_ENVVAR_NOT_SET.strip().format(variable_name))
    return False
def config_from_cmdline(self, argv, namespace='celery'):
    """Update the configuration from command-line arguments.

    ``argv`` is parsed by the loader's ``cmdline_config_parser`` and the
    result is merged into ``self._conf``.
    """
    parsed = self.loader.cmdline_config_parser(argv, namespace)
    self._conf.update(parsed)
def setup_security(self, allowed_serializers=None, key=None, cert=None,
                   store=None, digest=DEFAULT_SECURITY_DIGEST,
                   serializer='json'):
    """Setup the message-signing serializer.

    This will affect all application instances (a global operation).

    Disables untrusted serializers and if configured to use the ``auth``
    serializer will register the ``auth`` serializer with the provided
    settings into the Kombu serializer registry.

    Arguments:
        allowed_serializers (Set[str]): List of serializer names, or
            content_types that should be exempt from being disabled.
        key (str): Name of private key file to use.
            Defaults to the :setting:`security_key` setting.
        cert (str): Name of certificate file to use.
            Defaults to the :setting:`security_certificate` setting.
        store (str): Directory containing certificates.
            Defaults to the :setting:`security_cert_store` setting.
        digest (str): Digest algorithm used when signing messages.
            Default is ``sha256``.
        serializer (str): Serializer used to encode messages after
            they've been signed.  See :setting:`task_serializer` for
            the serializers supported.  Default is ``json``.
    """
    # Imported here rather than at module level, as in the original.
    from celery.security import setup_security as _setup_security
    return _setup_security(
        allowed_serializers, key, cert, store, digest, serializer,
        app=self,
    )
def autodiscover_tasks(self, packages=None,
                       related_name='tasks', force=False):
    """Auto-discover task modules.

    Searches a list of packages for a "tasks.py" module (or use
    related_name argument).

    If the name is empty, this will be delegated to fix-ups (e.g., Django).

    For example if you have a directory layout like this:

    .. code-block:: text

        foo/__init__.py
           tasks.py
           models.py

        bar/__init__.py
            tasks.py
            models.py

        baz/__init__.py
            models.py

    Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
    result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

    Arguments:
        packages (List[str]): List of packages to search.
            This argument may also be a callable, in which case the
            value returned is used (for lazy evaluation).
        related_name (str): The name of the module to find.  Defaults
            to "tasks": meaning "look for 'module.tasks' for every
            module in ``packages``.".  If ``None`` will only try to import
            the package, i.e. "look for 'module'".
        force (bool): By default this call is lazy so that the actual
            auto-discovery won't happen until an application imports
            the default modules.  Forcing will cause the auto-discovery
            to happen immediately.
    """
    if force:
        return self._autodiscover_tasks(packages, related_name)
    # Lazy path: run the discovery when ``import_modules`` is sent.
    deferred = starpromise(
        self._autodiscover_tasks, packages, related_name)
    signals.import_modules.connect(deferred, weak=False, sender=self)
def _autodiscover_tasks(self, packages, related_name, **kwargs):
    """Discover tasks from ``packages``, or from the fix-ups if empty.

    Extra keyword arguments are accepted and ignored.
    """
    if not packages:
        return self._autodiscover_tasks_from_fixups(related_name)
    return self._autodiscover_tasks_from_names(packages, related_name)
def _autodiscover_tasks_from_names(self, packages, related_name):
    """Ask the loader to auto-discover tasks in the named packages."""
    # packages argument can be lazy
    if callable(packages):
        packages = packages()
    return self.loader.autodiscover_tasks(packages, related_name)
def _autodiscover_tasks_from_fixups(self, related_name):
    """Auto-discover tasks in the packages reported by the fix-ups.

    Every fix-up in ``self._fixups`` that provides an
    ``autodiscover_tasks()`` method contributes the packages it returns;
    fix-ups without that method are skipped.
    """
    # The ``hasattr`` filter must come before the inner ``for``:
    # comprehension clauses run left to right, so filtering afterwards
    # would call ``fixup.autodiscover_tasks()`` before checking that it
    # exists and raise AttributeError.
    return self._autodiscover_tasks_from_names([
        pkg for fixup in self._fixups
        if hasattr(fixup, 'autodiscover_tasks')
        for pkg in fixup.autodiscover_tasks()
    ], related_name=related_name)
def send_task(self, name, args=None, kwargs=None, countdown=None,
              eta=None, task_id=None, producer=None, connection=None,
              router=None, result_cls=None, expires=None,
              publisher=None, link=None, link_error=None,
              add_to_parent=True, group_id=None, retries=0, chord=None,
              reply_to=None, time_limit=None, soft_time_limit=None,
              root_id=None, parent_id=None, route_name=None,
              shadow=None, chain=None, task_type=None, **options):
    """Send task by name.

    Supports the same arguments as :meth:`@-Task.apply_async`.

    Arguments:
        name (str): Name of task to call (e.g., `"tasks.add"`).
        result_cls (AsyncResult): Specify custom result class.

    Returns:
        An instance of ``result_cls`` (default ``self.AsyncResult``) for
        the task id, with its ``ignored`` attribute set from the
        ``ignore_result`` option.
    """
    parent = have_parent = None
    amqp = self.amqp
    task_id = task_id or uuid()
    producer = producer or publisher  # XXX compat
    router = router or amqp.router
    conf = self.conf
    if conf.task_always_eager:  # pragma: no cover
        eager_warning = AlwaysEagerIgnored(
            'task_always_eager has no effect on send_task',
        )
        warnings.warn(eager_warning, stacklevel=2)

    ignored_result = options.pop('ignore_result', False)
    options = router.route(
        options, route_name or name, args, kwargs, task_type)

    # Inherit root/parent ids (and optionally the priority) from the
    # task currently executing in this worker, if there is one.
    if not root_id or not parent_id:
        parent = self.current_worker_task
        if parent:
            parent_request = parent.request
            if not root_id:
                root_id = parent_request.root_id or parent_request.id
            if not parent_id:
                parent_id = parent.request.id

            if conf.task_inherit_parent_priority:
                options.setdefault(
                    'priority',
                    parent.request.delivery_info.get('priority'))

    message = amqp.create_task_message(
        task_id, name, args, kwargs, countdown, eta, group_id,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
        argsrepr=options.get('argsrepr'),
        kwargsrepr=options.get('kwargsrepr'),
    )

    if connection:
        producer = amqp.Producer(connection, auto_declare=False)

    with self.producer_or_acquire(producer) as active_producer:
        with active_producer.connection._reraise_as_library_errors():
            if not ignored_result:
                self.backend.on_task_call(active_producer, task_id)
            amqp.send_task_message(
                active_producer, name, message, **options)

    make_result = result_cls or self.AsyncResult
    result = make_result(task_id)
    # We avoid using the constructor since a custom result class
    # can be used, in which case the constructor may still use
    # the old signature.
    result.ignored = ignored_result

    if add_to_parent:
        if not have_parent:
            parent, have_parent = self.current_worker_task, True
        if parent:
            parent.add_trail(result)
    return result
def connection_for_read(self, url=None, **kwargs):
    """Establish connection used for consuming.

    Falls back to the ``broker_read_url`` setting when ``url`` is falsy.

    See Also:
        :meth:`connection` for supported arguments.
    """
    target = url or self.conf.broker_read_url
    return self._connection(target, **kwargs)
def connection_for_write(self, url=None, **kwargs):
    """Establish connection used for producing.

    Falls back to the ``broker_write_url`` setting when ``url`` is falsy.

    See Also:
        :meth:`connection` for supported arguments.
    """
    target = url or self.conf.broker_write_url
    return self._connection(target, **kwargs)
def connection(self, hostname=None, userid=None, password=None,
               virtual_host=None, port=None, ssl=None,
               connect_timeout=None, transport=None,
               transport_options=None, heartbeat=None,
               login_method=None, failover_strategy=None, **kwargs):
    """Establish a connection to the message broker.

    Please use :meth:`connection_for_read` and
    :meth:`connection_for_write` instead, to convey the intent
    of use for this connection.

    Arguments:
        hostname (str): URL, Hostname/IP-address of the broker.
            If a URL is used, then the other argument below will
            be taken from the URL instead.  Defaults to the
            ``broker_write_url`` setting.
        userid (str): Username to authenticate as.
        password (str): Password to authenticate with
        virtual_host (str): Virtual host to use (domain).
        port (int): Port to connect to.
        ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
            setting.
        transport (str): defaults to the :setting:`broker_transport`
            setting.
        transport_options (Dict): Dictionary of transport specific options.
        heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
        login_method (str): Custom login method to use (AMQP only).
        failover_strategy (str, Callable): Custom failover strategy.
        **kwargs: Additional arguments to :class:`kombu.Connection`.

    Returns:
        kombu.Connection: the lazy connection instance.
    """
    broker_url = hostname or self.conf.broker_write_url
    return self.connection_for_write(
        broker_url,
        userid=userid,
        password=password,
        virtual_host=virtual_host,
        port=port,
        ssl=ssl,
        connect_timeout=connect_timeout,
        transport=transport,
        transport_options=transport_options,
        heartbeat=heartbeat,
        login_method=login_method,
        failover_strategy=failover_strategy,
        **kwargs
    )
def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    """Build an ``amqp.Connection`` for ``url`` with settings as fallback.

    Falsy arguments fall back to the matching ``broker_*`` setting;
    ``transport_options`` is layered on top of the
    ``broker_transport_options`` setting.  Extra keyword arguments are
    accepted but not forwarded.
    """
    conf = self.conf
    positional = (
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
    )
    keyword = {
        'transport': transport or conf.broker_transport,
        'ssl': self.either('broker_use_ssl', ssl),
        'heartbeat': heartbeat,
        'login_method': login_method or conf.broker_login_method,
        'failover_strategy': (
            failover_strategy or conf.broker_failover_strategy
        ),
        # A new dict is built, so the setting itself is not modified.
        'transport_options': dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        'connect_timeout': self.either(
            'broker_connection_timeout', connect_timeout
        ),
    }
    return self.amqp.Connection(*positional, **keyword)
879 broker_connection = connection
def _acquire_connection(self, pool=True):
    """Helper for :meth:`connection_or_acquire`.

    Acquires from the connection pool (blocking) when ``pool`` is true,
    otherwise creates a connection with :meth:`connection_for_write`.
    """
    if not pool:
        return self.connection_for_write()
    return self.pool.acquire(block=True)
def connection_or_acquire(self, connection=None, pool=True, *_, **__):
    """Context used to acquire a connection from the pool.

    For use within a :keyword:`with` statement to get a connection
    from the pool if one is not already provided.

    Arguments:
        connection (kombu.Connection): If not provided, a connection
            will be acquired from the connection pool.
        pool (bool): Passed on to :meth:`_acquire_connection`.

    Any further positional or keyword arguments are ignored.
    """
    acquire = self._acquire_connection
    return FallbackContext(connection, acquire, pool=pool)
898 default_connection = connection_or_acquire # XXX compat
def producer_or_acquire(self, producer=None):
    """Context used to acquire a producer from the pool.

    For use within a :keyword:`with` statement to get a producer
    from the pool if one is not already provided

    Arguments:
        producer (kombu.Producer): If not provided, a producer
            will be acquired from the producer pool.
    """
    acquire = self.producer_pool.acquire
    return FallbackContext(producer, acquire, block=True)
913 default_producer = producer_or_acquire # XXX compat
def prepare_config(self, c):
    """Prepare configuration before it is merged with the defaults.

    Passes ``c`` through ``find_deprecated_settings`` and returns
    the result.
    """
    prepared = find_deprecated_settings(c)
    return prepared
def now(self):
    """Return the current date and time, converted to ``self.timezone``."""
    utc_now = to_utc(datetime.utcnow())
    local_tz = self.timezone
    return utc_now.astimezone(local_tz)
def select_queues(self, queues=None):
    """Select subset of queues.

    Arguments:
        queues (Sequence[str]): a list of queue names to keep.
    """
    available = self.amqp.queues
    return available.select(queues)
def either(self, default_key, *defaults):
    """Get key from configuration or use default values.

    Falls back to the value of the configuration key ``default_key``
    if none of the ``*defaults`` are true.
    """
    explicit = first(None, defaults)
    # Wrapped in a promise so the configuration lookup stays lazy --
    # presumably ``first`` only evaluates it when ``explicit`` is
    # rejected (``first`` is defined elsewhere; confirm).
    configured = starpromise(self.conf.get, default_key)
    return first(None, [explicit, configured])
def bugreport(self):
    """Return information useful in bug reports.

    Delegates to the module-level ``bugreport`` helper.
    """
    return bugreport(self)
def _get_backend(self):
    """Instantiate the result backend for this app.

    The backend is resolved by ``backends.by_url`` from ``backend_cls``
    or, when that is unset, the ``result_backend`` setting.
    """
    source = self.backend_cls or self.conf.result_backend
    backend_type, backend_url = backends.by_url(source, self.loader)
    return backend_type(app=self, url=backend_url)
def _finalize_pending_conf(self):
    """Load the configuration, store it as ``self._conf`` and return it.

    Note:
        This is used by PendingConfiguration:
            as soon as you access a key the configuration is read.
    """
    loaded = self._load_config()
    self._conf = loaded
    return loaded
def _load_config(self):
    """Load the configuration, apply pending items and return it.

    Steps, in order: fire ``on_configure``; read ``_config_source``
    through the loader if one is set; mark the app as configured; build
    the settings with ``detect_settings``; store them in ``_conf``; apply
    pending defaults and pending periodic tasks; send
    ``on_after_configure``.
    """
    configure_hook = self.on_configure
    if not isinstance(configure_hook, Signal):
        # used to be a method pre 4.0
        configure_hook()
    else:
        configure_hook.send(sender=self)

    config_source = self._config_source
    if config_source:
        self.loader.config_from_object(config_source)
    self.configured = True

    prepared = self.prepare_config(self.loader.conf)
    settings = detect_settings(
        prepared, self._preconf,
        ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
    )
    if self._conf is None:
        self._conf = settings
    else:
        # replace in place, as someone may have referenced app.conf,
        # done some changes, accessed a key, and then try to make more
        # changes to the reference and not the finalized value.
        self._conf.swap_with(settings)

    # load lazy config dict initializers.
    lazy_defaults = self._pending_defaults
    while lazy_defaults:
        make_defaults = lazy_defaults.popleft()
        self._conf.add_defaults(maybe_evaluate(make_defaults()))

    # load lazy periodic tasks
    lazy_periodic = self._pending_periodic_tasks
    while lazy_periodic:
        periodic_args = lazy_periodic.popleft()
        self._add_periodic_task(*periodic_args)

    self.on_after_configure.send(sender=self, source=self._conf)
    return self._conf
def _after_fork(self):
    """Reset pooled resources and send the ``on_after_fork`` signal."""
    self._pool = None
    try:
        # Look in the instance dict directly so that a not-yet-created
        # ``amqp`` attribute is not instantiated just to be reset.
        cached_amqp = self.__dict__['amqp']
        cached_amqp._producer_pool = None
    except (AttributeError, KeyError):
        pass
    self.on_after_fork.send(sender=self)
def signature(self, *args, **kwargs):
    """Return a new :class:`~celery.Signature` bound to this app.

    All arguments are forwarded to ``canvas.signature``; the ``app``
    keyword is always set to this app, replacing any caller-supplied
    value.
    """
    kwargs.update(app=self)
    return self._canvas.signature(*args, **kwargs)
def add_periodic_task(self, schedule, sig,
                      args=(), kwargs=(), name=None, **opts):
    """Register a periodic task entry and return its key.

    The entry is built by :meth:`_sig_to_periodic_task_entry`.  If the
    app is already configured it is added right away; otherwise it is
    queued in ``_pending_periodic_tasks``.

    Arguments:
        schedule: Schedule stored under the entry's ``'schedule'`` key.
        sig: Signature (or object with a ``name``) to run.
        args: Positional arguments for the task.
        kwargs: Keyword arguments for the task.
        name: Optional key for the entry.
        **opts: Extra options forwarded to the entry builder.
    """
    key, entry = self._sig_to_periodic_task_entry(
        schedule, sig, args, kwargs, name, **opts)
    if not self.configured:
        self._pending_periodic_tasks.append((key, entry))
    else:
        self._add_periodic_task(key, entry)
    return key
def _sig_to_periodic_task_entry(self, schedule, sig,
                                args=(), kwargs=None, name=None, **opts):
    """Build the ``(key, entry)`` pair for a periodic task.

    ``sig`` is cloned with ``args``/``kwargs`` when it is a
    ``CallableSignature``; otherwise a new signature is created from
    ``sig.name``.  The key is ``name`` if given, else ``repr(sig)``.
    """
    if not kwargs:
        kwargs = {}
    if isinstance(sig, abstract.CallableSignature):
        sig = sig.clone(args, kwargs)
    else:
        sig = self.signature(sig.name, args, kwargs)
    key = name or repr(sig)
    entry = {
        'schedule': schedule,
        'task': sig.name,
        'args': sig.args,
        'kwargs': sig.kwargs,
        'options': dict(sig.options, **opts),
    }
    return key, entry
def _add_periodic_task(self, key, entry):
    """Store ``entry`` under ``key`` in the ``beat_schedule`` setting."""
    beat_schedule = self._conf.beat_schedule
    beat_schedule[key] = entry
def create_task_cls(self):
    """Create a base task class bound to this app.

    Subclasses ``task_cls`` via :meth:`subclass_with_self`, naming the
    result ``Task`` and storing the app on the ``_app`` attribute.
    """
    options = dict(
        name='Task', attribute='_app',
        keep_reduce=True, abstract=True,
    )
    return self.subclass_with_self(self.task_cls, **options)
def subclass_with_self(self, Class, name=None, attribute='app',
                       reverse=None, keep_reduce=False, **kw):
    """Subclass an app-compatible class.

    App-compatible means that the class has a class attribute that
    provides the default app it should use, for example:
    ``class Foo: app = None``.

    Arguments:
        Class (type): The app-compatible class to subclass.
        name (str): Custom name for the target class.
        attribute (str): Name of the attribute holding the app,
            Default is 'app'.
        reverse (str): Reverse path to this object used for pickling
            purposes. For example, to get ``app.AsyncResult``,
            use ``"AsyncResult"``.
        keep_reduce (bool): If enabled a custom ``__reduce__``
            implementation won't be provided.
        **kw: Extra class attributes for the new subclass.
    """
    Class = symbol_by_name(Class)
    reverse = reverse or Class.__name__

    def __reduce__(self):
        return _unpickle_appattr, (reverse, self.__reduce_args__())

    # Start with the app attribute, then layer the class metadata and the
    # caller supplied attributes on top.
    attrs = {attribute: self}
    attrs.update(
        __module__=Class.__module__,
        __doc__=Class.__doc__,
        **kw)
    if not keep_reduce:
        attrs['__reduce__'] = __reduce__

    subclass_name = bytes_if_py2(name or Class.__name__)
    return type(subclass_name, (Class,), attrs)
def _rgetattr(self, path):
    """Return the attribute of this object named by ``path``.

    ``path`` is handed to :func:`operator.attrgetter`, so dotted names
    are followed through nested attributes.
    """
    resolve = attrgetter(path)
    return resolve(self)
def __enter__(self):
    """Enter the context: the app itself is the context value."""
    app = self
    return app
def __exit__(self, *exc_info):
    """Leave the context by closing the app.

    The exception details in ``exc_info`` are ignored and nothing is
    returned, so an in-flight exception is not suppressed.
    """
    self.close()
    return None
def __repr__(self):
    """Return ``<ClassName appstr>`` for this app."""
    cls_name = type(self).__name__
    return '<{0} {1}>'.format(cls_name, appstr(self))
def __reduce__(self):
    """Pickle support.

    Uses the keyword-based (v2) format unless ``_using_v1_reduce`` is
    set, in which case :meth:`__reduce_v1__` is used.
    """
    if not self._using_v1_reduce:
        return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
    return self.__reduce_v1__()
def __reduce_v1__(self):
    """Return the positional-argument (v1) pickle reduction."""
    # Reduce only pickles the configuration changes,
    # so the default configuration doesn't have to be passed
    # between processes.
    head = (self.__class__, self.Pickler)
    return (_unpickle_app, head + self.__reduce_args__())
def __reduce_keys__(self):
    """Keyword arguments used to reconstruct the object when unpickling."""
    # Before configuration only the pre-configuration values exist.
    if self.configured:
        changes = self._conf.changes
    else:
        changes = self._preconf
    return dict(
        main=self.main,
        changes=changes,
        loader=self.loader_cls,
        backend=self.backend_cls,
        amqp=self.amqp_cls,
        events=self.events_cls,
        log=self.log_cls,
        control=self.control_cls,
        fixups=self.fixups,
        config_source=self._config_source,
        task_cls=self.task_cls,
        namespace=self.namespace,
    )
def __reduce_args__(self):
    """Deprecated method, please use :meth:`__reduce_keys__` instead."""
    if self.configured:
        changes = self._conf.changes
    else:
        changes = {}
    return (
        self.main, changes,
        self.loader_cls, self.backend_cls, self.amqp_cls,
        self.events_cls, self.log_cls, self.control_cls,
        False, self._config_source,
    )
@cached_property
def Worker(self):
    """Worker application class bound to this app.

    See Also:
        :class:`~@Worker`.
    """
    worker_path = 'celery.apps.worker:Worker'
    return self.subclass_with_self(worker_path)
@cached_property
def WorkController(self, **kwargs):
    """Embeddable worker class bound to this app.

    See Also:
        :class:`~@WorkController`.
    """
    controller_path = 'celery.worker:WorkController'
    return self.subclass_with_self(controller_path)
@cached_property
def Beat(self, **kwargs):
    """:program:`celery beat` scheduler application class for this app.

    See Also:
        :class:`~@Beat`.
    """
    beat_path = 'celery.apps.beat:Beat'
    return self.subclass_with_self(beat_path)
@cached_property
def Task(self):
    """Base task class for this app (built by :meth:`create_task_cls`)."""
    task_base = self.create_task_cls()
    return task_base
@cached_property
def annotations(self):
    """Result of preparing the ``task_annotations`` setting."""
    raw_annotations = self.conf.task_annotations
    return prepare_annotations(raw_annotations)
@cached_property
def AsyncResult(self):
    """Result class bound to this app.

    See Also:
        :class:`celery.result.AsyncResult`.
    """
    result_path = 'celery.result:AsyncResult'
    return self.subclass_with_self(result_path)
@cached_property
def ResultSet(self):
    """Subclass of ``celery.result:ResultSet`` bound to this app."""
    result_set_path = 'celery.result:ResultSet'
    return self.subclass_with_self(result_set_path)
@cached_property
def GroupResult(self):
    """Group result class bound to this app.

    See Also:
        :class:`celery.result.GroupResult`.
    """
    group_result_path = 'celery.result:GroupResult'
    return self.subclass_with_self(group_result_path)
@property
def pool(self):
    """Broker connection pool: :class:`~@pool`.

    Created on first access and then reused until ``_pool`` is reset.

    Note:
        This attribute is not related to the workers concurrency pool.
    """
    if self._pool is not None:
        return self._pool
    self._ensure_after_fork()
    pools.set_limit(self.conf.broker_pool_limit)
    self._pool = pools.connections[self.connection_for_write()]
    return self._pool
@property
def current_task(self):
    """Instance of task being executed, or :const:`None`."""
    top_of_stack = _task_stack.top
    return top_of_stack
@property
def current_worker_task(self):
    """The task currently being executed by a worker or :const:`None`.

    Differs from :data:`current_task` in that it's not affected
    by tasks calling other tasks directly, or eagerly.
    """
    worker_task = get_current_worker_task()
    return worker_task
@cached_property
def oid(self):
    """Universally unique identifier for this app."""
    # since 4.0: thread.get_ident() is not included when
    # generating the process id.  This is due to how the RPC
    # backend now dedicates a single thread to receive results,
    # which would not work if each thread has a separate id.
    app_oid = oid_from(self, threads=False)
    return app_oid
@cached_property
def amqp(self):
    """AMQP related functionality: :class:`~@amqp`."""
    amqp_cls = self.amqp_cls
    return instantiate(amqp_cls, app=self)
@cached_property
def backend(self):
    """Current backend instance (created by :meth:`_get_backend`)."""
    backend_instance = self._get_backend()
    return backend_instance
@property
def conf(self):
    """Current configuration, loaded on first access if necessary."""
    if self._conf is not None:
        return self._conf
    self._conf = self._load_config()
    return self._conf

@conf.setter
def conf(self, d):  # noqa
    # Replace the stored configuration object with ``d`` as given.
    self._conf = d
@cached_property
def control(self):
    """Remote control: :class:`~@control`."""
    control_cls = self.control_cls
    return instantiate(control_cls, app=self)
@cached_property
def events(self):
    """Consuming and sending events: :class:`~@events`."""
    events_cls = self.events_cls
    return instantiate(events_cls, app=self)
@cached_property
def loader(self):
    """Current loader instance."""
    loader_factory = get_loader_cls(self.loader_cls)
    return loader_factory(app=self)
@cached_property
def log(self):
    """Logging: :class:`~@log`."""
    log_cls = self.log_cls
    return instantiate(log_cls, app=self)
@cached_property
def _canvas(self):
    """The :mod:`celery.canvas` module, imported on first access."""
    from celery import canvas as canvas_module
    return canvas_module
@cached_property
def tasks(self):
    """Task registry.

    Warning:
        Accessing this attribute will also auto-finalize the app.
    """
    self.finalize(auto=True)
    registry = self._tasks
    return registry
@property
def producer_pool(self):
    """Producer pool, as exposed by ``self.amqp.producer_pool``."""
    amqp = self.amqp
    return amqp.producer_pool
def uses_utc_timezone(self):
    """Check if the application uses the UTC timezone.

    Returns the result of comparing the app's :attr:`timezone` with
    ``timezone.utc`` using ``==``.
    """
    app_timezone = self.timezone
    return app_timezone == timezone.utc
@cached_property
def timezone(self):
    """Current timezone for this app.

    This is a cached property taking the time zone from the
    :setting:`timezone` setting.  When that setting is empty the
    ``enable_utc`` setting picks between UTC and the local timezone.
    """
    conf = self.conf
    tz_name = conf.timezone
    if tz_name:
        return timezone.get_timezone(tz_name)
    return timezone.utc if conf.enable_utc else timezone.local
1304App = Celery # noqa: E305 XXX compat