Coverage for src / tracekit / guidance / recommender.py: 74%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Recommendation engine for guided analysis workflow. 

2 

3This module provides contextual "What should I look at next?" recommendations 

4based on current analysis state. 

5 

6 

7Example: 

8 >>> from tracekit.guidance import suggest_next_steps 

9 >>> recommendations = suggest_next_steps(trace, current_state=current_state) 

10 >>> for rec in recommendations: 

11 ... print(f"{rec.title}: {rec.explanation}") 

12 

13References: 

14 TraceKit Auto-Discovery Specification 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass, field 

20from datetime import datetime 

21from typing import TYPE_CHECKING, Any 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Callable 

25 

26 from tracekit.core.types import WaveformTrace 

27 

28 

@dataclass
class Recommendation:
    """A single suggested analysis step together with its scores.

    The class is a plain data holder: it performs no range or length
    validation, so the limits listed below are conventions for whoever
    builds a recommendation, not guarantees enforced here.

    Attributes:
        id: Unique recommendation ID.
        title: Short title (intended to be at most 50 characters).
        explanation: Why this step is relevant (intended to be at most 50 words).
        priority: Priority score (0.0-1.0).
        urgency: Urgency score (0.0-1.0).
        ease: Ease of execution (0.0-1.0, higher = easier).
        impact: Expected impact (0.0-1.0, higher = more valuable).
        rationale: Detailed reasoning.
        result_key: Key for storing result in state.
        execute: Optional callable to execute this step.
        impact_description: Description of expected impact.
    """

    # Required fields. Their positional order is part of the constructor
    # interface, so it must not be rearranged.
    id: str
    title: str
    explanation: str
    priority: float

    # Component scores; each defaults to the neutral midpoint.
    urgency: float = 0.5
    ease: float = 0.5
    impact: float = 0.5

    # Optional descriptive / bookkeeping fields.
    rationale: str = ""
    result_key: str = ""
    # The callable's signature is deliberately unconstrained.
    execute: Callable[..., Any] | None = None
    impact_description: str = ""

58 

59 

@dataclass
class AnalysisHistory:
    """Track completed analysis steps.

    Attributes:
        steps_completed: Completed step IDs, in order of first completion.
        step_timestamps: Time of the most recent completion of each step.
        results: Most recent non-None result recorded for each step.
    """

    steps_completed: list[str] = field(default_factory=list)
    step_timestamps: dict[str, datetime] = field(default_factory=dict)
    results: dict[str, Any] = field(default_factory=dict)

    def add_step(self, step_id: str, result: Any = None) -> None:
        """Record a completed step.

        A step ID is listed in ``steps_completed`` only once, but its
        timestamp is refreshed on every call.

        Args:
            step_id: Step identifier.
            result: Optional result from step. ``None`` means "no result" and
                leaves any previously stored result for this step untouched.
        """
        if step_id not in self.steps_completed:
            self.steps_completed.append(step_id)

        # Stored as a naive local time, matching what callers have always seen.
        self.step_timestamps[step_id] = datetime.now()

        if result is not None:
            self.results[step_id] = result

    def was_recent(self, step_id: str, seconds: float = 60.0) -> bool:
        """Check if step was completed recently.

        Args:
            step_id: Step identifier.
            seconds: Time window in seconds.

        Returns:
            True if the step has a recorded timestamp that is strictly less
            than ``seconds`` old; False otherwise, including when the step
            has no timestamp at all.
        """
        completed_at = self.step_timestamps.get(step_id)
        if completed_at is None:
            return False

        # Take "now" in the same timezone (or the same naive local clock) as
        # the stored timestamp. Subtracting a naive datetime from an aware one
        # raises TypeError, and step_timestamps is a public dict that callers
        # may fill with either kind.
        now = datetime.now(tz=completed_at.tzinfo)
        return (now - completed_at).total_seconds() < seconds

104 

105 

