Coverage for src / tracekit / guidance / recommender.py: 74%
106 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Recommendation engine for guided analysis workflow.
3This module provides contextual "What should I look at next?" recommendations
4based on current analysis state.
7Example:
8 >>> from tracekit.guidance import suggest_next_steps
9 >>> recommendations = suggest_next_steps(trace, current_state)
10 >>> for rec in recommendations:
11 ... print(f"{rec.title}: {rec.explanation}")
13References:
14 TraceKit Auto-Discovery Specification
15"""
17from __future__ import annotations
19from dataclasses import dataclass, field
20from datetime import datetime
21from typing import TYPE_CHECKING, Any
23if TYPE_CHECKING:
24 from collections.abc import Callable
26 from tracekit.core.types import WaveformTrace
29@dataclass
30class Recommendation:
31 """Analysis step recommendation.
33 Attributes:
34 id: Unique recommendation ID.
35 title: Short title (≤50 chars).
36 explanation: Why this step is relevant (≤50 words).
37 rationale: Detailed reasoning.
38 priority: Priority score (0.0-1.0).
39 urgency: Urgency score (0.0-1.0).
40 ease: Ease of execution (0.0-1.0, higher = easier).
41 impact: Expected impact (0.0-1.0, higher = more valuable).
42 result_key: Key for storing result in state.
43 execute: Optional callable to execute this step.
44 impact_description: Description of expected impact.
45 """
47 id: str
48 title: str
49 explanation: str
50 priority: float
51 urgency: float = 0.5
52 ease: float = 0.5
53 impact: float = 0.5
54 rationale: str = ""
55 result_key: str = ""
56 execute: Callable | None = None # type: ignore[type-arg]
57 impact_description: str = ""
60@dataclass
61class AnalysisHistory:
62 """Track completed analysis steps.
64 Attributes:
65 steps_completed: List of completed step IDs.
66 step_timestamps: Timestamps for each step.
67 results: Stored results from each step.
68 """
70 steps_completed: list[str] = field(default_factory=list)
71 step_timestamps: dict[str, datetime] = field(default_factory=dict)
72 results: dict[str, Any] = field(default_factory=dict)
74 def add_step(self, step_id: str, result: Any = None) -> None:
75 """Record a completed step.
77 Args:
78 step_id: Step identifier.
79 result: Optional result from step.
80 """
81 if step_id not in self.steps_completed:
82 self.steps_completed.append(step_id)
84 self.step_timestamps[step_id] = datetime.now()
86 if result is not None:
87 self.results[step_id] = result
89 def was_recent(self, step_id: str, seconds: float = 60.0) -> bool:
90 """Check if step was completed recently.
92 Args:
93 step_id: Step identifier.
94 seconds: Time window in seconds.
96 Returns:
97 True if step was done within time window.
98 """
99 if step_id not in self.step_timestamps: 99 ↛ 102line 99 didn't jump to line 102 because the condition on line 99 was always true
100 return False
102 elapsed = datetime.now() - self.step_timestamps[step_id]
103 return elapsed.total_seconds() < seconds
106def _calculate_priority(
107 urgency: float,
108 ease: float,
109 impact: float,
110) -> float:
111 """Calculate recommendation priority.
113 Uses weighted scoring: urgency (40%), ease (30%), impact (30%).
115 Args:
116 urgency: Urgency score (0.0-1.0).
117 ease: Ease score (0.0-1.0).
118 impact: Impact score (0.0-1.0).
120 Returns:
121 Priority score (0.0-1.0).
122 """
123 priority = 0.4 * urgency + 0.3 * ease + 0.3 * impact
124 return round(priority, 2)
127def _recommend_characterization(
128 trace: WaveformTrace,
129 state: dict, # type: ignore[type-arg]
130 history: AnalysisHistory,
131) -> Recommendation | None:
132 """Recommend signal characterization if not done.
134 Args:
135 trace: Waveform being analyzed.
136 state: Current analysis state.
137 history: Analysis history.
139 Returns:
140 Recommendation or None if already done.
141 """
142 if "characterization" in state or history.was_recent("characterization"):
143 return None
145 return Recommendation(
146 id="characterization",
147 title="Characterize signal",
148 explanation="Identify what type of signal this is (UART, SPI, analog, etc.) to guide further analysis.",
149 urgency=0.95,
150 ease=0.90,
151 impact=0.95,
152 priority=_calculate_priority(0.95, 0.90, 0.95),
153 rationale="Signal characterization is the foundation for all other analysis",
154 result_key="characterization",
155 impact_description="Enables protocol-specific analysis and targeted measurements",
156 )
159def _recommend_anomaly_check(
160 trace: WaveformTrace,
161 state: dict, # type: ignore[type-arg]
162 history: AnalysisHistory,
163) -> Recommendation | None:
164 """Recommend anomaly detection.
166 Args:
167 trace: Waveform being analyzed.
168 state: Current analysis state.
169 history: Analysis history.
171 Returns:
172 Recommendation or None if not applicable.
173 """
174 if "anomalies" in state or history.was_recent("anomalies"): 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true
175 return None
177 # Higher priority if characterization shows issues
178 urgency = 0.70
180 if "characterization" in state:
181 char = state["characterization"]
182 if hasattr(char, "confidence") and char.confidence < 0.8: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 urgency = 0.85
185 if "quality" in state: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 quality = state["quality"]
187 if hasattr(quality, "status") and quality.status in ["WARNING", "FAIL"]:
188 urgency = 0.90
190 return Recommendation(
191 id="anomaly_detection",
192 title="Check for anomalies",
193 explanation="Scan the signal for glitches, dropouts, noise spikes, and other problems that could affect data integrity.",
194 urgency=urgency,
195 ease=0.85,
196 impact=0.80,
197 priority=_calculate_priority(urgency, 0.85, 0.80),
198 rationale="Quality concerns detected - anomaly scan recommended",
199 result_key="anomalies",
200 impact_description="Identifies specific problem areas and their severity",
201 )
204def _recommend_quality_assessment(
205 trace: WaveformTrace,
206 state: dict, # type: ignore[type-arg]
207 history: AnalysisHistory,
208) -> Recommendation | None:
209 """Recommend data quality assessment.
211 Args:
212 trace: Waveform being analyzed.
213 state: Current analysis state.
214 history: Analysis history.
216 Returns:
217 Recommendation or None if not applicable.
218 """
219 if "quality" in state or history.was_recent("quality"): 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 return None
222 # Higher priority early in analysis
223 urgency = 0.75 if len(state) < 2 else 0.65
225 return Recommendation(
226 id="quality_assessment",
227 title="Assess data quality",
228 explanation="Verify that sample rate and resolution are adequate for reliable analysis of this signal.",
229 urgency=urgency,
230 ease=0.90,
231 impact=0.75,
232 priority=_calculate_priority(urgency, 0.90, 0.75),
233 rationale="Ensures captured data is suitable for intended analysis",
234 result_key="quality",
235 impact_description="Confirms data is good enough or identifies capture improvements needed",
236 )
239def _recommend_protocol_decode(
240 trace: WaveformTrace,
241 state: dict, # type: ignore[type-arg]
242 history: AnalysisHistory,
243) -> Recommendation | None:
244 """Recommend protocol decoding.
246 Args:
247 trace: Waveform being analyzed.
248 state: Current analysis state.
249 history: Analysis history.
251 Returns:
252 Recommendation or None if not applicable.
253 """
254 if "decode" in state or history.was_recent("decode"): 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 return None
257 # Only recommend if signal type is identified
258 if "characterization" not in state:
259 return None
261 char = state["characterization"]
263 # Check if it's a protocol signal
264 if hasattr(char, "signal_type"): 264 ↛ 290line 264 didn't jump to line 290 because the condition on line 264 was always true
265 signal_type = char.signal_type.lower()
267 if any(proto in signal_type for proto in ["uart", "spi", "i2c", "can"]):
268 confidence = getattr(char, "confidence", 0.0)
270 if confidence >= 0.7: 270 ↛ 274line 270 didn't jump to line 274 because the condition on line 270 was always true
271 urgency = 0.85
272 explanation = f"Signal identified as {char.signal_type} with high confidence. Decode to extract transmitted data."
273 else:
274 urgency = 0.60
275 explanation = f"Signal possibly {char.signal_type} but confidence is low. Decode may help verify."
277 return Recommendation(
278 id="protocol_decode",
279 title="Decode protocol data",
280 explanation=explanation,
281 urgency=urgency,
282 ease=0.80,
283 impact=0.90,
284 priority=_calculate_priority(urgency, 0.80, 0.90),
285 rationale=f"{char.signal_type} protocol detected",
286 result_key="decode",
287 impact_description="Extracts meaningful data from signal",
288 )
290 return None
293def _recommend_spectral_analysis(
294 trace: WaveformTrace,
295 state: dict, # type: ignore[type-arg]
296 history: AnalysisHistory,
297) -> Recommendation | None:
298 """Recommend spectral analysis.
300 Args:
301 trace: Waveform being analyzed.
302 state: Current analysis state.
303 history: Analysis history.
305 Returns:
306 Recommendation or None if not applicable.
307 """
308 if "spectral" in state or history.was_recent("spectral"): 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true
309 return None
311 # Recommend for analog/periodic signals
312 if "characterization" in state:
313 char = state["characterization"]
315 if hasattr(char, "signal_type"): 315 ↛ 332line 315 didn't jump to line 332 because the condition on line 315 was always true
316 signal_type = char.signal_type.lower()
318 if "analog" in signal_type or "periodic" in signal_type: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 return Recommendation(
320 id="spectral_analysis",
321 title="Perform spectral analysis",
322 explanation="Analyze frequency content to identify dominant frequencies and harmonics in this analog signal.",
323 urgency=0.65,
324 ease=0.75,
325 impact=0.80,
326 priority=_calculate_priority(0.65, 0.75, 0.80),
327 rationale="Periodic/analog signal detected",
328 result_key="spectral",
329 impact_description="Reveals frequency components and signal purity",
330 )
332 return None
335def suggest_next_steps(
336 trace: WaveformTrace,
337 *,
338 current_state: dict[str, Any] | None = None,
339 max_suggestions: int = 3,
340 include_rationale: bool = False,
341) -> list[Recommendation]:
342 """Suggest next analysis steps based on current state.
344 Provides contextual recommendations guiding users through the investigation
345 process without requiring expertise.
347 Args:
348 trace: Waveform being analyzed.
349 current_state: Current analysis state with completed steps and results.
350 max_suggestions: Maximum number of suggestions (default 3, range 2-5).
351 include_rationale: Include detailed rationale in recommendations.
353 Returns:
354 List of 2-5 recommended next steps, ranked by priority.
356 Example:
357 >>> trace = load("capture.wfm")
358 >>> state = {"characterization": char_result}
359 >>> recommendations = suggest_next_steps(trace, current_state=state)
360 >>> for rec in recommendations:
361 ... print(f"{rec.priority:.2f}: {rec.title}")
363 References:
364 DISC-008: Recommendation Engine
365 """
366 current_state = current_state or {}
368 # Extract or create analysis history
369 if "steps_completed" in current_state: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true
370 history = AnalysisHistory(
371 steps_completed=current_state["steps_completed"],
372 )
373 else:
374 history = AnalysisHistory()
376 # Generate candidate recommendations
377 candidates = []
379 # Try each recommendation generator
380 generators = [
381 _recommend_characterization,
382 _recommend_quality_assessment,
383 _recommend_anomaly_check,
384 _recommend_protocol_decode,
385 _recommend_spectral_analysis,
386 ]
388 for generator in generators:
389 rec = generator(trace, current_state, history)
390 if rec is not None:
391 candidates.append(rec)
393 # If no specific recommendations, provide escape hatch
394 if not candidates: 394 ↛ 395line 394 didn't jump to line 395 because the condition on line 394 was never true
395 candidates.append(
396 Recommendation(
397 id="basic_characterization",
398 title="Start with basic signal characterization",
399 explanation="Not sure where to start? Begin with automatic signal characterization to identify the signal type.",
400 urgency=0.50,
401 ease=0.95,
402 impact=0.85,
403 priority=_calculate_priority(0.50, 0.95, 0.85),
404 rationale="Default starting point when no analysis has been done",
405 result_key="characterization",
406 impact_description="Provides foundation for further analysis",
407 )
408 )
410 # Sort by priority (descending)
411 candidates.sort(key=lambda r: r.priority, reverse=True)
413 # Limit to max_suggestions
414 max_suggestions = max(2, min(5, max_suggestions))
415 recommendations = candidates[:max_suggestions]
417 # Remove rationale if not requested
418 if not include_rationale: 418 ↛ 422line 418 didn't jump to line 422 because the condition on line 418 was always true
419 for rec in recommendations:
420 rec.rationale = ""
422 return recommendations
425__all__ = [
426 "AnalysisHistory",
427 "Recommendation",
428 "suggest_next_steps",
429]