Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2"""Loader base class.""" 

3from __future__ import absolute_import, unicode_literals 

4 

5import importlib 

6import os 

7import re 

8import sys 

9from datetime import datetime 

10 

11from kombu.utils import json 

12from kombu.utils.objects import cached_property 

13 

14from celery import signals 

15from celery.five import reraise, string_t 

16from celery.utils.collections import DictAttribute, force_mapping 

17from celery.utils.functional import maybe_list 

18from celery.utils.imports import (NotAPackage, find_module, import_from_cwd, 

19 symbol_by_name) 

20 

21__all__ = ('BaseLoader',) 

22 

# Re-entrancy guard for the module-level ``autodiscover_tasks``: it is set
# while discovery is running so that a nested call returns immediately.
_RACE_PROTECTION = False

# Error text used when the configuration module cannot be located.
CONFIG_INVALID_NAME = (
    "Error: Module '{module}' doesn't exist, or it's not a valid "
    "Python module name.\n"
)

# Same error text followed by a suggested module name.
CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + "Did you mean '{suggest}'?\n"

# Sentinel meaning "the loader configuration has not been read yet".
unconfigured = object()

35 

36 

class BaseLoader(object):
    """Base class for loaders.

    Loaders handle:

        * Reading celery client/worker configurations.

        * What happens when a task starts?
            See :meth:`on_task_init`.

        * What happens when the worker starts?
            See :meth:`on_worker_init`.

        * What happens when the worker shuts down?
            See :meth:`on_worker_shutdown`.

        * What modules are imported to find tasks?
    """

    # Modules always included in :attr:`default_modules`.
    builtin_modules = frozenset()
    configured = False
    # NOTE: class-level dict, shared by every loader that does not rebind it.
    override_backends = {}
    # Set by :meth:`init_worker` so worker initialization runs only once.
    worker_initialized = False

    # Replaced by the real configuration on first access of :attr:`conf`
    # or by :meth:`config_from_object`.
    _conf = unconfigured

    def __init__(self, app, **kwargs):
        """Bind the loader to ``app``; extra keyword arguments are ignored."""
        self.app = app
        self.task_modules = set()

    def now(self, utc=True):
        """Return the current time as a naive :class:`~datetime.datetime`.

        UTC is used when ``utc`` is true, local time otherwise.
        """
        if utc:
            return datetime.utcnow()
        return datetime.now()

    def on_task_init(self, task_id, task):
        """Called before a task is executed."""

    def on_process_cleanup(self):
        """Called after a task is executed."""

    def on_worker_init(self):
        """Called when the worker (:program:`celery worker`) starts."""

    def on_worker_shutdown(self):
        """Called when the worker (:program:`celery worker`) shuts down."""

    def on_worker_process_init(self):
        """Called when a child process starts."""

    def import_task_module(self, module):
        """Record ``module`` in :attr:`task_modules`, import and return it."""
        self.task_modules.add(module)
        return self.import_from_cwd(module)

    def import_module(self, module, package=None):
        """Import ``module`` using :func:`importlib.import_module`."""
        return importlib.import_module(module, package=package)

    def import_from_cwd(self, module, imp=None, package=None):
        """Import ``module`` through the ``import_from_cwd`` helper.

        ``imp`` is the import callable handed to the helper; it defaults
        to :meth:`import_module`.
        """
        return import_from_cwd(
            module,
            self.import_module if imp is None else imp,
            package=package,
        )

    def import_default_modules(self):
        """Send the ``import_modules`` signal, then import default modules.

        Returns:
            list: Result of :meth:`import_task_module` for every entry in
                :attr:`default_modules`.

        Raises:
            Exception: Any exception instance returned by a signal receiver.
        """
        responses = signals.import_modules.send(sender=self.app)
        # Prior to this point loggers are not yet set up properly, need to
        # check responses manually and re-raise exceptions if any, otherwise
        # they'll be silenced, making it incredibly difficult to debug.
        for _, response in responses:
            if isinstance(response, Exception):
                raise response
        return [self.import_task_module(m) for m in self.default_modules]

    def init_worker(self):
        """Import default modules and call :meth:`on_worker_init` once."""
        if not self.worker_initialized:
            self.worker_initialized = True
            self.import_default_modules()
            self.on_worker_init()

    def shutdown_worker(self):
        """Run the :meth:`on_worker_shutdown` hook."""
        self.on_worker_shutdown()

    def init_worker_process(self):
        """Run the :meth:`on_worker_process_init` hook."""
        self.on_worker_process_init()

    def config_from_object(self, obj, silent=False):
        """Use ``obj`` as the loader configuration.

        Arguments:
            obj: Configuration object, or a string naming one to import.
            silent (bool): When true, a failed import returns :const:`False`
                instead of raising.

        Returns:
            bool: :const:`True` once the configuration has been stored,
                :const:`False` if the import failed and ``silent`` is set.

        Raises:
            ImportError, AttributeError: If a string ``obj`` cannot be
                imported and ``silent`` is false.
        """
        if isinstance(obj, string_t):
            try:
                obj = self._smart_import(obj, imp=self.import_from_cwd)
            except (ImportError, AttributeError):
                if silent:
                    return False
                raise
        self._conf = force_mapping(obj)
        return True

    def _smart_import(self, path, imp=None):
        """Import ``path``, which names either a module or an attribute.

        ``imp`` is the import callable to use (default: :meth:`import_module`).
        """
        imp = self.import_module if imp is None else imp
        if ':' in path:
            # Path includes attribute so can just jump
            # here (e.g., ``os.path:abspath``).
            return symbol_by_name(path, imp=imp)

        # Not sure if path is just a module name or if it includes an
        # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).
        try:
            return imp(path)
        except ImportError:
            # Not a module name, so try module + attribute.
            return symbol_by_name(path, imp=imp)

    def _import_config_module(self, name):
        """Locate and import the configuration module ``name``.

        Raises:
            NotAPackage: With a readable message when :meth:`find_module`
                raises ``NotAPackage``; a suggestion is included if ``name``
                ends with ``.py``.
        """
        try:
            self.find_module(name)
        except NotAPackage:
            if name.endswith('.py'):
                reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format(
                    module=name, suggest=name[:-3])), sys.exc_info()[2])
            reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format(
                module=name)), sys.exc_info()[2])
        else:
            return self.import_from_cwd(name)

    def find_module(self, module):
        """Locate ``module`` using the ``find_module`` import helper."""
        return find_module(module)

    def cmdline_config_parser(self, args, namespace='celery',
                              re_type=re.compile(r'\((\w+)\)'),
                              extra_types=None,
                              override_types=None):
        """Parse ``ns.key=value`` style command-line settings.

        Arguments:
            args: Iterable of strings such as ``ns.key=value`` or
                ``ns_key=value`` (keys are case insensitive).  A key that
                starts with ``.`` or ``_`` uses the default ``namespace``.
            namespace (str): Default name-space.
            re_type: Compiled pattern matching a leading ``(type)`` cast.
            extra_types (Mapping): Extra ``type name -> converter`` entries
                merged over ``Option.typemap``
                (default: ``{'json': json.loads}``).
            override_types (Mapping): Cast names that are replaced by
                another type name (by default ``tuple``, ``list`` and
                ``dict`` are handled as ``json``).

        Returns:
            dict: Mapping of ``ns_key`` to the converted value.

        Raises:
            ValueError: If an argument is not of the form ``KEY=VALUE``,
                if the key has no name-space part, or if the value cannot
                be converted.
        """
        extra_types = extra_types if extra_types else {'json': json.loads}
        override_types = override_types if override_types else {
            'tuple': 'json',
            'list': 'json',
            'dict': 'json'
        }
        from celery.app.defaults import Option, NAMESPACES
        namespace = namespace and namespace.lower()
        typemap = dict(Option.typemap, **extra_types)

        def getarg(arg):
            """Parse single configuration from command-line."""
            # ## find key/value
            # ns.key=value|ns_key=value (case insensitive)
            key, equals, value = arg.partition('=')
            if not equals:
                # Report the offending argument instead of failing with an
                # opaque tuple-unpacking error.
                raise ValueError(
                    '{0!r}: expected an argument of the form '
                    'KEY=VALUE'.format(arg))
            key = key.lower().replace('.', '_')

            # ## find name-space.
            # .key=value|_key=value expands to default name-space.
            if key.startswith('_'):
                ns, key = namespace, key[1:]
            else:
                # find name-space part of key
                ns, underscore, key = key.partition('_')
                if not underscore:
                    # Also covers an empty key (``=value``).
                    raise ValueError(
                        '{0!r}: key must include a name-space '
                        '(NAMESPACE.KEY=VALUE), or start with "." to use '
                        'the default name-space'.format(arg))

            ns_key = (ns + '_' if ns else '') + key

            # (type)value makes cast to custom type.
            cast = re_type.match(value)
            if cast:
                type_ = cast.groups()[0]
                type_ = override_types.get(type_, type_)
                value = value[len(cast.group()):]
                value = typemap[type_](value)
            else:
                try:
                    value = NAMESPACES[ns.lower()][key].to_python(value)
                except ValueError as exc:
                    # display key name in error message.
                    raise ValueError('{0!r}: {1}'.format(ns_key, exc))
            return ns_key, value
        return dict(getarg(arg) for arg in args)

    def read_configuration(self, env='CELERY_CONFIG_MODULE'):
        """Load configuration from the module named by environment ``env``.

        Returns:
            DictAttribute: Wrapper around the imported module, or
                :const:`None` if the variable is unset or empty.
        """
        try:
            custom_config = os.environ[env]
        except KeyError:
            pass
        else:
            if custom_config:
                usercfg = self._import_config_module(custom_config)
                return DictAttribute(usercfg)

    def autodiscover_tasks(self, packages, related_name='tasks'):
        """Discover task modules and record their names.

        Calls the module-level ``autodiscover_tasks`` function and adds the
        ``__name__`` of every module it returns to :attr:`task_modules`.
        """
        self.task_modules.update(
            mod.__name__ for mod in autodiscover_tasks(packages or (),
                                                       related_name) if mod)

    @cached_property
    def default_modules(self):
        """Tuple of built-in modules plus the app's ``imports``/``include``."""
        return (
            tuple(self.builtin_modules) +
            tuple(maybe_list(self.app.conf.imports)) +
            tuple(maybe_list(self.app.conf.include))
        )

    @property
    def conf(self):
        """Loader configuration."""
        if self._conf is unconfigured:
            # Read lazily on first access.
            self._conf = self.read_configuration()
        return self._conf