def _calculate_priority(
    urgency: float,
    ease: float,
    impact: float,
) -> float:
    """Combine the three component scores into a single priority score.

    The result is a weighted sum in which urgency counts for 40% and ease
    and impact count for 30% each, rounded to two decimal places.

    Args:
        urgency: Urgency score (0.0-1.0).
        ease: Ease score (0.0-1.0).
        impact: Impact score (0.0-1.0).

    Returns:
        Priority score; within 0.0-1.0 when every input is within 0.0-1.0.
    """
    # The terms are added left to right in this exact order so the
    # floating-point result stays reproducible.
    urgency_term = 0.4 * urgency
    ease_term = 0.3 * ease
    impact_term = 0.3 * impact
    return round(urgency_term + ease_term + impact_term, 2)

125 

126 

def _recommend_characterization(
    trace: WaveformTrace,
    state: dict,  # type: ignore[type-arg]
    history: AnalysisHistory,
) -> Recommendation | None:
    """Suggest signal characterization unless it has already been done.

    Args:
        trace: Waveform being analyzed (not inspected by this generator).
        state: Current analysis state.
        history: Analysis history.

    Returns:
        A Recommendation, or None when ``state`` already holds a
        "characterization" entry or the history reports the step as recent.
    """
    already_in_state = "characterization" in state
    if already_in_state or history.was_recent("characterization"):
        return None

    # Fixed scores for this step; named once so the stored scores and the
    # derived priority cannot drift apart.
    urgency_score, ease_score, impact_score = 0.95, 0.90, 0.95

    return Recommendation(
        id="characterization",
        title="Characterize signal",
        explanation="Identify what type of signal this is (UART, SPI, analog, etc.) to guide further analysis.",
        priority=_calculate_priority(urgency_score, ease_score, impact_score),
        urgency=urgency_score,
        ease=ease_score,
        impact=impact_score,
        rationale="Signal characterization is the foundation for all other analysis",
        result_key="characterization",
        impact_description="Enables protocol-specific analysis and targeted measurements",
    )

157 

158 

def _recommend_anomaly_check(
    trace: WaveformTrace,
    state: dict,  # type: ignore[type-arg]
    history: AnalysisHistory,
) -> Recommendation | None:
    """Recommend anomaly detection.

    The base urgency is 0.70. It rises to 0.85 when the characterization
    result reports a confidence below 0.8, and to 0.90 when the quality
    result reports a "WARNING" or "FAIL" status. The quality check runs
    last, so it wins when both apply.

    Args:
        trace: Waveform being analyzed (not inspected by this generator).
        state: Current analysis state.
        history: Analysis history.

    Returns:
        A Recommendation, or None when ``state`` already holds an
        "anomalies" entry or the history reports the step as recent.
    """
    if "anomalies" in state or history.was_recent("anomalies"):
        return None

    urgency = 0.70
    # Only claim that quality concerns exist when one of the checks below
    # actually found one; otherwise describe the scan as routine.
    rationale = "Routine anomaly scan to confirm signal integrity"
    concern_rationale = "Quality concerns detected - anomaly scan recommended"

    # A missing entry, a missing attribute and an explicit None all mean
    # "no confidence available"; none of them should raise or add urgency.
    confidence = getattr(state.get("characterization"), "confidence", None)
    if confidence is not None and confidence < 0.8:
        urgency = 0.85
        rationale = concern_rationale

    status = getattr(state.get("quality"), "status", None)
    if status in ("WARNING", "FAIL"):
        urgency = 0.90
        rationale = concern_rationale

    return Recommendation(
        id="anomaly_detection",
        title="Check for anomalies",
        explanation="Scan the signal for glitches, dropouts, noise spikes, and other problems that could affect data integrity.",
        urgency=urgency,
        ease=0.85,
        impact=0.80,
        priority=_calculate_priority(urgency, 0.85, 0.80),
        rationale=rationale,
        result_key="anomalies",
        impact_description="Identifies specific problem areas and their severity",
    )

202 

203 

