Coverage for src / documint_mcp / griffe_extractor.py: 19%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
1"""Griffe-powered symbol extraction for Python codebases.
3Uses Griffe (https://github.com/mkdocstrings/griffe) to extract
4full API graphs with type info, parameters, return types, decorators,
5and inheritance chains. Falls back to the existing symbol_extractor
6for non-Python files or when Griffe is not installed.
8Griffe gives us:
9- Rich object models (Module, Class, Function, Attribute, Parameter)
10- 12 distinct breakage types for drift detection
11- Git-aware comparison between any two refs
12- JSON serialization of the full API graph
13"""
14from __future__ import annotations
16import logging
17from pathlib import Path
19logger = logging.getLogger(__name__)
21_HAS_GRIFFE = False
22try:
23 import griffe
24 from griffe import (
25 Alias,
26 Attribute,
27 Class,
28 Function,
29 Module,
30 Object,
31 )
32 _HAS_GRIFFE = True
33except ImportError:
34 logger.debug("griffe not installed — falling back to basic symbol extractor")
def is_available() -> bool:
    """Report whether the optional Griffe dependency could be imported.

    Returns:
        The module-level ``_HAS_GRIFFE`` flag, which is set once when this
        module is imported.
    """
    return _HAS_GRIFFE
def extract_api_graph(package_name: str) -> dict | None:
    """Load a package with Griffe and return its serialized API graph.

    Args:
        package_name: Name handed to ``griffe.load``; aliases are left
            unresolved (``resolve_aliases=False``).

    Returns:
        The dict produced by ``_serialize_module``, or ``None`` when Griffe is
        not installed or when loading/serializing raises (the failure is
        logged as a warning).
    """
    graph = None
    if _HAS_GRIFFE:
        try:
            graph = _serialize_module(griffe.load(package_name, resolve_aliases=False))
        except Exception as exc:
            logger.warning("Griffe extraction failed for %s: %s", package_name, exc)
    return graph
def extract_symbols_from_file(file_path: str, source_code: str | None = None) -> list[dict]:
    """Extract symbols from a single Python file using Griffe.

    Args:
        file_path: Path of the file. Its stem is used as the temporary module
            name and the path string is recorded on every symbol.
        source_code: Source text to analyse. When ``None`` the text is read
            from ``file_path`` as UTF-8. An empty string is accepted as-is
            (an empty module) and does not trigger a read from disk.

    Returns:
        Symbols in the standard Documint format
        ``{n: name, k: kind, s: signature, d: docstring, f: file, l: line}``.
        An empty list is returned when Griffe is not installed or when
        reading/parsing fails (the failure is logged at debug level).
    """
    if not _HAS_GRIFFE:
        return []
    try:
        path = Path(file_path)
        # Compare against None explicitly: ``source_code or ...`` would treat a
        # caller-supplied empty string as "missing" and silently substitute the
        # on-disk contents.
        code = path.read_text(encoding="utf-8") if source_code is None else source_code
        with griffe.temporary_visited_module(code, module_name=path.stem) as module:
            return _module_to_symbols(module, str(path))
    except Exception as exc:
        logger.debug("Griffe file extraction failed for %s: %s", file_path, exc)
        return []
def find_breaking_changes(
    old_symbols: list[dict],
    new_symbols: list[dict],
    package_name: str = "",
) -> list[dict]:
    """Compare two symbol lists and report removed, added and changed symbols.

    This is a simplified comparison that works directly on the symbol dicts
    (keyed by their ``"n"`` name); it does not build Griffe ``Module`` objects
    or call Griffe's own diff engine. It is still gated on Griffe being
    installed.

    Args:
        old_symbols: Symbols before the change. Entries without a truthy
            ``"n"`` are ignored.
        new_symbols: Symbols after the change, filtered the same way.
        package_name: Accepted for interface compatibility; not used here.

    Returns:
        A list of change dicts with the keys ``type``, ``symbol``,
        ``severity``, ``detail``, ``before`` and ``after``. Removals (``HIGH``)
        come first, then additions (``LOW``), then signature changes (``HIGH``
        when ``_is_breaking`` says so, otherwise ``MEDIUM``). An empty list is
        returned when Griffe is unavailable or when the comparison raises
        (logged at debug level).
    """
    if not _HAS_GRIFFE:
        return []
    try:
        before = {sym["n"]: sym for sym in old_symbols if sym.get("n")}
        after = {sym["n"]: sym for sym in new_symbols if sym.get("n")}

        # Symbols that exist only in the old set.
        removed = [
            {
                "type": "removed",
                "symbol": name,
                "severity": "HIGH",
                "detail": f"{name} was removed",
                "before": sym.get("s", ""),
                "after": "",
            }
            for name, sym in before.items()
            if name not in after
        ]

        # Symbols that exist only in the new set.
        added = [
            {
                "type": "added",
                "symbol": name,
                "severity": "LOW",
                "detail": f"{name} was added",
                "before": "",
                "after": sym.get("s", ""),
            }
            for name, sym in after.items()
            if name not in before
        ]

        # Symbols present on both sides whose signature string differs.
        modified = []
        for name, old_sym in before.items():
            if name not in after:
                continue
            new_sym = after[name]
            old_sig = old_sym.get("s", "")
            new_sig = new_sym.get("s", "")
            if old_sig == new_sig:
                continue
            modified.append({
                "type": "changed",
                "symbol": name,
                "severity": "HIGH" if _is_breaking(old_sym, new_sym) else "MEDIUM",
                "detail": f"{name} signature changed",
                "before": old_sig,
                "after": new_sig,
            })

        return removed + added + modified
    except Exception as exc:
        logger.debug("Griffe diff failed: %s", exc)
        return []
