Coverage for src / documint_mcp / drift_engine.py: 0%
151 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
1"""
2Semantic drift detection engine.
4Replaces timestamp-based drift detection with structural symbol hash comparison.
5A hash change = exported symbol added/removed/changed = semantic drift.
6Whitespace, comments, internal refactors: NO drift signal.
8The engine also produces an ATTRIBUTED SymbolDiff that tells the downstream
9AI chain exactly WHAT changed, not just THAT something changed.
10"""
11from __future__ import annotations
13import hashlib
14import json
15from dataclasses import dataclass, field
16from enum import StrEnum
17from typing import Any
19import structlog
21from .symbol_extractor import SymbolEntry, extract_symbols_from_files
23logger = structlog.get_logger(__name__)
26class ChangeType(StrEnum):
27 ADDED = "ADDED" # new export appeared
28 REMOVED = "REMOVED" # export disappeared (BREAKING)
29 PARAM_ADDED = "PARAM_ADDED"
30 PARAM_REMOVED = "PARAM_REMOVED" # BREAKING
31 RETURN_CHANGED = "RETURN_CHANGED" # BREAKING
32 RENAMED = "RENAMED" # symbol name changed (BREAKING)
33 SIGNATURE_CHANGED = "SIGNATURE_CHANGED" # catch-all for other sig changes
36class Severity(StrEnum):
37 BREAKING = "BREAKING" # callers must update their code
38 ADDITIVE = "ADDITIVE" # backwards compatible
39 COSMETIC = "COSMETIC" # no functional impact on callers
@dataclass(frozen=True)
class SymbolChange:
    """Immutable record of one attributed change to an exported symbol."""

    # What kind of change this is.
    change_type: ChangeType
    # Name of the symbol the change applies to.
    symbol_name: str
    # Impact on callers.
    severity: Severity
    # Optional textual "before" / "after" state; meaning depends on change_type.
    before: str | None = None
    after: str | None = None
    # Free-form extra information (e.g. the parameter involved).
    detail: str = ""

    def human_summary(self) -> str:
        """Return a one-line, human-readable description of this change."""
        kind = self.change_type
        symbol = self.symbol_name

        if kind == ChangeType.ADDED:
            return f"New export: {symbol}"
        if kind == ChangeType.REMOVED:
            return f"Removed: {symbol} (BREAKING)"
        if kind == ChangeType.PARAM_ADDED:
            return f"{symbol}() gained parameter: {self.detail}"
        if kind == ChangeType.PARAM_REMOVED:
            return f"{symbol}() lost parameter: {self.detail} (BREAKING)"
        if kind == ChangeType.RETURN_CHANGED:
            return f"{symbol}() return type: {self.before} → {self.after}"
        if kind == ChangeType.RENAMED:
            return f"Renamed: {self.before} → {self.after}"
        # Any other change type falls back to "<symbol>: <detail>".
        return f"{symbol}: {self.detail}"

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form, with the enums reduced to their values."""
        return dict(
            type=self.change_type.value,
            symbol=self.symbol_name,
            severity=self.severity.value,
            before=self.before,
            after=self.after,
            detail=self.detail,
        )
@dataclass
class SymbolDiff:
    """Complete diff between two symbol states.

    Holds the individual attributed changes plus the symbol hashes on either
    side of the comparison. Every derived value is computed on demand from
    ``changes``.
    """

    # Individual attributed changes, in the order they were recorded.
    changes: list[SymbolChange] = field(default_factory=list)
    # Symbol hash before and after the comparison ("" when not supplied).
    old_hash: str = ""
    new_hash: str = ""

    @property
    def has_breaking_changes(self) -> bool:
        """True when at least one change has BREAKING severity."""
        return any(c.severity == Severity.BREAKING for c in self.changes)

    @property
    def is_additive_only(self) -> bool:
        """True when there is at least one change and all are ADDITIVE."""
        return bool(self.changes) and all(
            c.severity == Severity.ADDITIVE for c in self.changes
        )

    @property
    def changed_symbol_names(self) -> list[str]:
        """Unique names of the changed symbols, in first-occurrence order.

        A plain ``set`` would also de-duplicate, but its iteration order for
        strings varies between interpreter runs (hash randomisation), which
        makes logs and serialized output unstable. ``dict.fromkeys`` keeps
        insertion order.
        """
        return list(dict.fromkeys(c.symbol_name for c in self.changes))

    def confidence_score(self) -> float:
        """Confidence that an AI patch can handle this diff without review.

        Returns:
            ``1.0`` when there are no changes. Otherwise a base of ``0.9``
            (additive-only) or ``0.5`` (anything else), multiplied by a decay
            that drops by ``0.1`` for every change beyond the first and is
            floored at ``0.0``; the result is rounded to two decimals.
        """
        if not self.changes:
            return 1.0
        base = 0.9 if self.is_additive_only else 0.5
        # Many simultaneous changes lower the confidence.
        decay = max(0.0, 1.0 - (len(self.changes) - 1) * 0.1)
        return round(base * decay, 2)

    def human_summary(self) -> str:
        """Return a multi-line summary listing at most the first five changes."""
        if not self.changes:
            return "No structural changes detected."
        parts = [c.human_summary() for c in self.changes[:5]]
        if len(self.changes) > 5:
            parts.append(f"... and {len(self.changes) - 5} more changes")
        return "\n".join(f" -> {p}" for p in parts)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form including the derived fields."""
        return {
            "old_hash": self.old_hash,
            "new_hash": self.new_hash,
            "changes": [c.to_dict() for c in self.changes],
            "has_breaking_changes": self.has_breaking_changes,
            "confidence": self.confidence_score(),
        }
