Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Django-specific customization.""" 

2from __future__ import absolute_import, unicode_literals 

3 

4import os 

5import sys 

6import warnings 

7from datetime import datetime 

8from importlib import import_module 

9 

10from kombu.utils.imports import symbol_by_name 

11from kombu.utils.objects import cached_property 

12 

13from celery import _state, signals 

14from celery.exceptions import FixupWarning, ImproperlyConfigured 

15 

16__all__ = ('DjangoFixup', 'fixup') 

17 

# Text of the FixupWarning that fixup() issues when the settings environment
# variable is present but ``import django`` fails.  Two lines, each ending
# with a newline.
ERR_NOT_INSTALLED = (
    "Environment variable DJANGO_SETTINGS_MODULE is defined\n"
    "but Django isn't installed. Won't apply Django fix-ups!\n"
)

22 

23 

def _maybe_close_fd(fh):
    """Best-effort close of the OS descriptor reported by ``fh.fileno()``.

    ``AttributeError``, ``OSError`` and ``TypeError`` raised while looking
    up or closing the descriptor are swallowed, so the call never fails
    for those reasons.  Always returns ``None``.
    """
    try:
        descriptor = fh.fileno()
        os.close(descriptor)
    except (TypeError, OSError, AttributeError):
        # TypeError is tolerated as well (added for celery#962).
        return

30 

31 

def _verify_django_version(django):
    """Reject Django releases older than 1.11.

    :param django: the imported ``django`` package; only its ``VERSION``
        tuple is read.
    :raises ImproperlyConfigured: when ``django.VERSION < (1, 11)``.
    """
    oldest_supported = (1, 11)
    if not django.VERSION < oldest_supported:
        return
    raise ImproperlyConfigured('Celery 4.x requires Django 1.11 or later.')

35 

36 

def fixup(app, env='DJANGO_SETTINGS_MODULE'):
    """Install Django fixup if settings module environment is set.

    Nothing is done (and ``None`` is returned) when the environment
    variable named by *env* is unset or empty, or when ``app.loader_cls``
    already contains ``'django'`` (case-insensitive).  If Django cannot be
    imported a ``FixupWarning`` is issued and ``None`` is returned.
    Otherwise the Django version is verified and the installed
    ``DjangoFixup`` instance is returned.
    """
    settings_module = os.environ.get(env)
    if not settings_module:
        return None
    if 'django' in app.loader_cls.lower():
        return None

    try:
        import django  # noqa
    except ImportError:
        warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
        return None

    _verify_django_version(django)
    return DjangoFixup(app).install()

48 

49 

class DjangoFixup(object):
    """Fixup installed when using Django."""

    def __init__(self, app):
        """Bind the fixup to *app*.

        If no default app is registered yet (``_state.default_app`` is
        ``None``), *app* is made the default.
        """
        self.app = app
        no_default_yet = _state.default_app is None
        if no_default_yet:
            self.app.set_default()
        # Created on first access through the ``worker_fixup`` property.
        self._worker_fixup = None

    def install(self):
        """Hook the fixup into the app and return ``self``.

        Prepends the current working directory to ``sys.path``, resolves
        the Django settings object, replaces the loader's ``now`` with
        :meth:`now`, and connects the ``import_modules`` and
        ``worker_init`` signal handlers.
        """
        # The project directory must win over system modules, which is
        # why it goes to the front of the path instead of the end.
        project_dir = os.getcwd()
        sys.path.insert(0, project_dir)

        self._settings = symbol_by_name('django.conf:settings')
        now_hook = self.now
        self.app.loader.now = now_hook

        signals.import_modules.connect(self.on_import_modules)
        signals.worker_init.connect(self.on_worker_init)
        return self

    @property
    def worker_fixup(self):
        """The ``DjangoWorkerFixup`` for this app, built on first use."""
        current = self._worker_fixup
        if current is None:
            current = self._worker_fixup = DjangoWorkerFixup(self.app)
        return current

    @worker_fixup.setter
    def worker_fixup(self, value):
        self._worker_fixup = value

    def on_import_modules(self, **kwargs):
        """Validate models (which runs django.setup()) before task
        modules are imported."""
        self.worker_fixup.validate_models()

    def on_worker_init(self, **kwargs):
        """Install the worker-side fixup."""
        self.worker_fixup.install()

    def now(self, utc=False):
        """Return the current time.

        ``datetime.utcnow()`` when *utc* is true, otherwise whatever
        Django's ``timezone.now`` returns.
        """
        if utc:
            return datetime.utcnow()
        return self._now()

    def autodiscover_tasks(self):
        """Return the ``name`` of every Django app config, as a list."""
        from django.apps import apps
        names = []
        for app_config in apps.get_app_configs():
            names.append(app_config.name)
        return names

    @cached_property
    def _now(self):
        # Resolved lazily so Django's timezone module is only imported
        # when a non-UTC "now" is actually requested.
        return symbol_by_name('django.utils.timezone:now')

99 

100 

class DjangoWorkerFixup(object):
    """Worker-side Django integration: model validation plus database and
    cache connection housekeeping around worker and task lifecycle
    signals."""

    # Number of close_database() calls counted since the last actual close
    # (only meaningful when ``db_reuse_max`` is set).
    _db_recycles = 0

    def __init__(self, app):
        """Read ``CELERY_DB_REUSE_MAX`` from the app configuration and
        resolve the Django modules and exception classes used later."""
        self.app = app
        self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
        self._db = import_module('django.db')
        self._cache = import_module('django.core.cache')
        self._settings = symbol_by_name('django.conf:settings')

        interface_error = symbol_by_name('django.db.utils.InterfaceError')
        self.interface_errors = (interface_error,)
        self.DatabaseError = symbol_by_name('django.db:DatabaseError')

    def django_setup(self):
        """Call ``django.setup()``."""
        import django as django_pkg
        django_pkg.setup()

    def validate_models(self):
        """Set Django up, then run its system checks."""
        from django.core.checks import run_checks
        self.django_setup()
        run_checks()

    def install(self):
        """Connect the signal handlers, close the database connections and
        caches once, and return ``self``."""
        hooks = (
            (signals.beat_embedded_init, self.close_database),
            (signals.worker_ready, self.on_worker_ready),
            (signals.task_prerun, self.on_task_prerun),
            (signals.task_postrun, self.on_task_postrun),
            (signals.worker_process_init, self.on_worker_process_init),
        )
        for signal, handler in hooks:
            signal.connect(handler)
        self.close_database()
        self.close_cache()
        return self

    def on_worker_process_init(self, **kwargs):
        """Prepare a freshly started child process."""
        # A child started on Windows, or through execv, has to validate
        # the models again.
        if os.environ.get('FORKED_BY_MULTIPROCESSING'):
            self.validate_models()

        # The parent process may already have established connections,
        # so they need to go.  Calling db.close() on some DB connections
        # also breaks the inherited connection in the parent process,
        # therefore the descriptor is dropped first without triggering
        # any network IO that close() might cause.
        for wrapper in self._db.connections.all():
            if not (wrapper and wrapper.connection):
                continue
            self._maybe_close_db_fd(wrapper.connection)

        # Use the underscore variant so that DB_REUSE cannot prevent the
        # conn.close() call.
        self._close_database(force=True)
        self.close_cache()

    def _maybe_close_db_fd(self, fd):
        """Close the descriptor behind *fd*, ignoring interface errors."""
        try:
            _maybe_close_fd(fd)
        except self.interface_errors:
            return

    def on_task_prerun(self, sender, **kwargs):
        """Called before every task."""
        eager = getattr(sender.request, 'is_eager', False)
        if eager:
            return
        self.close_database()

    def on_task_postrun(self, sender, **kwargs):
        """Called after every task; skipped for eager requests."""
        # See https://groups.google.com/group/django-users/
        # browse_thread/thread/78200863d0c07c6d/
        eager = getattr(sender.request, 'is_eager', False)
        if eager:
            return
        self.close_database()
        self.close_cache()

    def close_database(self, **kwargs):
        """Close the connections, honouring ``db_reuse_max``.

        Without a reuse limit every call closes.  With a limit, a close
        happens only once the call counter has reached twice the limit;
        the counter is then reset, and it is incremented on every call.
        """
        limit = self.db_reuse_max
        if not limit:
            return self._close_database()
        if self._db_recycles >= limit * 2:
            self._db_recycles = 0
            self._close_database()
        self._db_recycles += 1

    def _close_database(self, force=False):
        """Close every connection known to ``django.db.connections``.

        With *force* each connection is closed unconditionally, otherwise
        only ``close_if_unusable_or_obsolete()`` is called.  Interface
        errors are ignored; a ``DatabaseError`` is ignored only if its
        message contains ``'closed'`` or ``'not connected'``.
        """
        for conn in self._db.connections.all():
            try:
                if not force:
                    conn.close_if_unusable_or_obsolete()
                else:
                    conn.close()
            except self.interface_errors:
                continue
            except self.DatabaseError as exc:
                message = str(exc)
                if 'closed' in message or 'not connected' in message:
                    continue
                raise

    def close_cache(self):
        """Close the Django caches, ignoring ``TypeError`` and
        ``AttributeError``."""
        try:
            self._cache.close_caches()
        except (AttributeError, TypeError):
            return

    def on_worker_ready(self, **kwargs):
        """Warn when the worker runs with ``settings.DEBUG`` enabled."""
        if not self._settings.DEBUG:
            return
        # NOTE(review): the line break and indentation inside this literal
        # are part of the emitted message -- keep them as in the original.
        warnings.warn('''Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!''')