Coverage for src / documint_mcp / drift_engine.py: 0%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 22:30 -0400

1""" 

2Semantic drift detection engine. 

3 

4Replaces timestamp-based drift detection with structural symbol hash comparison. 

5A hash change = exported symbol added/removed/changed = semantic drift. 

6Whitespace, comments, internal refactors: NO drift signal. 

7 

8The engine also produces an ATTRIBUTED SymbolDiff that tells the downstream 

9AI chain exactly WHAT changed, not just THAT something changed. 

10""" 

11from __future__ import annotations 

12 

13import hashlib 

14import json 

15from dataclasses import dataclass, field 

16from enum import StrEnum 

17from typing import Any 

18 

19import structlog 

20 

21from .symbol_extractor import SymbolEntry, extract_symbols_from_files 

22 

23logger = structlog.get_logger(__name__) 

24 

25 

26class ChangeType(StrEnum): 

27 ADDED = "ADDED" # new export appeared 

28 REMOVED = "REMOVED" # export disappeared (BREAKING) 

29 PARAM_ADDED = "PARAM_ADDED" 

30 PARAM_REMOVED = "PARAM_REMOVED" # BREAKING 

31 RETURN_CHANGED = "RETURN_CHANGED" # BREAKING 

32 RENAMED = "RENAMED" # symbol name changed (BREAKING) 

33 SIGNATURE_CHANGED = "SIGNATURE_CHANGED" # catch-all for other sig changes 

34 

35 

36class Severity(StrEnum): 

37 BREAKING = "BREAKING" # callers must update their code 

38 ADDITIVE = "ADDITIVE" # backwards compatible 

39 COSMETIC = "COSMETIC" # no functional impact on callers 

40 

41 

42@dataclass(frozen=True) 

43class SymbolChange: 

44 """A single attributed change to an exported symbol.""" 

45 

46 change_type: ChangeType 

47 symbol_name: str 

48 severity: Severity 

49 before: str | None = None 

50 after: str | None = None 

51 detail: str = "" 

52 

53 def human_summary(self) -> str: 

54 match self.change_type: 

55 case ChangeType.ADDED: 

56 return f"New export: {self.symbol_name}" 

57 case ChangeType.REMOVED: 

58 return f"Removed: {self.symbol_name} (BREAKING)" 

59 case ChangeType.PARAM_ADDED: 

60 return f"{self.symbol_name}() gained parameter: {self.detail}" 

61 case ChangeType.PARAM_REMOVED: 

62 return f"{self.symbol_name}() lost parameter: {self.detail} (BREAKING)" 

63 case ChangeType.RETURN_CHANGED: 

64 return f"{self.symbol_name}() return type: {self.before}{self.after}" 

65 case ChangeType.RENAMED: 

66 return f"Renamed: {self.before}{self.after}" 

67 case _: 

68 return f"{self.symbol_name}: {self.detail}" 

69 

70 def to_dict(self) -> dict[str, Any]: 

71 return { 

72 "type": self.change_type.value, 

73 "symbol": self.symbol_name, 

74 "severity": self.severity.value, 

75 "before": self.before, 

76 "after": self.after, 

77 "detail": self.detail, 

78 } 

79 

80 

