Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/celery/utils/abstract.py : 21%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""Abstract classes."""
3from __future__ import absolute_import, unicode_literals
5from abc import ABCMeta, abstractmethod, abstractproperty
7from celery.five import with_metaclass
9try:
10 from collections.abc import Callable
11except ImportError:
12 # TODO: Remove this when we drop Python 2.7 support
13 from collections import Callable
16__all__ = ('CallableTask', 'CallableSignature')
19def _hasattr(C, attr):
20 return any(attr in B.__dict__ for B in C.__mro__)
@with_metaclass(ABCMeta)
class _AbstractClass(object):
    """Base class for ABCs that recognise subclasses by attribute names.

    A subclass lists the attribute names it requires in
    ``__required_attributes__`` and is expected to route its own
    ``__subclasshook__`` through :meth:`_subclasshook_using`.
    """

    # Names that must be defined somewhere in a candidate's MRO for the
    # structural check to succeed.  Empty here; overridden by subclasses.
    __required_attributes__ = frozenset()

    @classmethod
    def _subclasshook_using(cls, parent, C):
        """Structural subclass check shared by ``__subclasshook__`` hooks.

        Arguments:
            parent: the class that defines the calling hook.
            C: the candidate class being tested.

        Returns:
            ``True`` when ``cls`` is exactly ``parent`` and every name in
            ``cls.__required_attributes__`` is defined on ``C`` (or one of
            its bases); ``NotImplemented`` in every other case.
        """
        # The identity test keeps the hook from answering for classes that
        # merely inherit it; those get NotImplemented instead.
        if cls is parent and all(
                _hasattr(C, attr) for attr in cls.__required_attributes__):
            return True
        return NotImplemented

    @classmethod
    def register(cls, other):
        """Register ``other`` with this class and return ``other``.

        Delegates to the ``register`` method of this class's metaclass.
        """
        # we override `register` to return other for use as a decorator.
        type(cls).register(cls, other)
        return other
class CallableTask(_AbstractClass, Callable):  # pragma: no cover
    """Task interface.

    Any class defining all the names in ``__required_attributes__`` is
    accepted by ``__subclasshook__`` without inheriting from this class.
    """

    # Names checked by the structural subclass hook below.
    __required_attributes__ = frozenset({
        'delay', 'apply_async', 'apply',
    })

    @abstractmethod
    def delay(self, *args, **kwargs):
        """Abstract; concrete classes must override."""
        pass

    @abstractmethod
    def apply_async(self, *args, **kwargs):
        """Abstract; concrete classes must override."""
        pass

    @abstractmethod
    def apply(self, *args, **kwargs):
        """Abstract; concrete classes must override."""
        pass

    @classmethod
    def __subclasshook__(cls, C):
        """Return True if ``C`` defines every required name.

        Returns ``NotImplemented`` otherwise, including when ``cls`` is
        a subclass of ``CallableTask`` rather than ``CallableTask`` itself.
        """
        return cls._subclasshook_using(CallableTask, C)
class CallableSignature(CallableTask):  # pragma: no cover
    """Celery Signature interface.

    Direct subclasses must implement every abstract property and method
    declared here.  The structural check in ``__subclasshook__`` looks
    only at the names in ``__required_attributes__``.
    """

    # Names checked by the structural subclass hook below.
    # NOTE(review): this set replaces (does not extend) the one on
    # CallableTask, so 'delay', 'apply_async' and 'apply' are not part of
    # the structural check for this class, and neither are the abstract
    # properties or ``__invert__`` -- confirm that this is intended.
    __required_attributes__ = frozenset({
        'clone', 'freeze', 'set', 'link', 'link_error', '__or__',
    })

    # -- abstract read-only properties ------------------------------------

    @abstractproperty
    def name(self):
        pass

    @abstractproperty
    def type(self):
        pass

    @abstractproperty
    def app(self):
        pass

    @abstractproperty
    def id(self):
        pass

    @abstractproperty
    def task(self):
        pass

    @abstractproperty
    def args(self):
        pass

    @abstractproperty
    def kwargs(self):
        pass

    @abstractproperty
    def options(self):
        pass

    @abstractproperty
    def subtask_type(self):
        pass

    @abstractproperty
    def chord_size(self):
        pass

    @abstractproperty
    def immutable(self):
        pass

    # -- abstract methods -------------------------------------------------

    @abstractmethod
    def clone(self, args=None, kwargs=None):
        pass

    @abstractmethod
    def freeze(self, id=None, group_id=None, chord=None, root_id=None):
        pass

    @abstractmethod
    def set(self, immutable=None, **options):
        pass

    @abstractmethod
    def link(self, callback):
        pass

    @abstractmethod
    def link_error(self, errback):
        pass

    @abstractmethod
    def __or__(self, other):
        pass

    @abstractmethod
    def __invert__(self):
        pass

    @classmethod
    def __subclasshook__(cls, C):
        """Return True if ``C`` defines every required name.

        Returns ``NotImplemented`` otherwise, including when ``cls`` is
        a subclass of ``CallableSignature`` rather than
        ``CallableSignature`` itself.
        """
        return cls._subclasshook_using(CallableSignature, C)