Coverage for src/documint_mcp/griffe_extractor.py: 19%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 22:30 -0400

1"""Griffe-powered symbol extraction for Python codebases. 

2 

3Uses Griffe (https://github.com/mkdocstrings/griffe) to extract 

4full API graphs with type info, parameters, return types, decorators, 

5and inheritance chains. Falls back to the existing symbol_extractor 

6for non-Python files or when Griffe is not installed. 

7 

8Griffe gives us: 

9- Rich object models (Module, Class, Function, Attribute, Parameter) 

10- 12 distinct breakage types for drift detection 

11- Git-aware comparison between any two refs 

12- JSON serialization of the full API graph 

13""" 

14from __future__ import annotations 

15 

16import logging 

17from pathlib import Path 

18 

19logger = logging.getLogger(__name__) 

20 

21_HAS_GRIFFE = False 

22try: 

23 import griffe 

24 from griffe import ( 

25 Alias, 

26 Attribute, 

27 Class, 

28 Function, 

29 Module, 

30 Object, 

31 ) 

32 _HAS_GRIFFE = True 

33except ImportError: 

34 logger.debug("griffe not installed — falling back to basic symbol extractor") 

35 

36 

def is_available() -> bool:
    """Report whether the optional Griffe dependency was imported.

    Returns:
        The current value of the module-level ``_HAS_GRIFFE`` flag.
    """
    griffe_ready = _HAS_GRIFFE
    return griffe_ready

40 

41 

def extract_api_graph(package_name: str) -> dict | None:
    """Load a package with Griffe and return its serialized API graph.

    Args:
        package_name: Name passed to ``griffe.load``.

    Returns:
        The dict produced by ``_serialize_module`` for the loaded package, or
        ``None`` when Griffe is not installed or when loading/serializing
        raises.
    """
    if _HAS_GRIFFE:
        try:
            loaded = griffe.load(package_name, resolve_aliases=False)
            return _serialize_module(loaded)
        except Exception as exc:
            # Best-effort boundary: report the failure, then fall through.
            logger.warning("Griffe extraction failed for %s: %s", package_name, exc)
    return None

56 

57 

def extract_symbols_from_file(file_path: str, source_code: str | None = None) -> list[dict]:
    """Extract symbols from a single Python file using Griffe.

    Args:
        file_path: Path of the file. Its stem is used as the module name and
            the path string is recorded in each symbol's ``f`` field.
        source_code: Source text to analyse. When ``None`` the file is read
            from ``file_path`` as UTF-8. An empty string is analysed as an
            empty source and is not replaced by the on-disk content.

    Returns:
        Symbols in the standard Documint format
        ``{n: name, k: kind, s: signature, d: docstring, f: file, l: line}``,
        or an empty list when Griffe is unavailable or extraction raises.
    """
    if not _HAS_GRIFFE:
        return []
    try:
        path = Path(file_path)
        # Fall back to disk only when no source was supplied at all; a plain
        # ``or`` would also discard a legitimately empty source string.
        code = path.read_text(encoding="utf-8") if source_code is None else source_code
        with griffe.temporary_visited_module(code, module_name=path.stem) as module:
            return _module_to_symbols(module, str(path))
    except Exception as exc:
        # Best-effort: a file that cannot be read or parsed yields no symbols.
        logger.debug("Griffe file extraction failed for %s: %s", file_path, exc)
        return []

74 

75 