def _recommend_quality_assessment(
    trace: WaveformTrace,
    state: dict,  # type: ignore[type-arg]
    history: AnalysisHistory,
) -> Recommendation | None:
    """Suggest a data quality assessment unless one already exists.

    Args:
        trace: Waveform being analyzed (not inspected by this generator).
        state: Current analysis state.
        history: Analysis history.

    Returns:
        A Recommendation, or None when ``state`` already holds a "quality"
        entry or the history reports the step as recent.
    """
    quality_known = "quality" in state
    if quality_known or history.was_recent("quality"):
        return None

    # The step is treated as more urgent early on, i.e. while the state
    # holds fewer than two entries of any kind.
    if len(state) < 2:
        urgency_score = 0.75
    else:
        urgency_score = 0.65

    ease_score = 0.90
    impact_score = 0.75

    return Recommendation(
        id="quality_assessment",
        title="Assess data quality",
        explanation="Verify that sample rate and resolution are adequate for reliable analysis of this signal.",
        priority=_calculate_priority(urgency_score, ease_score, impact_score),
        urgency=urgency_score,
        ease=ease_score,
        impact=impact_score,
        rationale="Ensures captured data is suitable for intended analysis",
        result_key="quality",
        impact_description="Confirms data is good enough or identifies capture improvements needed",
    )

237 

238 

def _recommend_protocol_decode(
    trace: WaveformTrace,
    state: dict,  # type: ignore[type-arg]
    history: AnalysisHistory,
) -> Recommendation | None:
    """Recommend protocol decoding.

    Decoding is suggested only when the characterization result names a
    signal type containing "uart", "spi", "i2c" or "can" (case-insensitive
    substring match). A confidence of at least 0.7 gives urgency 0.85;
    anything lower, including a missing or None confidence, gives 0.60.

    Args:
        trace: Waveform being analyzed (not inspected by this generator).
        state: Current analysis state.
        history: Analysis history.

    Returns:
        A Recommendation, or None when decoding was already done or recently
        run, no characterization is available, or the characterized signal
        type is missing or not one of the recognized protocols.
    """
    if "decode" in state or history.was_recent("decode"):
        return None

    # Only recommend once the signal type has been identified. A missing
    # entry, a result without the attribute, and an explicit None signal
    # type all count as "not identified" rather than raising on .lower().
    char = state.get("characterization")
    signal_type = getattr(char, "signal_type", None)
    if signal_type is None:
        return None

    lowered = signal_type.lower()
    if not any(proto in lowered for proto in ("uart", "spi", "i2c", "can")):
        return None

    # A result that carries confidence=None is treated like one without a
    # confidence at all, i.e. as low confidence.
    confidence = getattr(char, "confidence", None)
    if confidence is None:
        confidence = 0.0

    if confidence >= 0.7:
        urgency = 0.85
        explanation = f"Signal identified as {signal_type} with high confidence. Decode to extract transmitted data."
    else:
        urgency = 0.60
        explanation = f"Signal possibly {signal_type} but confidence is low. Decode may help verify."

    return Recommendation(
        id="protocol_decode",
        title="Decode protocol data",
        explanation=explanation,
        urgency=urgency,
        ease=0.80,
        impact=0.90,
        priority=_calculate_priority(urgency, 0.80, 0.90),
        rationale=f"{signal_type} protocol detected",
        result_key="decode",
        impact_description="Extracts meaningful data from signal",
    )

291 

292 

def _recommend_spectral_analysis(
    trace: WaveformTrace,
    state: dict,  # type: ignore[type-arg]
    history: AnalysisHistory,
) -> Recommendation | None:
    """Recommend spectral analysis.

    The step is suggested only when the characterization result names a
    signal type containing "analog" or "periodic" (case-insensitive
    substring match).

    Args:
        trace: Waveform being analyzed (not inspected by this generator).
        state: Current analysis state.
        history: Analysis history.

    Returns:
        A Recommendation, or None when spectral analysis was already done or
        recently run, no characterization is available, or the characterized
        signal type is missing or neither analog nor periodic.
    """
    if "spectral" in state or history.was_recent("spectral"):
        return None

    # A missing entry, a result without the attribute, and an explicit None
    # signal type all mean "type unknown": skip instead of raising on .lower().
    signal_type = getattr(state.get("characterization"), "signal_type", None)
    if signal_type is None:
        return None

    lowered = signal_type.lower()
    if "analog" not in lowered and "periodic" not in lowered:
        return None

    return Recommendation(
        id="spectral_analysis",
        title="Perform spectral analysis",
        explanation="Analyze frequency content to identify dominant frequencies and harmonics in this analog signal.",
        urgency=0.65,
        ease=0.75,
        impact=0.80,
        priority=_calculate_priority(0.65, 0.75, 0.80),
        rationale="Periodic/analog signal detected",
        result_key="spectral",
        impact_description="Reveals frequency components and signal purity",
    )

