Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import inspect 

2import sys 

3from . import _tracing 

4from .callers import _Result 

5from .hooks import HookImpl, _HookRelay, _HookCaller, normalize_hookimpl_opts 

6import warnings 

7 

8if sys.version_info >= (3, 8): 

9 from importlib import metadata as importlib_metadata 

10else: 

11 import importlib_metadata 

12 

13 

14def _warn_for_function(warning, function): 

15 warnings.warn_explicit( 

16 warning, 

17 type(warning), 

18 lineno=function.__code__.co_firstlineno, 

19 filename=function.__code__.co_filename, 

20 ) 

21 

22 

23class PluginValidationError(Exception): 

24 """ plugin failed validation. 

25 

26 :param object plugin: the plugin which failed validation, 

27 may be a module or an arbitrary object. 

28 """ 

29 

30 def __init__(self, plugin, message): 

31 self.plugin = plugin 

32 super(Exception, self).__init__(message) 

33 

34 

35class DistFacade(object): 

36 """Emulate a pkg_resources Distribution""" 

37 

38 def __init__(self, dist): 

39 self._dist = dist 

40 

41 @property 

42 def project_name(self): 

43 return self.metadata["name"] 

44 

45 def __getattr__(self, attr, default=None): 

46 return getattr(self._dist, attr, default) 

47 

48 def __dir__(self): 

49 return sorted(dir(self._dist) + ["_dist", "project_name"]) 

50 

51 

52class PluginManager(object): 

53 """ Core :py:class:`.PluginManager` class which manages registration 

54 of plugin objects and 1:N hook calling. 

55 

56 You can register new hooks by calling :py:meth:`add_hookspecs(module_or_class) 

57 <.PluginManager.add_hookspecs>`. 

58 You can register plugin objects (which contain hooks) by calling 

59 :py:meth:`register(plugin) <.PluginManager.register>`. The :py:class:`.PluginManager` 

60 is initialized with a prefix that is searched for in the names of the dict 

61 of registered plugin objects. 

62 

63 For debugging purposes you can call :py:meth:`.PluginManager.enable_tracing` 

64 which will subsequently send debug information to the trace helper. 

65 """ 

66 

67 def __init__(self, project_name, implprefix=None): 

68 """If ``implprefix`` is given implementation functions 

69 will be recognized if their name matches the ``implprefix``. """ 

70 self.project_name = project_name 

71 self._name2plugin = {} 

72 self._plugin2hookcallers = {} 

73 self._plugin_distinfo = [] 

74 self.trace = _tracing.TagTracer().get("pluginmanage") 

75 self.hook = _HookRelay() 

76 if implprefix is not None: 

77 warnings.warn( 

78 "Support for the `implprefix` arg is now deprecated and will " 

79 "be removed in an upcoming release. Please use HookimplMarker.", 

80 DeprecationWarning, 

81 stacklevel=2, 

82 ) 

83 self._implprefix = implprefix 

84 self._inner_hookexec = lambda hook, methods, kwargs: hook.multicall( 

85 methods, 

86 kwargs, 

87 firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, 

88 ) 

89 

90 def _hookexec(self, hook, methods, kwargs): 

91 # called from all hookcaller instances. 

92 # enable_tracing will set its own wrapping function at self._inner_hookexec 

93 return self._inner_hookexec(hook, methods, kwargs) 

94 

95 def register(self, plugin, name=None): 

96 """ Register a plugin and return its canonical name or ``None`` if the name 

97 is blocked from registering. Raise a :py:class:`ValueError` if the plugin 

98 is already registered. """ 

99 plugin_name = name or self.get_canonical_name(plugin) 

100 

101 if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers: 

102 if self._name2plugin.get(plugin_name, -1) is None: 

103 return # blocked plugin, return None to indicate no registration 

104 raise ValueError( 

105 "Plugin already registered: %s=%s\n%s" 

106 % (plugin_name, plugin, self._name2plugin) 

107 ) 

108 

109 # XXX if an error happens we should make sure no state has been 

110 # changed at point of return 

111 self._name2plugin[plugin_name] = plugin 

112 

113 # register matching hook implementations of the plugin 

114 self._plugin2hookcallers[plugin] = hookcallers = [] 

115 for name in dir(plugin): 

116 hookimpl_opts = self.parse_hookimpl_opts(plugin, name) 

117 if hookimpl_opts is not None: 

118 normalize_hookimpl_opts(hookimpl_opts) 

119 method = getattr(plugin, name) 

120 hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts) 

121 hook = getattr(self.hook, name, None) 

122 if hook is None: 

123 hook = _HookCaller(name, self._hookexec) 

124 setattr(self.hook, name, hook) 

125 elif hook.has_spec(): 

126 self._verify_hook(hook, hookimpl) 

127 hook._maybe_apply_history(hookimpl) 

128 hook._add_hookimpl(hookimpl) 

129 hookcallers.append(hook) 

130 return plugin_name 

131 

132 def parse_hookimpl_opts(self, plugin, name): 

133 method = getattr(plugin, name) 