def compute_symbol_hash(symbols: list[SymbolEntry]) -> str:
    """Return the SHA-256 hex digest of the symbols' sorted signatures.

    Each symbol contributes the string returned by its ``signature()``
    method. Those strings are sorted, joined with newlines, encoded as UTF-8
    and hashed, so the digest does not depend on the order of ``symbols``.

    The digest changes only when the set of signature strings changes;
    anything ``signature()`` leaves out cannot affect it. No extra
    normalization is applied here.
    """
    signatures = [entry.signature() for entry in symbols]
    signatures.sort()
    hasher = hashlib.sha256()
    hasher.update("\n".join(signatures).encode("utf-8"))
    return hasher.hexdigest()
def diff_symbols(
    old_symbols: list[SymbolEntry], new_symbols: list[SymbolEntry]
) -> list[SymbolChange]:
    """Produce an attributed diff between two symbol lists.

    Symbols are matched by ``name``. The result lists, in this order:
    removed symbols, added symbols, then per-symbol changes for names present
    on both sides whose ``signature()`` differs. Names are processed in
    sorted order so the output is deterministic.

    For a symbol whose signature differs:
      * each parameter only in the new list yields ``PARAM_ADDED``. It is
        ADDITIVE if the parameter text looks optional (heuristic: contains
        ``=``, ``None``, ``Optional`` or ``?``), otherwise BREAKING;
      * each parameter only in the old list yields ``PARAM_REMOVED``
        (BREAKING);
      * a differing ``return_type`` yields ``RETURN_CHANGED`` (BREAKING),
        even when the parameters changed too;
      * if none of the above applies (e.g. parameters were only reordered),
        a single ``SIGNATURE_CHANGED`` is emitted so the difference is never
        silently dropped.

    Args:
        old_symbols: Symbols from the previously stored state.
        new_symbols: Symbols from the current state.

    Returns:
        The list of ``SymbolChange`` objects; empty when nothing differs.
    """
    old_map = {s.name: s for s in old_symbols}
    new_map = {s.name: s for s in new_symbols}

    changes: list[SymbolChange] = []

    # Removed symbols (BREAKING).
    for name in sorted(old_map.keys() - new_map.keys()):
        gone = old_map[name]
        changes.append(
            SymbolChange(
                change_type=ChangeType.REMOVED,
                symbol_name=name,
                severity=Severity.BREAKING,
                before=gone.signature(),
                detail=f"was: {gone.kind} {name}",
            )
        )

    # Added symbols (ADDITIVE).
    for name in sorted(new_map.keys() - old_map.keys()):
        fresh = new_map[name]
        changes.append(
            SymbolChange(
                change_type=ChangeType.ADDED,
                symbol_name=name,
                severity=Severity.ADDITIVE,
                after=fresh.signature(),
                detail=f"new {fresh.kind}: {name}",
            )
        )

    # Symbols present on both sides.
    for name in sorted(old_map.keys() & new_map.keys()):
        old = old_map[name]
        new = new_map[name]

        old_sig = old.signature()
        new_sig = new.signature()
        if old_sig == new_sig:
            continue  # no change

        # Remember where this symbol's changes start so the catch-all below
        # can tell whether anything specific was attributed.
        first_change_index = len(changes)

        old_params = set(old.params)
        new_params = set(new.params)

        # dict.fromkeys de-duplicates while keeping declaration order.
        for p in dict.fromkeys(new.params):
            if p in old_params:
                continue
            # Heuristic: a parameter that looks optional does not break callers.
            has_default = "=" in p or "None" in p or "Optional" in p or "?" in p
            changes.append(
                SymbolChange(
                    change_type=ChangeType.PARAM_ADDED,
                    symbol_name=name,
                    severity=Severity.ADDITIVE if has_default else Severity.BREAKING,
                    detail=p,
                    before=str(old.params),
                    after=str(new.params),
                )
            )

        for p in dict.fromkeys(old.params):
            if p in new_params:
                continue
            changes.append(
                SymbolChange(
                    change_type=ChangeType.PARAM_REMOVED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    detail=p,
                    before=str(old.params),
                    after=str(new.params),
                )
            )

        # Report a return-type change even when the parameters changed too;
        # otherwise a breaking change could hide behind an additive
        # parameter and the diff would look additive-only.
        if old.return_type != new.return_type:
            changes.append(
                SymbolChange(
                    change_type=ChangeType.RETURN_CHANGED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    before=old.return_type or "void",
                    after=new.return_type or "void",
                )
            )

        # Catch-all: the signature differs but nothing specific was found.
        # Treated as BREAKING because the effect on callers is unknown.
        if len(changes) == first_change_index:
            changes.append(
                SymbolChange(
                    change_type=ChangeType.SIGNATURE_CHANGED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    before=old_sig,
                    after=new_sig,
                    detail=f"{old_sig} -> {new_sig}",
                )
            )

    return changes
