Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2"""Threading primitives and utilities.""" 

3from __future__ import absolute_import, print_function, unicode_literals 

4 

5import os 

6import socket 

7import sys 

8import threading 

9import traceback 

10from contextlib import contextmanager 

11 

12from celery.five import THREAD_TIMEOUT_MAX, items, python_2_unicode_compatible 

13from celery.local import Proxy 

14 

15try: 

16 from greenlet import getcurrent as get_ident 

17except ImportError: # pragma: no cover 

18 try: 

19 from _thread import get_ident # noqa 

20 except ImportError: 

21 try: 

22 from thread import get_ident # noqa 

23 except ImportError: # pragma: no cover 

24 try: 

25 from _dummy_thread import get_ident # noqa 

26 except ImportError: 

27 from dummy_thread import get_ident # noqa 

28 

29 

30__all__ = ( 

31 'bgThread', 'Local', 'LocalStack', 'LocalManager', 

32 'get_ident', 'default_socket_timeout', 

33) 

34 

35USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS') 

36 

37 

38@contextmanager 

39def default_socket_timeout(timeout): 

40 """Context temporarily setting the default socket timeout.""" 

41 prev = socket.getdefaulttimeout() 

42 socket.setdefaulttimeout(timeout) 

43 yield 

44 socket.setdefaulttimeout(prev) 

45 

46 

47class bgThread(threading.Thread): 

48 """Background service thread.""" 

49 

50 def __init__(self, name=None, **kwargs): 

51 super(bgThread, self).__init__() 

52 self._is_shutdown = threading.Event() 

53 self._is_stopped = threading.Event() 

54 self.daemon = True 

55 self.name = name or self.__class__.__name__ 

56 

57 def body(self): 

58 raise NotImplementedError() 

59 

60 def on_crash(self, msg, *fmt, **kwargs): 

61 print(msg.format(*fmt), file=sys.stderr) 

62 traceback.print_exc(None, sys.stderr) 

63 

64 def run(self): 

65 body = self.body 

66 shutdown_set = self._is_shutdown.is_set 

67 try: 

68 while not shutdown_set(): 

69 try: 

70 body() 

71 except Exception as exc: # pylint: disable=broad-except 

72 try: 

73 self.on_crash('{0!r} crashed: {1!r}', self.name, exc) 

74 self._set_stopped() 

75 finally: 

76 sys.stderr.flush() 

77 os._exit(1) # exiting by normal means won't work 

78 finally: 

79 self._set_stopped() 

80 

81 def _set_stopped(self): 

82 try: 

83 self._is_stopped.set() 

84 except TypeError: # pragma: no cover 

85 # we lost the race at interpreter shutdown, 

86 # so gc collected built-in modules. 

87 pass 

88 

89 def stop(self): 

90 """Graceful shutdown.""" 

91 self._is_shutdown.set() 

92 self._is_stopped.wait() 

93 if self.is_alive(): 

94 self.join(THREAD_TIMEOUT_MAX) 

95 

96 

97def release_local(local): 

98 """Release the contents of the local for the current context. 

99 

100 This makes it possible to use locals without a manager. 

101 

102 With this function one can release :class:`Local` objects as well as 

103 :class:`StackLocal` objects. However it's not possible to 

104 release data held by proxies that way, one always has to retain 

105 a reference to the underlying local object in order to be able 

106 to release it. 

107 

108 Example: 

109 >>> loc = Local() 

110 >>> loc.foo = 42 

111 >>> release_local(loc) 

112 >>> hasattr(loc, 'foo') 

113 False 

114 """ 

115 local.__release_local__() 

116 

117 

118class Local(object): 

119 """Local object.""" 

120 

121 __slots__ = ('__storage__', '__ident_func__') 

122 

123 def __init__(self): 

124 object.__setattr__(self, '__storage__', {}) 

125 object.__setattr__(self, '__ident_func__', get_ident) 

126 

127 def __iter__(self): 

128 return iter(items(self.__storage__)) 

129 

130 def __call__(self, proxy): 

131 """Create a proxy for a name.""" 

132 return Proxy(self, proxy) 

133 

134 def __release_local__(self): 

135 self.__storage__.pop(self.__ident_func__(), None) 

136 

137 def __getattr__(self, name): 

138 try: 

139 return self.__storage__[self.__ident_func__()][name] 

140 except KeyError: 

141 raise AttributeError(name) 

142 

143 def __setattr__(self, name, value): 

144 ident = self.__ident_func__() 

145 storage = self.__storage__ 

146 try: 

147 storage[ident][name] = value 

148 except KeyError: 

149 storage[ident] = {name: value} 

150 

151 def __delattr__(self, name): 

152 try: 

153 del self.__storage__[self.__ident_func__()][name] 

154 except KeyError: 

155 raise AttributeError(name) 

156 

157 

158class _LocalStack(object): 