@dataclass
class SymbolDiff:
    """Complete diff between two symbol states.

    Attributes:
        changes: Individual attributed changes; empty when nothing was attributed.
        old_hash: Symbol hash of the previous state ("" when there is none).
        new_hash: Symbol hash of the current state.
    """

    changes: list[SymbolChange] = field(default_factory=list)
    old_hash: str = ""
    new_hash: str = ""

    @property
    def has_breaking_changes(self) -> bool:
        """True if at least one change has BREAKING severity."""
        return any(c.severity == Severity.BREAKING for c in self.changes)

    @property
    def is_additive_only(self) -> bool:
        """True if there is at least one change and every change is ADDITIVE."""
        return bool(self.changes) and all(c.severity == Severity.ADDITIVE for c in self.changes)

    @property
    def changed_symbol_names(self) -> list[str]:
        """Distinct names of the changed symbols, in order of first appearance.

        De-duplicating through a dict (rather than a set) keeps the result
        stable from run to run.
        """
        return list(dict.fromkeys(c.symbol_name for c in self.changes))

    def confidence_score(self) -> float:
        """
        Confidence that an AI patch can handle this diff without human review.

        - No changes: 1.0.
        - ADDITIVE-only changes start from 0.9; anything else starts from 0.5.
        - Each change beyond the first reduces the score by 10% of the base,
          never going below 0.0.

        Returns:
            The score rounded to two decimal places.
        """
        if not self.changes:
            return 1.0
        base = 0.9 if self.is_additive_only else 0.5
        # Linear decay with the number of changes, clamped at zero.
        decay = max(0.0, 1.0 - (len(self.changes) - 1) * 0.1)
        return round(base * decay, 2)

    def human_summary(self) -> str:
        """Return a multi-line summary listing at most the first five changes."""
        if not self.changes:
            return "No structural changes detected."
        parts = [c.human_summary() for c in self.changes[:5]]
        if len(self.changes) > 5:
            # Collapse the remainder into a single trailing line.
            parts.append(f"... and {len(self.changes) - 5} more changes")
        return "\n".join(f" -> {p}" for p in parts)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form of the diff, including derived fields."""
        return {
            "old_hash": self.old_hash,
            "new_hash": self.new_hash,
            "changes": [c.to_dict() for c in self.changes],
            "has_breaking_changes": self.has_breaking_changes,
            "confidence": self.confidence_score(),
        }

131 

132 

def compute_symbol_hash(symbols: list[SymbolEntry]) -> str:
    """
    Return the SHA-256 hex digest of the sorted symbol signatures.

    Each symbol contributes the string returned by its ``signature()`` method.
    The signatures are sorted, joined with newlines and encoded as UTF-8
    before hashing, so the order of ``symbols`` does not affect the result.

    NOTE(review): the hash is only as whitespace- and comment-independent as
    ``SymbolEntry.signature()`` makes it; that method is defined elsewhere and
    is presumed to contain only name, params and return type.
    """
    signatures = [entry.signature() for entry in symbols]
    signatures.sort()
    joined = "\n".join(signatures)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

148 

149 

def diff_symbols(
    old_symbols: list[SymbolEntry], new_symbols: list[SymbolEntry]
) -> list[SymbolChange]:
    """
    Produce an attributed diff between two symbol lists.

    Symbols are matched by ``name``; if a list holds several entries with the
    same name, the last one wins. The result is ordered deterministically:
    removals, then additions, then per-symbol changes, each sorted by symbol
    name, with parameter changes in declaration order.

    For a symbol present on both sides whose ``signature()`` differs:
      - every parameter only in the new list yields PARAM_ADDED
        (ADDITIVE if it looks optional, otherwise BREAKING);
      - every parameter only in the old list yields PARAM_REMOVED (BREAKING);
      - a different ``return_type`` yields RETURN_CHANGED (BREAKING), also
        when parameters changed at the same time;
      - if none of the above applies (e.g. parameters were only reordered),
        a single SIGNATURE_CHANGED is emitted so the change is never lost.

    Args:
        old_symbols: Symbols of the previously recorded state.
        new_symbols: Symbols of the current state.

    Returns:
        A list of SymbolChange objects describing exactly what changed;
        empty when the two states are equivalent.
    """
    old_map = {s.name: s for s in old_symbols}
    new_map = {s.name: s for s in new_symbols}

    changes: list[SymbolChange] = []

    # Removed symbols (BREAKING). sorted() keeps the output stable across
    # runs; plain set iteration order depends on string hash randomization.
    for name in sorted(old_map.keys() - new_map.keys()):
        removed = old_map[name]
        changes.append(
            SymbolChange(
                change_type=ChangeType.REMOVED,
                symbol_name=name,
                severity=Severity.BREAKING,
                before=removed.signature(),
                detail=f"was: {removed.kind} {name}",
            )
        )

    # Added symbols (ADDITIVE)
    for name in sorted(new_map.keys() - old_map.keys()):
        added = new_map[name]
        changes.append(
            SymbolChange(
                change_type=ChangeType.ADDED,
                symbol_name=name,
                severity=Severity.ADDITIVE,
                after=added.signature(),
                detail=f"new {added.kind}: {name}",
            )
        )

    # Changed symbols
    for name in sorted(old_map.keys() & new_map.keys()):
        old = old_map[name]
        new = new_map[name]

        old_signature = old.signature()
        new_signature = new.signature()
        if old_signature == new_signature:
            continue  # no change

        # Remember where this symbol's changes start so the catch-all below
        # can tell whether anything specific was attributed.
        first_change_index = len(changes)

        old_params = set(old.params)
        new_params = set(new.params)

        # dict.fromkeys de-duplicates while keeping declaration order.
        for p in dict.fromkeys(new.params):
            if p in old_params:
                continue
            # Heuristic: the textual form of the parameter hints at a default
            # or optional type, in which case existing callers keep working.
            has_default = "=" in p or "None" in p or "Optional" in p or "?" in p
            changes.append(
                SymbolChange(
                    change_type=ChangeType.PARAM_ADDED,
                    symbol_name=name,
                    severity=Severity.ADDITIVE if has_default else Severity.BREAKING,
                    detail=p,
                    before=str(old.params),
                    after=str(new.params),
                )
            )

        for p in dict.fromkeys(old.params):
            if p in new_params:
                continue
            changes.append(
                SymbolChange(
                    change_type=ChangeType.PARAM_REMOVED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    detail=p,
                    before=str(old.params),
                    after=str(new.params),
                )
            )

        # Report a return type change even when parameters changed as well;
        # otherwise a BREAKING change could hide behind an ADDITIVE one.
        if old.return_type != new.return_type:
            changes.append(
                SymbolChange(
                    change_type=ChangeType.RETURN_CHANGED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    before=old.return_type or "void",
                    after=new.return_type or "void",
                )
            )

        # Catch-all: the signature differs but nothing specific was found
        # (e.g. reordered parameters). The impact cannot be classified, so it
        # is conservatively reported as BREAKING.
        if len(changes) == first_change_index:
            changes.append(
                SymbolChange(
                    change_type=ChangeType.SIGNATURE_CHANGED,
                    symbol_name=name,
                    severity=Severity.BREAKING,
                    before=old_signature,
                    after=new_signature,
                    detail=f"signature changed: {old_signature} -> {new_signature}",
                )
            )

    return changes

240 

241 

@dataclass
class DriftResult:
    """Outcome of one drift check on an artifact.

    Attributes:
        artifact_key: Identifier of the checked artifact.
        is_stale: Whether the artifact is considered out of date.
        diff: Attributed diff between the stored and the current symbols.
        new_symbol_hash: Symbol hash of the current sources.
        old_symbol_hash: Previously stored symbol hash.
        new_symbols: Symbols extracted from the current sources.
    """

    artifact_key: str
    is_stale: bool
    diff: SymbolDiff
    new_symbol_hash: str
    old_symbol_hash: str
    new_symbols: list[SymbolEntry]

    @property
    def severity(self) -> str:
        """Return "clean" when not stale, "high" on breaking changes, else "medium"."""
        if self.is_stale:
            return "high" if self.diff.has_breaking_changes else "medium"
        return "clean"

    def finding_summary(self) -> str:
        """Attributed summary for DriftFinding — replaces the generic timestamp message."""
        if not self.is_stale:
            return "Documentation is current."
        if self.diff.changes:
            return self.diff.human_summary()
        # Stale, but no individual change could be attributed.
        return "Source structure changed. Re-scan needed."

268 

269 

class DriftEngine:
    """
    Semantic drift detection engine.

    Usage:
        engine = DriftEngine()
        result = engine.check(
            artifact_key="api-reference",
            source_contents={"src/api.py": "..."},
            # Bare hex digest as returned in a previous result's
            # new_symbol_hash; the comparison is exact string equality.
            stored_hash="abc123...",
        )
        if result.is_stale:
            print(result.finding_summary())
    """

    def check(
        self,
        artifact_key: str,
        source_contents: dict[str, str],
        stored_hash: str | None,
        stored_symbols_json: str | None = None,
    ) -> DriftResult:
        """
        Check if an artifact has drifted by comparing structural symbol hashes.

        Args:
            artifact_key: Unique identifier for this artifact
            source_contents: {path: content} for all source files in the artifact
            stored_hash: The previously stored symbol hash (or None if first scan)
            stored_symbols_json: Previously stored symbols as JSON (for diff generation)

        Returns:
            DriftResult with is_stale, diff, and new hash. When the hash
            changed but the stored symbols are missing or unreadable, the
            result is stale with an empty list of changes.
        """
        new_symbols = extract_symbols_from_files(source_contents)
        new_hash = compute_symbol_hash(new_symbols)

        # First scan — always emit a "baseline established" non-stale result
        if stored_hash is None:
            logger.info("drift_baseline_established", artifact=artifact_key, hash=new_hash[:12])
            return DriftResult(
                artifact_key=artifact_key,
                is_stale=False,
                diff=SymbolDiff(old_hash="", new_hash=new_hash),
                new_symbol_hash=new_hash,
                old_symbol_hash="",
                new_symbols=new_symbols,
            )

        # No change — clean
        if new_hash == stored_hash:
            return DriftResult(
                artifact_key=artifact_key,
                is_stale=False,
                diff=SymbolDiff(old_hash=stored_hash, new_hash=new_hash),
                new_symbol_hash=new_hash,
                old_symbol_hash=stored_hash,
                new_symbols=new_symbols,
            )

        # Hash changed — compute attributed diff. Without a usable old symbol
        # list, diffing would report every current symbol as ADDED, so skip it.
        old_symbols = self._load_stored_symbols(artifact_key, stored_symbols_json)
        changes = diff_symbols(old_symbols, new_symbols) if old_symbols else []
        diff = SymbolDiff(changes=changes, old_hash=stored_hash, new_hash=new_hash)

        logger.info(
            "drift_detected",
            artifact=artifact_key,
            old_hash=stored_hash[:12],
            new_hash=new_hash[:12],
            changes=len(changes),
            has_breaking=diff.has_breaking_changes,
        )

        return DriftResult(
            artifact_key=artifact_key,
            is_stale=True,
            diff=diff,
            new_symbol_hash=new_hash,
            old_symbol_hash=stored_hash,
            new_symbols=new_symbols,
        )

    def _load_stored_symbols(
        self, artifact_key: str, stored_symbols_json: str | None
    ) -> list[SymbolEntry]:
        """Rebuild SymbolEntry objects from stored compact JSON; [] if absent or unreadable."""
        if not stored_symbols_json:
            return []
        try:
            raw = json.loads(stored_symbols_json)
            # Compact keys: n=name (required), k=kind, p=params,
            # r=return type, l=line.
            return [
                SymbolEntry(
                    name=s["n"],
                    kind=s.get("k", "fn"),
                    params=s.get("p", []),
                    return_type=s.get("r"),
                    line=s.get("l", 0),
                )
                for s in raw
            ]
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as exc:
            # TypeError/AttributeError cover JSON that parses but is not a
            # list of objects. Drift is still reported by the caller, just
            # without attribution, so log instead of failing the check.
            logger.warning(
                "drift_stored_symbols_unreadable",
                artifact=artifact_key,
                error=str(exc),
            )
            return []

    def symbols_to_json(self, symbols: list[SymbolEntry]) -> str:
        """Serialize symbols to LSIF-compact JSON for storage."""
        return json.dumps([s.to_lsif_compact() for s in symbols], separators=(",", ":"))

372 

373 

# Shared engine instance, created once when this module is imported.
_engine: DriftEngine = DriftEngine()


def get_drift_engine() -> DriftEngine:
    """Return the shared module-level DriftEngine instance."""
    return _engine

378 return _engine