def find_breaking_changes(
    old_symbols: list[dict],
    new_symbols: list[dict],
    package_name: str = "",
) -> list[dict]:
    """Diff two Documint symbol lists by name and signature.

    This is a simplified comparison over the symbol dicts: symbols are matched
    on their ``n`` (name) field, and entries without a truthy ``n`` are
    ignored. Three kinds of records are produced, in this order:

    - ``removed`` (severity ``HIGH``): name present only in ``old_symbols``
    - ``added`` (severity ``LOW``): name present only in ``new_symbols``
    - ``changed``: name present in both with a different ``s`` (signature);
      severity is ``HIGH`` when ``_is_breaking`` says so, else ``MEDIUM``

    Args:
        old_symbols: Symbols describing the earlier state.
        new_symbols: Symbols describing the later state.
        package_name: Not used by the comparison performed here.

    Returns:
        A list of dicts with keys ``type``, ``symbol``, ``severity``,
        ``detail``, ``before`` and ``after``. An empty list is returned when
        Griffe is unavailable or when any error occurs during the comparison.
    """
    if not _HAS_GRIFFE:
        return []
    try:
        before = {sym["n"]: sym for sym in old_symbols if sym.get("n")}
        after = {sym["n"]: sym for sym in new_symbols if sym.get("n")}

        removed = [
            {
                "type": "removed",
                "symbol": name,
                "severity": "HIGH",
                "detail": f"{name} was removed",
                "before": sym.get("s", ""),
                "after": "",
            }
            for name, sym in before.items()
            if name not in after
        ]

        added = [
            {
                "type": "added",
                "symbol": name,
                "severity": "LOW",
                "detail": f"{name} was added",
                "before": "",
                "after": sym.get("s", ""),
            }
            for name, sym in after.items()
            if name not in before
        ]

        changed = []
        for name, old_sym in before.items():
            if name not in after:
                continue
            new_sym = after[name]
            old_sig = old_sym.get("s", "")
            new_sig = new_sym.get("s", "")
            if old_sig != new_sig:
                changed.append({
                    "type": "changed",
                    "symbol": name,
                    "severity": "HIGH" if _is_breaking(old_sym, new_sym) else "MEDIUM",
                    "detail": f"{name} signature changed",
                    "before": old_sig,
                    "after": new_sig,
                })

        return [*removed, *added, *changed]
    except Exception as exc:
        # Any malformed symbol aborts the whole diff and yields no changes.
        logger.debug("Griffe diff failed: %s", exc)
        return []

142 

143 

def _is_breaking(old: dict, new: dict) -> bool:
    """Heuristically decide whether a symbol change is likely breaking.

    A change counts as breaking when a parameter name present in ``old`` is
    missing from ``new``, or when ``old`` has a truthy ``returns`` value that
    differs from the one in ``new``.

    Args:
        old: Symbol dict for the earlier state (reads ``params``, ``returns``).
        new: Symbol dict for the later state (reads ``params``, ``returns``).

    Returns:
        ``True`` if either rule above matches, otherwise ``False``.
    """

    def _names(symbol: dict) -> set:
        # A non-list ``params`` value (e.g. None) contributes no names.
        entries = symbol.get("params", [])
        if not isinstance(entries, list):
            return set()
        return {entry["name"] for entry in entries}

    dropped = _names(old) - _names(new)
    if dropped:
        return True

    previous_return = old.get("returns")
    if previous_return != new.get("returns") and previous_return:
        return True

    return False

161 

162 

def _serialize_module(module: Module) -> dict:
    """Serialize a Griffe Module to a dict with its API graph.

    Members whose names start with a single underscore are skipped; names that
    start with a double underscore are kept. Alias members are skipped so they
    never need to be resolved. A member whose serialization raises is logged
    at debug level and left out of the result.

    Args:
        module: The Griffe module to serialize.

    Returns:
        A dict with keys ``name``, ``kind`` (always ``"module"``), ``path``
        (``""`` when the module has no filepath), ``docstring`` and
        ``members`` (member name -> serialized member).
    """
    result: dict = {
        "name": module.name,
        "kind": "module",
        "path": str(module.filepath) if module.filepath else "",
        "docstring": _get_docstring(module),
        "members": {},
    }

    for name, member in module.members.items():
        if name.startswith("_") and not name.startswith("__"):
            continue  # skip private
        if isinstance(member, Alias):
            continue  # skip aliases to avoid resolution errors
        try:
            result["members"][name] = _serialize_object(member)
        except Exception as exc:
            # Best-effort: one bad member must not abort the whole module,
            # but leave a trace instead of dropping it silently.
            logger.debug("Skipping member %s of module %s: %s", name, module.name, exc)

    return result