333 

334 

def suggest_next_steps(
    trace: WaveformTrace,
    *,
    current_state: dict[str, Any] | None = None,
    max_suggestions: int = 3,
    include_rationale: bool = False,
) -> list[Recommendation]:
    """Suggest next analysis steps based on current state.

    Provides contextual recommendations guiding users through the investigation
    process without requiring expertise.

    Args:
        trace: Waveform being analyzed.
        current_state: Current analysis state with completed steps and results.
            ``None`` or an empty dict means no analysis has been done yet.
        max_suggestions: Maximum number of suggestions (default 3). Values
            outside the range 2-5 are clamped into it.
        include_rationale: Include detailed rationale in recommendations.
            When False, ``rationale`` is blanked on every returned item.

    Returns:
        Recommended next steps ranked by descending priority. At most
        ``max_suggestions`` (after clamping) are returned, and at least one;
        the list can be shorter than two when few generators apply.

    Example:
        >>> trace = load("capture.wfm")
        >>> state = {"characterization": char_result}
        >>> recommendations = suggest_next_steps(trace, current_state=state)
        >>> for rec in recommendations:
        ...     print(f"{rec.priority:.2f}: {rec.title}")

    References:
        DISC-008: Recommendation Engine
    """
    current_state = current_state or {}

    # Build the history from a copy of the caller's list, so the history
    # object never shares (and can never mutate) caller-owned state.
    # NOTE(review): no timestamps are carried over, so was_recent() is False
    # for every step listed here and only the result keys in current_state
    # suppress a recommendation. Confirm whether that is intended.
    history = AnalysisHistory(
        steps_completed=list(current_state.get("steps_completed") or ()),
    )

    # Generator order matters only as the tie-breaker for equal priorities,
    # because the sort below is stable.
    generators = (
        _recommend_characterization,
        _recommend_quality_assessment,
        _recommend_anomaly_check,
        _recommend_protocol_decode,
        _recommend_spectral_analysis,
    )
    candidates = [
        rec
        for generator in generators
        if (rec := generator(trace, current_state, history)) is not None
    ]

    # If no specific recommendations, provide escape hatch.
    # NOTE(review): the characterization generator fires whenever the state
    # has no "characterization" entry, so this branch appears reachable only
    # after characterization is already present. Verify the fallback text
    # still makes sense in that situation.
    if not candidates:
        candidates.append(
            Recommendation(
                id="basic_characterization",
                title="Start with basic signal characterization",
                explanation="Not sure where to start? Begin with automatic signal characterization to identify the signal type.",
                urgency=0.50,
                ease=0.95,
                impact=0.85,
                priority=_calculate_priority(0.50, 0.95, 0.85),
                rationale="Default starting point when no analysis has been done",
                result_key="characterization",
                impact_description="Provides foundation for further analysis",
            )
        )

    # Highest priority first.
    candidates.sort(key=lambda r: r.priority, reverse=True)

    # Clamp the requested count into the supported 2-5 range, then truncate.
    max_suggestions = max(2, min(5, max_suggestions))
    recommendations = candidates[:max_suggestions]

    # The Recommendation objects were created in this call, so blanking the
    # rationale in place cannot affect anything owned by the caller.
    if not include_rationale:
        for rec in recommendations:
            rec.rationale = ""

    return recommendations

423 

424 

# Names exported by ``from <module> import *``; the private generator
# helpers are intentionally left out.
__all__ = ["AnalysisHistory", "Recommendation", "suggest_next_steps"]