134 if not inspect.isroutine(method): 

135 return 

136 try: 

137 res = getattr(method, self.project_name + "_impl", None) 

138 except Exception: 

139 res = {} 

140 if res is not None and not isinstance(res, dict): 

141 # false positive 

142 res = None 

143 # TODO: remove when we drop implprefix in 1.0 

144 elif res is None and self._implprefix and name.startswith(self._implprefix): 

145 _warn_for_function( 

146 DeprecationWarning( 

147 "The `implprefix` system is deprecated please decorate " 

148 "this function using an instance of HookimplMarker." 

149 ), 

150 method, 

151 ) 

152 res = {} 

153 return res 

154 

155 def unregister(self, plugin=None, name=None): 

156 """ unregister a plugin object and all its contained hook implementations 

157 from internal data structures. """ 

158 if name is None: 

159 assert plugin is not None, "one of name or plugin needs to be specified" 

160 name = self.get_name(plugin) 

161 

162 if plugin is None: 

163 plugin = self.get_plugin(name) 

164 

165 # if self._name2plugin[name] == None registration was blocked: ignore 

166 if self._name2plugin.get(name): 

167 del self._name2plugin[name] 

168 

169 for hookcaller in self._plugin2hookcallers.pop(plugin, []): 

170 hookcaller._remove_plugin(plugin) 

171 

172 return plugin 

173 

174 def set_blocked(self, name): 

175 """ block registrations of the given name, unregister if already registered. """ 

176 self.unregister(name=name) 

177 self._name2plugin[name] = None 

178 

179 def is_blocked(self, name): 

180 """ return ``True`` if the given plugin name is blocked. """ 

181 return name in self._name2plugin and self._name2plugin[name] is None 

182 

183 def add_hookspecs(self, module_or_class): 

184 """ add new hook specifications defined in the given ``module_or_class``. 

185 Functions are recognized if they have been decorated accordingly. """ 

186 names = [] 

187 for name in dir(module_or_class): 

188 spec_opts = self.parse_hookspec_opts(module_or_class, name) 

189 if spec_opts is not None: 

190 hc = getattr(self.hook, name, None) 

191 if hc is None: 

192 hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts) 

193 setattr(self.hook, name, hc) 

194 else: 

195 # plugins registered this hook without knowing the spec 

196 hc.set_specification(module_or_class, spec_opts) 

197 for hookfunction in hc.get_hookimpls(): 

198 self._verify_hook(hc, hookfunction) 

199 names.append(name) 

200 

201 if not names: 

202 raise ValueError( 

203 "did not find any %r hooks in %r" % (self.project_name, module_or_class) 

204 ) 

205 

206 def parse_hookspec_opts(self, module_or_class, name): 

207 method = getattr(module_or_class, name) 

208 return getattr(method, self.project_name + "_spec", None) 

209 

210 def get_plugins(self): 

211 """ return the set of registered plugins. """ 

212 return set(self._plugin2hookcallers) 

213 

214 def is_registered(self, plugin): 

215 """ Return ``True`` if the plugin is already registered. """ 

216 return plugin in self._plugin2hookcallers 

217 

218 def get_canonical_name(self, plugin): 

219 """ Return canonical name for a plugin object. Note that a plugin 

220 may be registered under a different name which was specified 

221 by the caller of :py:meth:`register(plugin, name) <.PluginManager.register>`. 

222 To obtain the name of an registered plugin use :py:meth:`get_name(plugin) 

223 <.PluginManager.get_name>` instead.""" 

224 return getattr(plugin, "__name__", None) or str(id(plugin)) 

225 

226 def get_plugin(self, name): 

227 """ Return a plugin or ``None`` for the given name. """ 

228 return self._name2plugin.get(name) 

229 

230 def has_plugin(self, name): 

231 """ Return ``True`` if a plugin with the given name is registered. """ 

232 return self.get_plugin(name) is not None 

233 

234 def get_name(self, plugin): 

235 """ Return name for registered plugin or ``None`` if not registered. """ 

236 for name, val in self._name2plugin.items(): 

237 if plugin == val: 

238 return name 

239 

240 def _verify_hook(self, hook, hookimpl): 

241 if hook.is_historic() and hookimpl.hookwrapper: 

242 raise PluginValidationError( 

243 hookimpl.plugin, 

244 "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" 

245 % (hookimpl.plugin_name, hook.name), 

246 ) 

247 if hook.spec.warn_on_impl: 

248 _warn_for_function(hook.spec.warn_on_impl, hookimpl.function) 

249 # positional arg checking 

250 notinspec = set(hookimpl.argnames) - set(hook.spec.argnames) 

251 if notinspec: 

252 raise PluginValidationError( 

253 hookimpl.plugin, 

254 "Plugin %r for hook %r\nhookimpl definition: %s\n" 

255 "Argument(s) %s are declared in the hookimpl but " 

256 "can not be found in the hookspec" 

257 % ( 

258 hookimpl.plugin_name, 

259 hook.name, 

260 _formatdef(hookimpl.function), 

261 notinspec, 

262 ), 

263 ) 