240 

241 

def autodiscover_tasks(packages, related_name='tasks'):
    """Look up the ``related_name`` module for each entry in ``packages``.

    Returns a list with one :func:`find_related_module` result per package.
    If discovery is already in progress (the call is re-entered while the
    module-level guard is set) an empty tuple is returned instead.
    """
    global _RACE_PROTECTION

    if not _RACE_PROTECTION:
        _RACE_PROTECTION = True
        try:
            discovered = []
            for package in packages:
                discovered.append(find_related_module(package, related_name))
            return discovered
        finally:
            # Always release the guard, even if an import failed.
            _RACE_PROTECTION = False
    return ()

252 

253 

254def find_related_module(package, related_name): 

255 """Find module in package.""" 

256 # Django 1.7 allows for speciying a class name in INSTALLED_APPS. 

257 # (Issue #2248). 

258 try: 

259 module = importlib.import_module(package) 

260 if not related_name and module: 

261 return module 

262 except ImportError: 

263 package, _, _ = package.rpartition('.') 

264 if not package: 

265 raise 

266 

267 module_name = '{0}.{1}'.format(package, related_name) 

268 

269 try: 

270 return importlib.import_module(module_name) 

271 except ImportError as e: 

272 import_exc_name = getattr(e, 'name', module_name) 

273 if import_exc_name is not None and import_exc_name != module_name: 

274 raise e 

275 return