Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/app/annotations.py : 48%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Task Annotations.
4Annotations is a nice term for monkey-patching task classes
5in the configuration.
7This prepares and performs the annotations in the
8:setting:`task_annotations` setting.
9"""
10from __future__ import absolute_import, unicode_literals
12from celery.five import string_t
13from celery.utils.functional import firstmethod, mlazy
14from celery.utils.imports import instantiate
# Dispatch helpers built by ``firstmethod``, one per annotation method name.
# NOTE(review): presumably each returns a callable that tries the named method
# on every candidate object and yields the first usable result -- confirm
# against ``celery.utils.functional.firstmethod``.
_first_match = firstmethod('annotate')
_first_match_any = firstmethod('annotate_any')

# Public API of this module.
__all__ = ('MapAnnotation', 'prepare', 'resolve_all')
class MapAnnotation(dict):
    """Annotation map: task_name => attributes.

    A ``dict`` subclass whose keys are task names and whose values are
    mappings of attributes.  The key ``'*'`` is the entry returned by
    :meth:`annotate_any`.
    """

    def annotate_any(self):
        """Return a fresh ``dict`` copy of the ``'*'`` entry.

        Returns ``None`` when there is no ``'*'`` key.
        """
        try:
            return dict(self['*'])
        except KeyError:
            # No wildcard entry configured.
            return None

    def annotate(self, task):
        """Return a fresh ``dict`` copy of the entry stored under ``task.name``.

        Returns ``None`` when this map has no entry for that name.
        """
        try:
            return dict(self[task.name])
        except KeyError:
            # Nothing registered for this particular task.
            return None
def prepare(annotations):
    """Expand the :setting:`task_annotations` setting.

    Arguments:
        annotations: ``None``, a single annotation, or a ``list``/``tuple``
            of annotations.  A single annotation is treated as a
            one-element sequence.

    Returns:
        An empty tuple when ``annotations`` is ``None``; otherwise a list
        with one expanded entry per annotation:

        * ``dict`` instances are wrapped in :class:`MapAnnotation`;
        * strings become ``mlazy(instantiate, annotation)``
          (presumably a lazily-instantiated object named by the string --
          confirm against ``celery.utils.functional.mlazy``);
        * anything else is passed through unchanged.
    """
    def expand_annotation(annotation):
        if isinstance(annotation, dict):
            return MapAnnotation(annotation)
        elif isinstance(annotation, string_t):
            return mlazy(instantiate, annotation)
        return annotation

    if annotations is None:
        return ()
    elif not isinstance(annotations, (list, tuple)):
        # A lone annotation (dict, string, or object): normalise to a tuple.
        annotations = (annotations,)
    return [expand_annotation(anno) for anno in annotations]
def resolve_all(anno, task):
    """Resolve all pending annotations.

    Calls ``_first_match(anno, task)`` and then ``_first_match_any(anno)``
    and returns a generator over whichever of the two results are truthy,
    in that order.  Both helpers are invoked immediately when this function
    is called (they form the generator's source tuple); only the filtering
    of falsy results is lazy.
    """
    return (x for x in (_first_match(anno, task), _first_match_any(anno)) if x)