264 

265 def check_pending(self): 

266 """ Verify that all hooks which have not been verified against 

267 a hook specification are optional, otherwise raise :py:class:`.PluginValidationError`.""" 

268 for name in self.hook.__dict__: 

269 if name[0] != "_": 

270 hook = getattr(self.hook, name) 

271 if not hook.has_spec(): 

272 for hookimpl in hook.get_hookimpls(): 

273 if not hookimpl.optionalhook: 

274 raise PluginValidationError( 

275 hookimpl.plugin, 

276 "unknown hook %r in plugin %r" 

277 % (name, hookimpl.plugin), 

278 ) 

279 

280 def load_setuptools_entrypoints(self, group, name=None): 

281 """ Load modules from querying the specified setuptools ``group``. 

282 

283 :param str group: entry point group to load plugins 

284 :param str name: if given, loads only plugins with the given ``name``. 

285 :rtype: int 

286 :return: return the number of loaded plugins by this call. 

287 """ 

288 count = 0 

289 for dist in importlib_metadata.distributions(): 

290 for ep in dist.entry_points: 

291 if ( 

292 ep.group != group 

293 or (name is not None and ep.name != name) 

294 # already registered 

295 or self.get_plugin(ep.name) 

296 or self.is_blocked(ep.name) 

297 ): 

298 continue 

299 plugin = ep.load() 

300 self.register(plugin, name=ep.name) 

301 self._plugin_distinfo.append((plugin, DistFacade(dist))) 

302 count += 1 

303 return count 

304 

305 def list_plugin_distinfo(self): 

306 """ return list of distinfo/plugin tuples for all setuptools registered 

307 plugins. """ 

308 return list(self._plugin_distinfo) 

309 

310 def list_name_plugin(self): 

311 """ return list of name/plugin pairs. """ 

312 return list(self._name2plugin.items()) 

313 

314 def get_hookcallers(self, plugin): 

315 """ get all hook callers for the specified plugin. """ 

316 return self._plugin2hookcallers.get(plugin) 

317 

318 def add_hookcall_monitoring(self, before, after): 

319 """ add before/after tracing functions for all hooks 

320 and return an undo function which, when called, 

321 will remove the added tracers. 

322 

323 ``before(hook_name, hook_impls, kwargs)`` will be called ahead 

324 of all hook calls and receive a hookcaller instance, a list 

325 of HookImpl instances and the keyword arguments for the hook call. 

326 

327 ``after(outcome, hook_name, hook_impls, kwargs)`` receives the 

328 same arguments as ``before`` but also a :py:class:`pluggy.callers._Result` object 

329 which represents the result of the overall hook call. 

330 """ 

331 oldcall = self._inner_hookexec 

332 

333 def traced_hookexec(hook, hook_impls, kwargs): 

334 before(hook.name, hook_impls, kwargs) 

335 outcome = _Result.from_call(lambda: oldcall(hook, hook_impls, kwargs)) 

336 after(outcome, hook.name, hook_impls, kwargs) 

337 return outcome.get_result() 

338 

339 self._inner_hookexec = traced_hookexec 

340 

341 def undo(): 

342 self._inner_hookexec = oldcall 

343 

344 return undo 

345 

346 def enable_tracing(self): 

347 """ enable tracing of hook calls and return an undo function. """ 

348 hooktrace = self.trace.root.get("hook") 

349 

350 def before(hook_name, methods, kwargs): 

351 hooktrace.root.indent += 1 

352 hooktrace(hook_name, kwargs) 

353 

354 def after(outcome, hook_name, methods, kwargs): 

355 if outcome.excinfo is None: 

356 hooktrace("finish", hook_name, "-->", outcome.get_result()) 

357 hooktrace.root.indent -= 1 

358 

359 return self.add_hookcall_monitoring(before, after) 

360 

361 def subset_hook_caller(self, name, remove_plugins): 

362 """ Return a new :py:class:`.hooks._HookCaller` instance for the named method 

363 which manages calls to all registered plugins except the 

364 ones from remove_plugins. """ 

365 orig = getattr(self.hook, name) 

366 plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)] 

367 if plugins_to_remove: 

368 hc = _HookCaller( 

369 orig.name, orig._hookexec, orig.spec.namespace, orig.spec.opts 

370 ) 

371 for hookimpl in orig.get_hookimpls(): 

372 plugin = hookimpl.plugin 

373 if plugin not in plugins_to_remove: 

374 hc._add_hookimpl(hookimpl) 

375 # we also keep track of this hook caller so it 

376 # gets properly removed on plugin unregistration 

377 self._plugin2hookcallers.setdefault(plugin, []).append(hc) 

378 return hc 

379 return orig 

380 

381 

382if hasattr(inspect, "signature"): 

383 

384 def _formatdef(func): 

385 return "%s%s" % (func.__name__, str(inspect.signature(func))) 

386 

387 

388else: 

389 

390 def _formatdef(func): 

391 return "%s%s" % ( 

392 func.__name__, 

393 inspect.formatargspec(*inspect.getargspec(func)), 

394 )