159 """Local stack. 

160 

161 This class works similar to a :class:`Local` but keeps a stack 

162 of objects instead. This is best explained with an example:: 

163 

164 >>> ls = LocalStack() 

165 >>> ls.push(42) 

166 >>> ls.top 

167 42 

168 >>> ls.push(23) 

169 >>> ls.top 

170 23 

171 >>> ls.pop() 

172 23 

173 >>> ls.top 

174 42 

175 

176 They can be force released by using a :class:`LocalManager` or with 

177 the :func:`release_local` function but the correct way is to pop the 

178 item from the stack after using. When the stack is empty it will 

179 no longer be bound to the current context (and as such released). 

180 

181 By calling the stack without arguments it will return a proxy that 

182 resolves to the topmost item on the stack. 

183 """ 

184 

185 def __init__(self): 

186 self._local = Local() 

187 

188 def __release_local__(self): 

189 self._local.__release_local__() 

190 

191 def _get__ident_func__(self): 

192 return self._local.__ident_func__ 

193 

194 def _set__ident_func__(self, value): 

195 object.__setattr__(self._local, '__ident_func__', value) 

196 __ident_func__ = property(_get__ident_func__, _set__ident_func__) 

197 del _get__ident_func__, _set__ident_func__ 

198 

199 def __call__(self): 

200 def _lookup(): 

201 rv = self.top 

202 if rv is None: 

203 raise RuntimeError('object unbound') 

204 return rv 

205 return Proxy(_lookup) 

206 

207 def push(self, obj): 

208 """Push a new item to the stack.""" 

209 rv = getattr(self._local, 'stack', None) 

210 if rv is None: 

211 # pylint: disable=assigning-non-slot 

212 # This attribute is defined now. 

213 self._local.stack = rv = [] 

214 rv.append(obj) 

215 return rv 

216 

217 def pop(self): 

218 """Remove the topmost item from the stack. 

219 

220 Note: 

221 Will return the old value or `None` if the stack was already empty. 

222 """ 

223 stack = getattr(self._local, 'stack', None) 

224 if stack is None: 

225 return None 

226 elif len(stack) == 1: 

227 release_local(self._local) 

228 return stack[-1] 

229 else: 

230 return stack.pop() 

231 

232 def __len__(self): 

233 stack = getattr(self._local, 'stack', None) 

234 return len(stack) if stack else 0 

235 

236 @property 

237 def stack(self): 

238 # get_current_worker_task uses this to find 

239 # the original task that was executed by the worker. 

240 stack = getattr(self._local, 'stack', None) 

241 if stack is not None: 

242 return stack 

243 return [] 

244 

245 @property 

246 def top(self): 

247 """The topmost item on the stack. 

248 

249 Note: 

250 If the stack is empty, :const:`None` is returned. 

251 """ 

252 try: 

253 return self._local.stack[-1] 

254 except (AttributeError, IndexError): 

255 return None 

256 

257 

258@python_2_unicode_compatible 

259class LocalManager(object): 

260 """Local objects cannot manage themselves. 

261 

262 For that you need a local manager. 

263 You can pass a local manager multiple locals or add them 

264 later by appending them to ``manager.locals``. Every time the manager 

265 cleans up, it will clean up all the data left in the locals for this 

266 context. 

267 

268 The ``ident_func`` parameter can be added to override the default ident 

269 function for the wrapped locals. 

270 """ 

271 

272 def __init__(self, locals=None, ident_func=None): 

273 if locals is None: 

274 self.locals = [] 

275 elif isinstance(locals, Local): 

276 self.locals = [locals] 

277 else: 

278 self.locals = list(locals) 

279 if ident_func is not None: 

280 self.ident_func = ident_func 

281 for local in self.locals: 

282 object.__setattr__(local, '__ident_func__', ident_func) 

283 else: 

284 self.ident_func = get_ident 

285 

286 def get_ident(self): 

287 """Return context identifier. 

288 

289 This is the indentifer the local objects use internally 

290 for this context. You cannot override this method to change the 

291 behavior but use it to link other context local objects (such as 

292 SQLAlchemy's scoped sessions) to the Werkzeug locals. 

293 """ 

294 return self.ident_func() 

295 

296 def cleanup(self): 

297 """Manually clean up the data in the locals for this context. 

298 

299 Call this at the end of the request or use ``make_middleware()``. 

300 """ 

301 for local in self.locals: 

302 release_local(local) 

303 

304 def __repr__(self): 

305 return '<{0} storages: {1}>'.format( 

306 self.__class__.__name__, len(self.locals)) 

307 

308 

309class _FastLocalStack(threading.local): 

310 

311 def __init__(self): 

312 self.stack = [] 

313 self.push = self.stack.append 

314 self.pop = self.stack.pop 

315 super(_FastLocalStack, self).__init__() 

316 

317 @property 

318 def top(self): 

319 try: 

320 return self.stack[-1] 

321 except (AttributeError, IndexError): 

322 return None 

323 

324 def __len__(self): 

325 return len(self.stack) 

326 

327 

328if USE_FAST_LOCALS: # pragma: no cover 

329 LocalStack = _FastLocalStack 

330else: # pragma: no cover 

331 # - See #706 

332 # since each thread has its own greenlet we can just use those as 

333 # identifiers for the context. If greenlets aren't available we 

334 # fall back to the current thread ident. 

335 LocalStack = _LocalStack # noqa