184 

185 

def _serialize_object(obj: Object) -> dict:
    """Route a Griffe object to the serializer matching its type.

    ``Function``, ``Class``, ``Attribute`` and ``Module`` are checked in that
    order. Anything else is reduced to its name and ``str(obj.kind)``.

    Args:
        obj: The Griffe object to serialize.

    Returns:
        The dict produced by the matching serializer, or the minimal
        ``{"name", "kind"}`` dict for other object types.
    """
    dispatch = (
        (Function, _serialize_function),
        (Class, _serialize_class),
        (Attribute, _serialize_attribute),
        (Module, _serialize_module),
    )
    for griffe_type, serializer in dispatch:
        if isinstance(obj, griffe_type):
            return serializer(obj)
    return {"name": obj.name, "kind": str(obj.kind)}

198 

199 

def _serialize_function(func: Function) -> dict:
    """Serialize a Griffe Function, including per-parameter details.

    Args:
        func: The Griffe function to serialize.

    Returns:
        A dict with keys ``name``, ``kind`` (always ``"function"``),
        ``signature``, ``parameters``, ``docstring``, ``lineno`` and
        ``decorators``; ``returns`` is added only when the function has a
        return annotation. Each parameter dict has ``name`` and ``kind`` plus
        ``type`` / ``default`` when those are not ``None``.
    """

    def _describe(param) -> dict:
        entry: dict = {"name": param.name, "kind": str(param.kind)}
        if param.annotation is not None:
            entry["type"] = str(param.annotation)
        if param.default is not None:
            entry["default"] = str(param.default)
        return entry

    parameters = [_describe(param) for param in func.parameters]

    serialized: dict = {
        "name": func.name,
        "kind": "function",
        "signature": _build_signature(func),
        "parameters": parameters,
        "docstring": _get_docstring(func),
        "lineno": func.lineno,
        "decorators": [str(dec.value) for dec in (func.decorators or [])],
    }

    return_annotation = func.returns
    if return_annotation is not None:
        serialized["returns"] = str(return_annotation)

    return serialized

228 

229 

def _serialize_class(cls: Class) -> dict:
    """Serialize a Griffe Class with its bases, decorators and members.

    Members whose names start with an underscore are skipped, except
    ``__init__``. Alias members are skipped. Every remaining member goes
    through ``_serialize_object`` and is stored under ``methods`` (so members
    that are not functions end up there as well). A member whose serialization
    raises is logged at debug level and left out.

    Args:
        cls: The Griffe class to serialize.

    Returns:
        A dict with keys ``name``, ``kind`` (always ``"class"``), ``bases``,
        ``docstring``, ``lineno``, ``decorators`` and ``methods``.
    """
    methods: dict = {}
    for name, member in cls.members.items():
        if name.startswith("_") and name != "__init__":
            continue
        if isinstance(member, Alias):
            continue
        try:
            methods[name] = _serialize_object(member)
        except Exception as exc:
            # Best-effort: keep the rest of the class, but record the skip.
            logger.debug("Skipping member %s of class %s: %s", name, cls.name, exc)

    return {
        "name": cls.name,
        "kind": "class",
        "bases": [str(b) for b in cls.bases] if cls.bases else [],
        "docstring": _get_docstring(cls),
        "lineno": cls.lineno,
        "decorators": [str(d.value) for d in cls.decorators] if cls.decorators else [],
        "methods": methods,
    }

252 

253 

def _serialize_attribute(attr: Attribute) -> dict:
    """Serialize a Griffe Attribute.

    Args:
        attr: The Griffe attribute to serialize.

    Returns:
        A dict with ``name``, ``kind`` (always ``"attribute"``) and
        ``lineno``; ``type`` and ``value`` are added, as strings, only when
        the attribute's annotation / value is not ``None``.
    """
    payload: dict = {"name": attr.name, "kind": "attribute", "lineno": attr.lineno}
    optional_fields = (("type", attr.annotation), ("value", attr.value))
    for key, raw in optional_fields:
        if raw is not None:
            payload[key] = str(raw)
    return payload