144def _is_breaking(old: dict, new: dict) -> bool:
145 """Heuristic: is this change likely breaking?"""
146 old_params = old.get("params", [])
147 new_params = new.get("params", [])
149 # Params removed = breaking
150 old_names = {p["name"] for p in old_params} if isinstance(old_params, list) else set()
151 new_names = {p["name"] for p in new_params} if isinstance(new_params, list) else set()
153 if old_names - new_names: # params removed
154 return True
156 # Return type changed = breaking
157 if old.get("returns") != new.get("returns") and old.get("returns"):
158 return True
160 return False
def _serialize_module(module: Module) -> dict:
    """Serialize a Griffe Module to a dict with its API graph.

    Args:
        module: The Griffe module to serialize.

    Returns:
        A dict with ``name``, ``kind`` (always ``"module"``), ``path`` (empty
        string when the module has no filepath), ``docstring`` and ``members``,
        the latter mapping member names to their serialized form.

    Members whose name starts with a single underscore are skipped, while
    names starting with a double underscore are kept. Aliases are skipped, and
    a member that fails to serialize is dropped with a debug log entry
    instead of aborting the whole module.
    """
    result: dict = {
        "name": module.name,
        "kind": "module",
        "path": str(module.filepath) if module.filepath else "",
        "docstring": _get_docstring(module),
        "members": {},
    }

    for name, member in module.members.items():
        if name.startswith("_") and not name.startswith("__"):
            continue  # skip private
        try:
            if isinstance(member, Alias):
                continue  # skip aliases to avoid resolution errors
            result["members"][name] = _serialize_object(member)
        except Exception as exc:
            # Best-effort: keep going, but leave a trace of what was dropped.
            logger.debug("Skipping member %s of module %s: %s", name, module.name, exc)
            continue

    return result
def _serialize_object(obj: Object) -> dict:
    """Serialize a Griffe object by dispatching on its concrete type.

    Functions, classes, attributes and modules are handed to their dedicated
    serializer. Anything else is reduced to its ``name`` and ``str(kind)``.
    """
    # Checked in order; the first matching type wins.
    serializers = (
        (Function, _serialize_function),
        (Class, _serialize_class),
        (Attribute, _serialize_attribute),
        (Module, _serialize_module),
    )
    for griffe_type, serializer in serializers:
        if isinstance(obj, griffe_type):
            return serializer(obj)
    return {"name": obj.name, "kind": str(obj.kind)}
def _serialize_function(func: Function) -> dict:
    """Serialize a Griffe Function, including per-parameter details.

    Returns:
        A dict with ``name``, ``kind`` (always ``"function"``), ``signature``,
        ``parameters``, ``docstring``, ``lineno`` and ``decorators``. A
        ``returns`` key is added only when the function has a return
        annotation. Each parameter entry carries ``name`` and ``kind`` and,
        when present, ``type`` and ``default`` as strings.
    """

    def _describe(param) -> dict:
        entry: dict = {"name": param.name, "kind": str(param.kind)}
        if param.annotation is not None:
            entry["type"] = str(param.annotation)
        if param.default is not None:
            entry["default"] = str(param.default)
        return entry

    parameters = [_describe(param) for param in func.parameters]

    serialized: dict = {
        "name": func.name,
        "kind": "function",
        "signature": _build_signature(func),
        "parameters": parameters,
        "docstring": _get_docstring(func),
        "lineno": func.lineno,
        "decorators": [str(decorator.value) for decorator in func.decorators or []],
    }

    if func.returns is not None:
        serialized["returns"] = str(func.returns)
    return serialized
