Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2"""Internal state. 

3 

4This is an internal module containing thread state 

5like the ``current_app``, and ``current_task``. 

6 

7This module shouldn't be used directly. 

8""" 

9from __future__ import absolute_import, print_function, unicode_literals 

10 

11import os 

12import sys 

13import threading 

14import weakref 

15 

16from celery.local import Proxy 

17from celery.utils.threads import LocalStack 

18 

19__all__ = ( 

20 'set_default_app', 'get_current_app', 'get_current_task', 

21 'get_current_worker_task', 'current_app', 'current_task', 

22 'connect_on_app_finalize', 

23) 

24 

25#: Global default app used when no current app. 

26default_app = None 

27 

28#: Function returning the app provided or the default app if none. 

29#: 

30#: The environment variable :envvar:`CELERY_TRACE_APP` is used to 

31#: trace app leaks. When enabled an exception is raised if there 

32#: is no active app. 

33app_or_default = None 

34 

35#: List of all app instances (weakrefs), mustn't be used directly. 

36_apps = weakref.WeakSet() 

37 

38#: Global set of functions to call whenever a new app is finalized. 

39#: Shared tasks, and built-in tasks are created by adding callbacks here. 

40_on_app_finalizers = set() 

41 

42_task_join_will_block = False 

43 

44 

45def connect_on_app_finalize(callback): 

46 """Connect callback to be called when any app is finalized.""" 

47 _on_app_finalizers.add(callback) 

48 return callback 

49 

50 

51def _announce_app_finalized(app): 

52 callbacks = set(_on_app_finalizers) 

53 for callback in callbacks: 

54 callback(app) 

55 

56 

57def _set_task_join_will_block(blocks): 

58 global _task_join_will_block 

59 _task_join_will_block = blocks 

60 

61 

62def task_join_will_block(): 

63 return _task_join_will_block 

64 

65 

66class _TLS(threading.local): 

67 #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute 

68 #: sets this, so it will always contain the last instantiated app, 

69 #: and is the default app returned by :func:`app_or_default`. 

70 current_app = None 

71 

72 

73_tls = _TLS() 

74 

75_task_stack = LocalStack() 

76 

77 

78#: Function used to push a task to the thread local stack 

79#: keeping track of the currently executing task. 

80#: You must remember to pop the task after. 

81push_current_task = _task_stack.push 

82 

83#: Function used to pop a task from the thread local stack 

84#: keeping track of the currently executing task. 

85pop_current_task = _task_stack.pop 

86 

87 

88def set_default_app(app): 

89 """Set default app.""" 

90 global default_app 

91 default_app = app 

92 

93 

94def _get_current_app(): 

95 if default_app is None: 

96 #: creates the global fallback app instance. 

97 from celery.app.base import Celery 

98 set_default_app(Celery( 

99 'default', fixups=[], set_as_current=False, 

100 loader=os.environ.get('CELERY_LOADER') or 'default', 

101 )) 

102 return _tls.current_app or default_app 

103 

104 

105def _set_current_app(app): 

106 _tls.current_app = app 

107 

108 

109if os.environ.get('C_STRICT_APP'): # pragma: no cover 

110 def get_current_app(): 

111 """Return the current app.""" 

112 raise RuntimeError('USES CURRENT APP') 

113elif os.environ.get('C_WARN_APP'): # pragma: no cover 

114 def get_current_app(): # noqa 

115 import traceback 

116 print('-- USES CURRENT_APP', file=sys.stderr) # noqa+ 

117 traceback.print_stack(file=sys.stderr) 

118 return _get_current_app() 

119else: 

120 get_current_app = _get_current_app 

121 

122 

123def get_current_task(): 

124 """Currently executing task.""" 

125 return _task_stack.top 

126 

127 

128def get_current_worker_task(): 

129 """Currently executing task, that was applied by the worker. 

130 

131 This is used to differentiate between the actual task 

132 executed by the worker and any task that was called within 

133 a task (using ``task.__call__`` or ``task.apply``) 

134 """ 

135 for task in reversed(_task_stack.stack): 

136 if not task.request.called_directly: 

137 return task 

138 

139 

140#: Proxy to current app. 

141current_app = Proxy(get_current_app) 

142 

143#: Proxy to current task. 

144current_task = Proxy(get_current_task) 

145 

146 

147def _register_app(app): 

148 _apps.add(app) 

149 

150 

151def _deregister_app(app): 

152 _apps.discard(app) 

153 

154 

155def _get_active_apps(): 

156 return _apps 

157 

158 

159def _app_or_default(app=None): 

160 if app is None: 

161 return get_current_app() 

162 return app 

163 

164 

165def _app_or_default_trace(app=None): # pragma: no cover 

166 from traceback import print_stack 

167 try: 

168 from billiard.process import current_process 

169 except ImportError: 

170 current_process = None 

171 if app is None: 

172 if getattr(_tls, 'current_app', None): 

173 print('-- RETURNING TO CURRENT APP --') # noqa+ 

174 print_stack() 

175 return _tls.current_app 

176 if not current_process or current_process()._name == 'MainProcess': 

177 raise Exception('DEFAULT APP') 

178 print('-- RETURNING TO DEFAULT APP --') # noqa+ 

179 print_stack() 

180 return default_app 

181 return app 

182 

183 

184def enable_trace(): 

185 """Enable tracing of app instances.""" 

186 global app_or_default 

187 app_or_default = _app_or_default_trace 

188 

189 

190def disable_trace(): 

191 """Disable tracing of app instances.""" 

192 global app_or_default 

193 app_or_default = _app_or_default 

194 

195 

196if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover 

197 enable_trace() 

198else: 

199 disable_trace()