266 

267 

def _build_signature(func: Function) -> str:
    """Build a human-readable signature string for a function.

    Each parameter is rendered as ``name``, followed by ``: annotation`` and
    ``= default`` when those are not ``None``. The return annotation is
    appended as ``-> returns`` when it is not ``None``.

    Args:
        func: Object exposing ``name``, ``parameters`` and ``returns``.

    Returns:
        A string of the form ``name(p1, p2: T = d) -> R``.
    """

    def _render(param) -> str:
        text = param.name
        if param.annotation is not None:
            text += f": {param.annotation}"
        if param.default is not None:
            text += f" = {param.default}"
        return text

    rendered = ", ".join([_render(param) for param in func.parameters])
    signature = f"{func.name}({rendered})"
    if func.returns is None:
        return signature
    return signature + f" -> {func.returns}"

283 

284 

def _get_docstring(obj: Object) -> str:
    """Return the docstring text of a Griffe object, or ``""`` if there is none.

    An empty string is returned both when ``obj.docstring`` is falsy and when
    the docstring's ``value`` is falsy.
    """
    doc = obj.docstring
    if not doc:
        return ""
    return doc.value or ""

290 

291 

def _module_to_symbols(module: Module, file_path: str) -> list[dict]:
    """Convert a Griffe Module to the standard Documint symbol format.

    Only top-level members whose names do not start with an underscore are
    considered, and Alias members are skipped. Functions become ``function``
    symbols, classes become ``class`` symbols, and each class's function
    members become ``method`` symbols named ``Class.method`` (underscore-
    prefixed methods are skipped, except ``__init__``). Other member types
    are ignored.

    Args:
        module: The Griffe module to convert.
        file_path: Path string stored in every symbol's ``f`` field.

    Returns:
        A flat list of symbol dicts. A member that raises while being
        converted is logged at debug level; symbols already appended for it
        (e.g. a class and some of its methods) are kept.
    """
    symbols: list[dict] = []

    for name, member in module.members.items():
        if name.startswith("_") or isinstance(member, Alias):
            continue
        try:
            if isinstance(member, Function):
                entry = _function_to_symbol(name, "function", member, file_path)
                # Only top-level functions carry their decorators.
                entry["decorators"] = (
                    [str(d.value) for d in member.decorators] if member.decorators else []
                )
                symbols.append(entry)
            elif isinstance(member, Class):
                symbols.append({
                    "n": name,
                    "k": "class",
                    "s": f"class {name}",
                    "d": _get_docstring(member),
                    "f": file_path,
                    "l": member.lineno,
                    "bases": [str(b) for b in member.bases] if member.bases else [],
                })
                # Also extract public methods (plus the constructor).
                for mname, mmember in member.members.items():
                    if mname.startswith("_") and mname != "__init__":
                        continue
                    if isinstance(mmember, Function):
                        symbols.append(
                            _function_to_symbol(f"{name}.{mname}", "method", mmember, file_path)
                        )
        except Exception as exc:
            # Best-effort: skip the offending member but leave a trace.
            logger.debug("Skipping symbol %s in %s: %s", name, file_path, exc)

    return symbols


def _function_to_symbol(symbol_name: str, kind: str, func: Function, file_path: str) -> dict:
    """Build the Documint symbol dict shared by functions and methods."""
    return {
        "n": symbol_name,
        "k": kind,
        "s": _build_signature(func),
        "d": _get_docstring(func),
        "f": file_path,
        "l": func.lineno,
        # Explicit None checks, matching how _build_signature renders the
        # same annotation/default values into the signature string.
        "params": [
            {
                "name": p.name,
                "type": str(p.annotation) if p.annotation is not None else None,
                "default": str(p.default) if p.default is not None else None,
            }
            for p in func.parameters
        ],
        "returns": str(func.returns) if func.returns is not None else None,
    }