def _serialize_class(cls: Class) -> dict:
    """Serialize a Griffe Class with its members and bases.

    Args:
        cls: The Griffe class to serialize.

    Returns:
        A dict with ``name``, ``kind`` (always ``"class"``), ``bases``,
        ``docstring``, ``lineno``, ``decorators`` and ``methods``. Despite its
        name, ``methods`` holds every non-alias member that passes the name
        filter, serialized via ``_serialize_object``.

    Members whose name starts with an underscore are skipped, except
    ``__init__``. A member that fails to serialize is dropped with a debug
    log entry rather than failing the whole class.
    """
    methods = {}
    for name, member in cls.members.items():
        if name.startswith("_") and name != "__init__":
            continue
        try:
            if isinstance(member, Alias):
                continue
            methods[name] = _serialize_object(member)
        except Exception as exc:
            # Best-effort: keep going, but leave a trace of what was dropped.
            logger.debug("Skipping member %s of class %s: %s", name, cls.name, exc)
            continue

    return {
        "name": cls.name,
        "kind": "class",
        "bases": [str(b) for b in cls.bases] if cls.bases else [],
        "docstring": _get_docstring(cls),
        "lineno": cls.lineno,
        "decorators": [str(d.value) for d in cls.decorators] if cls.decorators else [],
        "methods": methods,
    }
254def _serialize_attribute(attr: Attribute) -> dict:
255 """Serialize a Griffe Attribute."""
256 result: dict = {
257 "name": attr.name,
258 "kind": "attribute",
259 "lineno": attr.lineno,
260 }
261 if attr.annotation is not None:
262 result["type"] = str(attr.annotation)
263 if attr.value is not None:
264 result["value"] = str(attr.value)
265 return result
268def _build_signature(func: Function) -> str:
269 """Build a human-readable signature string."""
270 params = []
271 for p in func.parameters:
272 part = p.name
273 if p.annotation is not None:
274 part += f": {p.annotation}"
275 if p.default is not None:
276 part += f" = {p.default}"
277 params.append(part)
279 sig = f"{func.name}({', '.join(params)})"
280 if func.returns is not None:
281 sig += f" -> {func.returns}"
282 return sig
285def _get_docstring(obj: Object) -> str:
286 """Extract docstring from a Griffe object."""
287 if obj.docstring:
288 return obj.docstring.value or ""
289 return ""
def _callable_symbol(name: str, kind: str, func: Function, file_path: str) -> dict:
    """Build the symbol fields shared by top-level functions and class methods."""
    return {
        "n": name,
        "k": kind,
        "s": _build_signature(func),
        "d": _get_docstring(func),
        "f": file_path,
        "l": func.lineno,
        "params": [
            {
                "name": p.name,
                "type": str(p.annotation) if p.annotation else None,
                "default": str(p.default) if p.default else None,
            }
            for p in func.parameters
        ],
        "returns": str(func.returns) if func.returns else None,
    }


def _module_to_symbols(module: Module, file_path: str) -> list[dict]:
    """Convert a Griffe Module to the standard Documint symbol format.

    Args:
        module: The Griffe module to walk.
        file_path: Path string recorded in the ``"f"`` field of every symbol.

    Returns:
        A flat list of symbol dicts, in member order:

        * functions (``k="function"``) with ``params``, ``returns`` and
          ``decorators``;
        * classes (``k="class"``) with ``bases``, each followed by its methods
          (``k="method"``, named ``Class.method``) with ``params`` and
          ``returns``.

    Top-level names starting with an underscore are skipped; inside classes
    underscore names are skipped except ``__init__``. Aliases are skipped. If
    processing a top-level member raises, the rest of that member is skipped
    (symbols already appended for it are kept) and the error is logged at
    debug level.
    """
    symbols = []

    for name, member in module.members.items():
        if name.startswith("_"):
            continue
        try:
            if isinstance(member, Alias):
                continue
            if isinstance(member, Function):
                symbol = _callable_symbol(name, "function", member, file_path)
                # Only top-level functions carry decorator information.
                symbol["decorators"] = (
                    [str(d.value) for d in member.decorators] if member.decorators else []
                )
                symbols.append(symbol)
            elif isinstance(member, Class):
                symbols.append({
                    "n": name,
                    "k": "class",
                    "s": f"class {name}",
                    "d": _get_docstring(member),
                    "f": file_path,
                    "l": member.lineno,
                    "bases": [str(b) for b in member.bases] if member.bases else [],
                })
                # Also extract public methods
                for mname, mmember in member.members.items():
                    if mname.startswith("_") and mname != "__init__":
                        continue
                    if isinstance(mmember, Function):
                        symbols.append(
                            _callable_symbol(f"{name}.{mname}", "method", mmember, file_path)
                        )
        except Exception as exc:
            # Best-effort: keep going, but leave a trace of what was dropped.
            logger.debug("Skipping symbol %s in %s: %s", name, file_path, exc)
            continue

    return symbols