@dataclass
class DriftResult:
    """Outcome of one drift check on a single artifact."""

    # Identifier of the checked artifact.
    artifact_key: str
    # True when the check reported drift.
    is_stale: bool
    # Attributed diff (may contain no changes).
    diff: SymbolDiff
    # Symbol hash computed from the current sources.
    new_symbol_hash: str
    # Previously stored symbol hash.
    old_symbol_hash: str
    # Symbols extracted from the current sources.
    new_symbols: list[SymbolEntry]

    @property
    def severity(self) -> str:
        """Return ``"clean"``, ``"medium"`` or ``"high"`` (stale and breaking)."""
        if self.is_stale:
            return "high" if self.diff.has_breaking_changes else "medium"
        return "clean"

    def finding_summary(self) -> str:
        """Return the attributed, human-readable summary for this result."""
        if not self.is_stale:
            return "Documentation is current."
        if self.diff.changes:
            return self.diff.human_summary()
        # Stale, but no attributed changes are available.
        return "Source structure changed. Re-scan needed."
class DriftEngine:
    """Semantic drift detection engine.

    Compares the symbol hash of the current sources with a previously stored
    hash and, when they differ, builds an attributed diff from the
    previously stored symbols.

    Usage:
        engine = DriftEngine()
        result = engine.check(
            artifact_key="api-reference",
            source_contents={"src/api.py": "..."},
            stored_hash="3a7bd3e2360a...",  # bare hex digest, as returned
                                            # in DriftResult.new_symbol_hash
        )
        if result.is_stale:
            print(result.finding_summary())
    """

    def check(
        self,
        artifact_key: str,
        source_contents: dict[str, str],
        stored_hash: str | None,
        stored_symbols_json: str | None = None,
    ) -> DriftResult:
        """Check whether an artifact has drifted, by comparing symbol hashes.

        Args:
            artifact_key: Unique identifier for this artifact.
            source_contents: ``{path: content}`` for all source files in the
                artifact.
            stored_hash: The previously stored symbol hash, or ``None`` on
                the first scan. It is compared for exact equality with the
                digest from ``compute_symbol_hash``.
            stored_symbols_json: Previously stored symbols as JSON, in the
                compact form produced by ``symbols_to_json`` (used to build
                the diff).

        Returns:
            A ``DriftResult``. ``is_stale`` is ``False`` on the first scan
            and when the hashes match. Otherwise it is ``True`` and ``diff``
            holds the attributed changes; the change list is empty when the
            stored symbols are missing or unreadable.
        """
        new_symbols = extract_symbols_from_files(source_contents)
        new_hash = compute_symbol_hash(new_symbols)

        # First scan: nothing to compare with, so establish the baseline.
        if stored_hash is None:
            logger.info("drift_baseline_established", artifact=artifact_key, hash=new_hash[:12])
            return DriftResult(
                artifact_key=artifact_key,
                is_stale=False,
                diff=SymbolDiff(old_hash="", new_hash=new_hash),
                new_symbol_hash=new_hash,
                old_symbol_hash="",
                new_symbols=new_symbols,
            )

        # Same hash: clean.
        if new_hash == stored_hash:
            return DriftResult(
                artifact_key=artifact_key,
                is_stale=False,
                diff=SymbolDiff(old_hash=stored_hash, new_hash=new_hash),
                new_symbol_hash=new_hash,
                old_symbol_hash=stored_hash,
                new_symbols=new_symbols,
            )

        # Hash changed: build the attributed diff. ``None`` means "no usable
        # stored symbols"; an empty list is a valid baseline with no symbols
        # and must still be diffed, so every current symbol shows up as ADDED.
        old_symbols = self._load_stored_symbols(artifact_key, stored_symbols_json)
        changes = diff_symbols(old_symbols, new_symbols) if old_symbols is not None else []
        diff = SymbolDiff(changes=changes, old_hash=stored_hash, new_hash=new_hash)

        logger.info(
            "drift_detected",
            artifact=artifact_key,
            old_hash=stored_hash[:12],
            new_hash=new_hash[:12],
            changes=len(changes),
            has_breaking=diff.has_breaking_changes,
        )

        return DriftResult(
            artifact_key=artifact_key,
            is_stale=True,
            diff=diff,
            new_symbol_hash=new_hash,
            old_symbol_hash=stored_hash,
            new_symbols=new_symbols,
        )

    def _load_stored_symbols(
        self, artifact_key: str, stored_symbols_json: str | None
    ) -> list[SymbolEntry] | None:
        """Parse stored compact-JSON symbols; return ``None`` if unusable."""
        if not stored_symbols_json:
            return None
        try:
            raw = json.loads(stored_symbols_json)
            return [
                SymbolEntry(
                    name=s["n"],
                    kind=s.get("k", "fn"),
                    params=s.get("p", []),
                    return_type=s.get("r"),
                    line=s.get("l", 0),
                )
                for s in raw
            ]
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as exc:
            # Stored data is best-effort: a malformed or wrongly shaped
            # payload must not abort the check, but it should be visible.
            logger.warning(
                "drift_stored_symbols_unreadable",
                artifact=artifact_key,
                error=str(exc),
            )
            return None

    def symbols_to_json(self, symbols: list[SymbolEntry]) -> str:
        """Serialize symbols to compact JSON for storage.

        Each symbol contributes the value of its ``to_lsif_compact()``
        method; the output has no whitespace between separators.
        """
        return json.dumps([s.to_lsif_compact() for s in symbols], separators=(",", ":"))
# One shared engine instance, created when the module is imported.
_engine = DriftEngine()


def get_drift_engine() -> DriftEngine:
    """Return the module-level shared ``DriftEngine`` instance."""
    return _engine