1"""LLM Integration for TraceKit. 

2 

3Provides hooks for Large Language Model integration to enable natural language 

4analysis and assistance. 

5 

6 

7Examples: 

8 Basic usage with auto-selection: 

9 

10 >>> from tracekit.integrations import llm 

11 >>> client = llm.get_client() # Auto-selects available provider 

12 >>> response = client.chat_completion("What is signal rise time?") 

13 

14 Provider-specific usage: 

15 

16 >>> client = llm.get_client("openai", model="gpt-4") 

17 >>> analysis = client.analyze_trace({"sample_rate": 1e9, "mean": 0.5}) 

18 

19 With failover: 

20 

21 >>> client = llm.get_client_with_failover( 

22 ... providers=["openai", "anthropic", "local"] 

23 ... ) 

24""" 

25 

import hashlib
import json
import os
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from threading import Lock
from typing import Any, Protocol

from tracekit.core.exceptions import TraceKitError

# ==============================================================================
# Cost Constants (API-020: Cost Tracking)
# ==============================================================================

# Pricing per 1K tokens (approximate, as of 2024)
TOKEN_COSTS: dict[str, dict[str, float]] = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "gpt-4o": {"input": 0.005, "output": 0.015},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
    "claude-3-opus": {"input": 0.015, "output": 0.075},
    "claude-3-opus-20240229": {"input": 0.015, "output": 0.075},
    "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-sonnet-20240229": {"input": 0.003, "output": 0.015},
    "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
    "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
    "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
    "default": {"input": 0.001, "output": 0.002},
}


@dataclass
class CostTracker:
    """Tracks API usage costs.

    Attributes:
        total_input_tokens: Total input tokens used across all requests
        total_output_tokens: Total output tokens used across all requests
        total_cost: Total estimated cost in USD
        request_count: Number of API requests made
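
    Examples:
        Illustrative sketch (rates come from the approximate TOKEN_COSTS
        table above, so treat the dollar value as an estimate):

        >>> tracker = CostTracker()
        >>> round(tracker.record("gpt-4o", input_tokens=1000, output_tokens=200), 6)
        0.008
        >>> tracker.get_summary()["request_count"]
        1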

70 """ 

71 

72 total_input_tokens: int = 0 

73 total_output_tokens: int = 0 

74 total_cost: float = 0.0 

75 request_count: int = 0 

76 _lock: Lock = field(default_factory=Lock) 

77 

78 def record(self, model: str, input_tokens: int, output_tokens: int) -> float: 

79 """Record token usage and return estimated cost. 

80 

81 Args: 

82 model: Model name for cost lookup 

83 input_tokens: Number of input/prompt tokens 

84 output_tokens: Number of output/completion tokens 

85 

86 Returns: 

87 Estimated cost in USD for this request 

88 """ 

89 # Get cost rates for model, fall back to default 

90 rates = TOKEN_COSTS.get(model, TOKEN_COSTS["default"]) 

91 

92 cost = input_tokens / 1000 * rates["input"] + output_tokens / 1000 * rates["output"] 

93 

94 with self._lock: 

95 self.total_input_tokens += input_tokens 

96 self.total_output_tokens += output_tokens 

97 self.total_cost += cost 

98 self.request_count += 1 

99 

100 return cost 

101 

102 def reset(self) -> None: 

103 """Reset all tracking counters.""" 

104 with self._lock: 

105 self.total_input_tokens = 0 

106 self.total_output_tokens = 0 

107 self.total_cost = 0.0 

108 self.request_count = 0 

109 

110 def get_summary(self) -> dict[str, Any]: 

111 """Get summary of usage statistics. 

112 

113 Returns: 

114 Dictionary with usage statistics 

115 """ 

116 with self._lock: 

117 return { 

118 "total_input_tokens": self.total_input_tokens, 

119 "total_output_tokens": self.total_output_tokens, 

120 "total_tokens": self.total_input_tokens + self.total_output_tokens, 

121 "total_cost_usd": round(self.total_cost, 6), 

122 "request_count": self.request_count, 

123 "avg_cost_per_request": ( 

124 round(self.total_cost / self.request_count, 6) 

125 if self.request_count > 0 

126 else 0.0 

127 ), 

128 } 

129 

130 

class ResponseCache:
    """Simple response cache with TTL and oldest-entry eviction.

    Caches responses based on a hash of the prompt and parameters to avoid
    repeated API calls for identical queries. Thread-safe implementation.
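
    Examples:
        Illustrative round trip (hypothetical prompt and response strings):

        >>> cache = ResponseCache(max_size=2)
        >>> cache.get("What is rise time?", "gpt-4o") is None
        True
        >>> cache.set("What is rise time?", "gpt-4o", "10-90% transition time")
        >>> cache.get("What is rise time?", "gpt-4o")
        '10-90% transition time'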

136 """ 

137 

138 def __init__(self, max_size: int = 100, ttl_seconds: float = 3600.0): 

139 """Initialize response cache. 

140 

141 Args: 

142 max_size: Maximum number of cached responses 

143 ttl_seconds: Time-to-live for cache entries in seconds 

144 """ 

145 self.max_size = max_size 

146 self.ttl_seconds = ttl_seconds 

147 self._cache: dict[str, tuple[Any, float]] = {} 

148 self._lock = Lock() 

149 

150 def _make_key(self, prompt: str, model: str, **kwargs: Any) -> str: 

151 """Create cache key from request parameters. 

152 

153 Args: 

154 prompt: The prompt text 

155 model: Model name 

156 **kwargs: Additional parameters affecting response 

157 

158 Returns: 

159 Hash key for cache lookup 
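
        Examples:
            Keys are deterministic for identical parameters (illustrative
            values):

            >>> cache = ResponseCache()
            >>> k1 = cache._make_key("hi", "gpt-4o", temperature=0.0)
            >>> k1 == cache._make_key("hi", "gpt-4o", temperature=0.0)
            True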

160 """ 

161 key_data = json.dumps( 

162 {"prompt": prompt, "model": model, "kwargs": sorted(kwargs.items())}, sort_keys=True 

163 ) 

164 return hashlib.sha256(key_data.encode()).hexdigest() 

165 

166 def get(self, prompt: str, model: str, **kwargs: Any) -> Any | None: 

167 """Get cached response if available and not expired. 

168 

169 Args: 

170 prompt: The prompt text 

171 model: Model name 

172 **kwargs: Additional parameters 

173 

174 Returns: 

175 Cached response or None if not found/expired 

176 """ 

177 key = self._make_key(prompt, model, **kwargs) 

178 

179 with self._lock: 

180 if key in self._cache: 

181 response, timestamp = self._cache[key] 

182 if time.time() - timestamp < self.ttl_seconds: 

183 return response 

184 # Expired entry 

185 del self._cache[key] 

186 return None 

187 

188 def set(self, prompt: str, model: str, response: Any, **kwargs: Any) -> None: 

189 """Cache a response. 

190 

191 Args: 

192 prompt: The prompt text 

193 model: Model name 

194 response: Response to cache 

195 **kwargs: Additional parameters 

196 """ 

197 key = self._make_key(prompt, model, **kwargs) 

198 

199 with self._lock: 

200 # Evict oldest entries if at capacity 

201 while len(self._cache) >= self.max_size: 

202 oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1]) 

203 del self._cache[oldest_key] 

204 

205 self._cache[key] = (response, time.time()) 

206 

207 def clear(self) -> None: 

208 """Clear all cached entries.""" 

209 with self._lock: 

210 self._cache.clear() 

211 

212 @property 

213 def size(self) -> int: 

214 """Current number of cached entries.""" 

215 with self._lock: 

216 return len(self._cache) 

217 

218 

219# Global instances for tracking 

220_global_cost_tracker = CostTracker() 

221_global_response_cache = ResponseCache() 



def get_cost_tracker() -> CostTracker:
    """Get global cost tracker instance.

    Returns:
        Global CostTracker for monitoring API costs
    """
    return _global_cost_tracker


def get_response_cache() -> ResponseCache:
    """Get global response cache instance.

    Returns:
        Global ResponseCache for caching LLM responses
    """
    return _global_response_cache


class LLMProvider(Enum):
    """Supported LLM providers."""

    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"
    CUSTOM = "custom"


class AnalysisHook(Enum):
    """Hook points for LLM integration."""

    BEFORE_ANALYSIS = "before_analysis"
    AFTER_ANALYSIS = "after_analysis"
    ON_ERROR = "on_error"


class RateLimiter:
    """Rate limiter for API requests.

    Enforces a minimum interval between requests (equivalent to a token
    bucket with a burst size of one).
    API-020: Rate limiting (configurable requests/minute).
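
    Examples:
        Illustrative: 120 requests/minute yields a 0.5 s minimum interval.

        >>> limiter = RateLimiter(requests_per_minute=120)
        >>> limiter.min_interval
        0.5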

264 """ 

265 

266 def __init__(self, requests_per_minute: int = 60): 

267 """Initialize rate limiter. 

268 

269 Args: 

270 requests_per_minute: Maximum requests allowed per minute 

271 """ 

272 self.requests_per_minute = requests_per_minute 

273 self.min_interval = 60.0 / requests_per_minute if requests_per_minute > 0 else 0 

274 self.last_request_time = 0.0 

275 self.lock = Lock() 

276 

277 def acquire(self) -> None: 

278 """Wait if necessary to respect rate limit.""" 

279 if self.requests_per_minute <= 0: 

280 return # No rate limiting 

281 

282 with self.lock: 

283 now = time.time() 

284 time_since_last = now - self.last_request_time 

285 if time_since_last < self.min_interval: 

286 sleep_time = self.min_interval - time_since_last 

287 time.sleep(sleep_time) 

288 self.last_request_time = time.time() 

289 

290 

291@dataclass 

292class LLMConfig: 

293 """Configuration for LLM integration. 

294 

295 Attributes: 

296 provider: LLM provider to use 

297 model: Model identifier (e.g., 'gpt-4', 'claude-3-opus') 

298 api_key: API key for cloud providers (optional) 

299 base_url: Custom API endpoint (for local/custom providers) 

300 privacy_mode: If True, no data sent to cloud (local only) 

301 timeout: Request timeout in seconds 

302 max_retries: Maximum retry attempts for failed requests 

303 requests_per_minute: Rate limit for API requests (API-020) 

304 enable_cache: If True, cache responses for repeated queries (API-020) 

305 track_costs: If True, track token usage and costs (API-020) 
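
    Examples:
        Illustrative: a cloud configuration with a tighter rate limit
        (hypothetical values).

        >>> config = LLMConfig(
        ...     provider=LLMProvider.OPENAI,
        ...     model="gpt-4o",
        ...     privacy_mode=False,
        ...     requests_per_minute=30,
        ... )
        >>> config.timeout
        30.0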

306 """ 

307 

308 provider: LLMProvider = LLMProvider.LOCAL 

309 model: str = "default" 

310 api_key: str | None = None 

311 base_url: str | None = None 

312 privacy_mode: bool = True 

313 timeout: float = 30.0 

314 max_retries: int = 3 

315 requests_per_minute: int = 60 

316 enable_cache: bool = False 

317 track_costs: bool = True 

318 

319 

320def estimate_tokens(text: str) -> int: 

321 """Estimate token count for text (API-019: token counting). 

322 

323 Uses approximate character-to-token ratio. Actual count varies by model. 

324 

325 Args: 

326 text: Input text to estimate tokens for 

327 

328 Returns: 

329 Estimated token count (roughly 4 characters per token) 
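
    Examples:
        Illustrative: a 25-character prompt estimates to 6 tokens.

        >>> estimate_tokens("What is signal rise time?")
        6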

330 """ 

331 # Average ~4 chars per token for English text 

332 return max(1, len(text) // 4) 

333 

334 

335@dataclass 

336class LLMResponse: 

337 """Response from LLM query. 

338 

339 Attributes: 

340 answer: Main text response 

341 confidence: Confidence score (0-1) if available 

342 suggested_commands: List of suggested TraceKit commands 

343 metadata: Additional metadata from LLM 

344 raw_response: Raw response data for debugging 

345 estimated_cost: Estimated cost in USD for this request (API-020) 

346 cached: Whether this response was served from cache (API-020) 

347 """ 

348 

349 answer: str 

350 confidence: float | None = None 

351 suggested_commands: list[str] = field(default_factory=list) 

352 metadata: dict[str, Any] = field(default_factory=dict) 

353 raw_response: dict[str, Any] | None = None 

354 estimated_cost: float = 0.0 

355 cached: bool = False 

356 

357 

358class LLMClient(Protocol): 

359 """Protocol for LLM client implementations.""" 

360 

361 def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse: 

362 """Send query to LLM. 

363 

364 Args: 

365 prompt: User prompt 

366 context: Analysis context (trace metadata, etc.) 

367 """ 

368 ... 

369 

370 def analyze(self, trace: Any, question: str) -> LLMResponse: 

371 """Analyze trace with natural language question. 

372 

373 Args: 

374 trace: Trace object 

375 question: Natural language question 

376 """ 

377 ... 

378 

379 def explain(self, measurement: Any) -> str: 

380 """Explain a measurement result. 

381 

382 Args: 

383 measurement: Measurement result 

384 """ 

385 ... 

386 

387 

388class LLMError(TraceKitError): 

389 """LLM integration error.""" 

390 

391 pass 

392 

393 

class LLMIntegration:
    """LLM integration manager.

    Provides hooks for LLM-assisted analysis and natural language interfaces.
    """

    def __init__(self, config: LLMConfig | None = None):
        """Initialize LLM integration.

        Args:
            config: LLM configuration (defaults to privacy mode)
        """
        self.config = config or LLMConfig()
        self._client: LLMClient | None = None
        self._hooks: dict[AnalysisHook, list[Callable]] = {  # type: ignore[type-arg]
            AnalysisHook.BEFORE_ANALYSIS: [],
            AnalysisHook.AFTER_ANALYSIS: [],
            AnalysisHook.ON_ERROR: [],
        }

    def configure(
        self, provider: str, model: str, api_key: str | None = None, **kwargs: Any
    ) -> None:
        """Configure LLM provider.

        Args:
            provider: Provider name ('openai', 'anthropic', 'local', 'custom')
            model: Model identifier
            api_key: API key for cloud providers
            **kwargs: Additional configuration options

        Raises:
            LLMError: If provider is unknown
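
        Examples:
            Illustrative: switching to the local provider (no API key needed).

            >>> integration = LLMIntegration()
            >>> integration.configure("local", model="default")
            >>> integration.config.provider
            <LLMProvider.LOCAL: 'local'>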

427 """ 

428 try: 

429 provider_enum = LLMProvider(provider.lower()) 

430 except ValueError: 

431 raise LLMError(f"Unknown provider: {provider}") # noqa: B904 

432 

433 self.config = LLMConfig( 

434 provider=provider_enum, 

435 model=model, 

436 api_key=api_key, 

437 base_url=kwargs.get("base_url"), 

438 privacy_mode=kwargs.get("privacy_mode", provider_enum == LLMProvider.LOCAL), 

439 timeout=kwargs.get("timeout", 30.0), 

440 max_retries=kwargs.get("max_retries", 3), 

441 requests_per_minute=kwargs.get("requests_per_minute", 60), 

442 ) 

443 

444 # Reset client to force reinitialization 

445 self._client = None 

446 

447 def _get_client(self) -> LLMClient: 

448 """Get or create LLM client. 

449 

450 Returns: 

451 LLM client instance 

452 """ 

453 if self._client is None: 453 ↛ 455line 453 didn't jump to line 455 because the condition on line 453 was always true

454 self._client = self._create_client() 

455 return self._client 

456 

457 def _create_client(self) -> LLMClient: 

458 """Create LLM client based on configuration. 

459 

460 Returns: 

461 LLM client instance 

462 

463 Raises: 

464 LLMError: If client cannot be created 

465 """ 

466 if self.config.provider == LLMProvider.OPENAI: 

467 return self._create_openai_client() 

468 elif self.config.provider == LLMProvider.ANTHROPIC: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true

469 return self._create_anthropic_client() 

470 elif self.config.provider == LLMProvider.LOCAL: 470 ↛ 473line 470 didn't jump to line 473 because the condition on line 470 was always true

471 return self._create_local_client() 

472 else: 

473 raise LLMError(f"Provider not implemented: {self.config.provider.value}") 

474 

475 def _create_openai_client(self) -> LLMClient: 

476 """Create OpenAI client. 

477 

478 Returns: 

479 OpenAI client 

480 

481 Raises: 

482 LLMError: If OpenAI package not available or configuration invalid 

483 """ 

484 try: 

485 import openai # type: ignore[import-not-found] 

486 except ImportError: 

487 raise LLMError( # noqa: B904 

488 "OpenAI package not installed. Install with: pip install openai" 

489 ) 

490 

491 if not self.config.api_key: 

492 raise LLMError("OpenAI API key required") 

493 

494 if self.config.privacy_mode: 

495 raise LLMError("Privacy mode not compatible with OpenAI (cloud provider)") 

496 

497 return OpenAIClient(self.config) 

498 

499 def _create_anthropic_client(self) -> LLMClient: 

500 """Create Anthropic client. 

501 

502 Returns: 

503 Anthropic client 

504 

505 Raises: 

506 LLMError: If Anthropic package not available or configuration invalid 

507 """ 

508 try: 

509 import anthropic # type: ignore[import-not-found] 

510 except ImportError: 

511 raise LLMError( # noqa: B904 

512 "Anthropic package not installed. Install with: pip install anthropic" 

513 ) 

514 

515 if not self.config.api_key: 

516 raise LLMError("Anthropic API key required") 

517 

518 if self.config.privacy_mode: 

519 raise LLMError("Privacy mode not compatible with Anthropic (cloud provider)") 

520 

521 return AnthropicClient(self.config) 

522 

523 def _create_local_client(self) -> LLMClient: 

524 """Create local LLM client. 

525 

526 Returns: 

527 Local client (mock/stub for now) 

528 """ 

529 return LocalLLMClient(self.config) 

530 

531 def register_hook(self, hook: AnalysisHook, callback: Callable) -> None: # type: ignore[type-arg] 

532 """Register callback for analysis hook. 

533 

534 Args: 

535 hook: Hook point 

536 callback: Callback function 
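
        Examples:
            Illustrative: logging each analysis before it runs.

            >>> integration = LLMIntegration()
            >>> integration.register_hook(
            ...     AnalysisHook.BEFORE_ANALYSIS,
            ...     lambda trace, question: print(f"Analyzing: {question}"),
            ... )
            >>> integration.trigger_hook(AnalysisHook.BEFORE_ANALYSIS, None, "rise time?")
            Analyzing: rise time?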

537 """ 

538 self._hooks[hook].append(callback) 

539 

540 def trigger_hook(self, hook: AnalysisHook, *args: Any, **kwargs: Any) -> None: 

541 """Trigger all callbacks for a hook. 

542 

543 Args: 

544 hook: Hook point 

545 *args: Positional arguments for callbacks 

546 **kwargs: Keyword arguments for callbacks 

547 """ 

548 for callback in self._hooks[hook]: 

549 try: 

550 callback(*args, **kwargs) 

551 except Exception as e: 

552 # Don't let hook errors break analysis 

553 print(f"Warning: Hook {hook.value} failed: {e}") 

554 

555 def prepare_context(self, trace: Any) -> dict[str, Any]: 

556 """Prepare trace metadata for LLM context. 

557 

558 Args: 

559 trace: Trace object 

560 

561 Returns: 

562 Context dictionary with trace metadata 
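
        Examples:
            Illustrative: in the default privacy mode only a short hash of
            the samples is exposed, never the data itself (stub trace below).

            >>> import numpy as np
            >>> class _StubTrace:
            ...     data = np.array([0.0, 1.0])
            >>> ctx = LLMIntegration().prepare_context(_StubTrace())
            >>> sorted(ctx)
            ['data_hash', 'type']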

563 """ 

564 context = { 

565 "type": type(trace).__name__, 

566 } 

567 

568 # Extract common metadata 

569 if hasattr(trace, "metadata"): 

570 meta = trace.metadata 

571 context.update( 

572 { 

573 "sample_rate": getattr(meta, "sample_rate", None), # type: ignore[dict-item] 

574 "num_samples": getattr(meta, "num_samples", None), # type: ignore[dict-item] 

575 "duration": getattr(meta, "duration", None), # type: ignore[dict-item] 

576 } 

577 ) 

578 

579 # Data statistics (without sending actual data in privacy mode) 

580 if hasattr(trace, "data") and not self.config.privacy_mode: 

581 import numpy as np 

582 

583 data = trace.data 

584 context["statistics"] = { # type: ignore[assignment] 

585 "mean": float(np.mean(data)), 

586 "std": float(np.std(data)), 

587 "min": float(np.min(data)), 

588 "max": float(np.max(data)), 

589 } 

590 elif self.config.privacy_mode: 590 ↛ 598line 590 didn't jump to line 598 because the condition on line 590 was always true

591 # Compute hash of data for change detection without sending data 

592 if hasattr(trace, "data"): 

593 import numpy as np 

594 

595 data_bytes = trace.data.tobytes() 

596 context["data_hash"] = hashlib.sha256(data_bytes).hexdigest()[:16] 

597 

598 return context 


    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            LLM response with answer and suggestions

        Raises:
            LLMError: If analysis fails
        """
        self.trigger_hook(AnalysisHook.BEFORE_ANALYSIS, trace, question)

        try:
            client = self._get_client()
            response = client.analyze(trace, question)
            self.trigger_hook(AnalysisHook.AFTER_ANALYSIS, trace, response)
            return response

        except Exception as e:
            self.trigger_hook(AnalysisHook.ON_ERROR, trace, question, e)
            raise LLMError(f"LLM analysis failed: {e}")  # noqa: B904

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result to explain

        Returns:
            Explanation text
        """
        client = self._get_client()
        return client.explain(measurement)


# Provider client implementations


class OpenAIClient:
    """OpenAI client implementation.

    Features:
    - chat_completion() with retry logic
    - analyze_trace() for trace analysis
    - suggest_measurements() for measurement recommendations
    - Error handling for API failures, rate limits, timeouts
    - API key from OPENAI_API_KEY environment variable
    """

    def __init__(self, config: LLMConfig):
        """Initialize OpenAI client.

        Args:
            config: LLM configuration

        Raises:
            LLMError: If openai package not available
        """
        self.config = config
        self.rate_limiter = RateLimiter(config.requests_per_minute)

        # Import and initialize OpenAI client
        try:
            import openai  # type: ignore[ignore-without-code]

            self._openai = openai
        except ImportError:
            raise LLMError(  # noqa: B904
                "OpenAI package not installed. Install with: pip install openai"
            )

        # Get API key from config or environment
        api_key = config.api_key or os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise LLMError(
                "OpenAI API key required. Set OPENAI_API_KEY environment variable "
                "or pass api_key to configure()"
            )

        # Initialize OpenAI client
        self.client = self._openai.OpenAI(api_key=api_key, timeout=config.timeout)

    def chat_completion(self, messages: list[dict[str, str]], **kwargs: Any) -> LLMResponse:
        """Send chat completion request with retry logic.

        Retries transient failures, with exponential backoff on rate limits.

        Args:
            messages: List of message dicts with 'role' and 'content'
            **kwargs: Additional parameters for OpenAI API

        Returns:
            LLM response with answer and metadata

        Raises:
            LLMError: If API request fails after retries
        """
        self.rate_limiter.acquire()

        last_exception = None
        for attempt in range(self.config.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.config.model, messages=messages, **kwargs
                )

                # Extract response content
                answer = response.choices[0].message.content or ""

                # Track costs (API-020)
                input_tokens = response.usage.prompt_tokens if response.usage else 0
                output_tokens = response.usage.completion_tokens if response.usage else 0
                estimated_cost = 0.0

                if self.config.track_costs:
                    estimated_cost = _global_cost_tracker.record(
                        response.model, input_tokens, output_tokens
                    )

                return LLMResponse(
                    answer=answer,
                    confidence=None,  # OpenAI doesn't provide confidence scores
                    suggested_commands=[],
                    metadata={
                        "model": response.model,
                        "usage": {
                            "prompt_tokens": input_tokens,
                            "completion_tokens": output_tokens,
                            "total_tokens": response.usage.total_tokens if response.usage else 0,
                        },
                        "finish_reason": response.choices[0].finish_reason,
                    },
                    raw_response={
                        "id": response.id,
                        "created": response.created,
                    },
                    estimated_cost=estimated_cost,
                )

            except self._openai.RateLimitError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    # Exponential backoff for rate limits
                    wait_time = 2**attempt
                    time.sleep(wait_time)
                    continue
                raise LLMError(f"OpenAI rate limit exceeded: {e}")  # noqa: B904

            except self._openai.APITimeoutError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    time.sleep(1)
                    continue
                raise LLMError(f"OpenAI request timeout: {e}")  # noqa: B904

            except self._openai.APIError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    time.sleep(1)
                    continue
                raise LLMError(f"OpenAI API error: {e}")  # noqa: B904

            except Exception as e:
                last_exception = e
                raise LLMError(f"OpenAI request failed: {e}")  # noqa: B904

        raise LLMError(
            f"OpenAI request failed after {self.config.max_retries} retries: {last_exception}"
        )

    def analyze_trace(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with question.

        Sends a trace summary to the model and returns its insights.

        Args:
            trace: Trace object
            question: Natural language question about the trace

        Returns:
            LLM response with analysis
        """
        # Prepare trace summary
        trace_summary = self._summarize_trace(trace)

        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert in signal analysis and oscilloscope data. "
                    "Analyze the provided trace data and answer questions accurately. "
                    "Provide specific, actionable insights."
                ),
            },
            {
                "role": "user",
                "content": f"Trace Summary:\n{trace_summary}\n\nQuestion: {question}",
            },
        ]

        return self.chat_completion(messages)

    def suggest_measurements(self, trace: Any) -> LLMResponse:
        """Suggest measurements based on trace characteristics.

        Args:
            trace: Trace object

        Returns:
            LLM response with measurement suggestions
        """
        trace_summary = self._summarize_trace(trace)

        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert in signal analysis. Based on trace characteristics, "
                    "suggest relevant measurements. Provide 3-5 specific measurement "
                    "recommendations with brief explanations."
                ),
            },
            {
                "role": "user",
                "content": (
                    f"Trace Summary:\n{trace_summary}\n\n"
                    "What measurements would be most informative for this trace?"
                ),
            },
        ]

        response = self.chat_completion(messages)

        # Try to extract suggested commands from the response
        suggested_commands = self._extract_commands(response.answer)
        response.suggested_commands = suggested_commands

        return response

    def _summarize_trace(self, trace: Any) -> str:
        """Create a text summary of trace for LLM context.

        Args:
            trace: Trace object

        Returns:
            Text summary of trace characteristics
        """
        summary_parts = [f"Trace Type: {type(trace).__name__}"]

        # Extract metadata
        if hasattr(trace, "metadata"):
            meta = trace.metadata
            if hasattr(meta, "sample_rate"):
                summary_parts.append(f"Sample Rate: {meta.sample_rate:.2e} Hz")
            if hasattr(meta, "num_samples"):
                summary_parts.append(f"Number of Samples: {meta.num_samples:,}")
            if hasattr(meta, "duration"):
                summary_parts.append(f"Duration: {meta.duration:.6f} s")

        # Data statistics
        if hasattr(trace, "data"):
            import numpy as np

            data = trace.data
            summary_parts.extend(
                [
                    f"Mean: {np.mean(data):.6e}",
                    f"Std Dev: {np.std(data):.6e}",
                    f"Min: {np.min(data):.6e}",
                    f"Max: {np.max(data):.6e}",
                    f"Peak-to-Peak: {np.ptp(data):.6e}",
                ]
            )

        return "\n".join(summary_parts)

    def _extract_commands(self, text: str) -> list[str]:
        """Extract suggested TraceKit commands from LLM response.

        Args:
            text: LLM response text

        Returns:
            List of extracted command strings
        """
        commands = []
        # Look for common measurement names
        measurement_keywords = [
            "rise_time",
            "fall_time",
            "frequency",
            "period",
            "amplitude",
            "rms",
            "thd",
            "snr",
            "fft",
            "psd",
            "peak",
            "duty_cycle",
        ]

        text_lower = text.lower()
        for keyword in measurement_keywords:
            if keyword in text_lower:
                commands.append(f"measure {keyword}")

        return commands

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Send query to LLM with context.

        Args:
            prompt: User prompt
            context: Analysis context

        Returns:
            LLM response
        """
        context_str = json.dumps(context, indent=2)
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant for signal analysis.",
            },
            {
                "role": "user",
                "content": f"Context:\n{context_str}\n\nQuery: {prompt}",
            },
        ]
        return self.chat_completion(messages)

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            Analysis response
        """
        return self.analyze_trace(trace, question)

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result

        Returns:
            Explanation text
        """
        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert in signal measurement interpretation. "
                    "Explain measurement results clearly and concisely."
                ),
            },
            {
                "role": "user",
                "content": f"Explain this measurement result: {measurement}",
            },
        ]
        response = self.chat_completion(messages)
        return response.answer


class AnthropicClient:
    """Anthropic client implementation.

    Features:
    - chat_completion() with retry logic
    - analyze_trace() for trace analysis
    - suggest_measurements() for measurement recommendations
    - API key from ANTHROPIC_API_KEY environment variable
    """

    def __init__(self, config: LLMConfig):
        """Initialize Anthropic client.

        Args:
            config: LLM configuration

        Raises:
            LLMError: If anthropic package not available
        """
        self.config = config
        self.rate_limiter = RateLimiter(config.requests_per_minute)

        # Import and initialize Anthropic client
        try:
            import anthropic  # type: ignore[ignore-without-code]

            self._anthropic = anthropic
        except ImportError:
            raise LLMError(  # noqa: B904
                "Anthropic package not installed. Install with: pip install anthropic"
            )

        # Get API key from config or environment
        api_key = config.api_key or os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            raise LLMError(
                "Anthropic API key required. Set ANTHROPIC_API_KEY environment variable "
                "or pass api_key to configure()"
            )

        # Initialize Anthropic client
        self.client = self._anthropic.Anthropic(api_key=api_key, timeout=config.timeout)

    def chat_completion(
        self, messages: list[dict[str, str]], system: str | None = None, **kwargs: Any
    ) -> LLMResponse:
        """Send chat completion request with retry logic.

        Retries transient failures, with exponential backoff on rate limits.

        Args:
            messages: List of message dicts with 'role' and 'content'
            system: System prompt (optional)
            **kwargs: Additional parameters for Anthropic API

        Returns:
            LLM response with answer and metadata

        Raises:
            LLMError: If API request fails after retries
        """
        self.rate_limiter.acquire()

        # Convert messages format (filter out system messages for Anthropic)
        user_messages = []
        system_message = system
        for msg in messages:
            if msg["role"] == "system" and not system_message:
                system_message = msg["content"]
            elif msg["role"] in ["user", "assistant"]:
                user_messages.append(msg)

        last_exception = None
        for attempt in range(self.config.max_retries):
            try:
                # Build request parameters
                request_params = {
                    "model": self.config.model,
                    "messages": user_messages,
                    "max_tokens": kwargs.get("max_tokens", 1024),
                }
                if system_message:
                    request_params["system"] = system_message

                # Add any additional kwargs
                for key in ["temperature", "top_p", "top_k"]:
                    if key in kwargs:
                        request_params[key] = kwargs[key]

                response = self.client.messages.create(**request_params)

                # Extract response content
                answer = ""
                for block in response.content:
                    if hasattr(block, "text"):
                        answer += block.text

                # Track costs (API-020)
                input_tokens = response.usage.input_tokens
                output_tokens = response.usage.output_tokens
                estimated_cost = 0.0

                if self.config.track_costs:
                    estimated_cost = _global_cost_tracker.record(
                        response.model, input_tokens, output_tokens
                    )

                return LLMResponse(
                    answer=answer,
                    confidence=None,  # Anthropic doesn't provide confidence scores
                    suggested_commands=[],
                    metadata={
                        "model": response.model,
                        "usage": {
                            "input_tokens": input_tokens,
                            "output_tokens": output_tokens,
                        },
                        "stop_reason": response.stop_reason,
                    },
                    raw_response={
                        "id": response.id,
                        "type": response.type,
                    },
                    estimated_cost=estimated_cost,
                )

            except self._anthropic.RateLimitError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    # Exponential backoff for rate limits
                    wait_time = 2**attempt
                    time.sleep(wait_time)
                    continue
                raise LLMError(f"Anthropic rate limit exceeded: {e}")  # noqa: B904

            except self._anthropic.APITimeoutError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    time.sleep(1)
                    continue
                raise LLMError(f"Anthropic request timeout: {e}")  # noqa: B904

            except self._anthropic.APIError as e:
                last_exception = e
                if attempt < self.config.max_retries - 1:
                    time.sleep(1)
                    continue
                raise LLMError(f"Anthropic API error: {e}")  # noqa: B904

            except Exception as e:
                last_exception = e
                raise LLMError(f"Anthropic request failed: {e}")  # noqa: B904

        raise LLMError(
            f"Anthropic request failed after {self.config.max_retries} retries: {last_exception}"
        )

    def analyze_trace(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with question.

        Args:
            trace: Trace object
            question: Natural language question about the trace

        Returns:
            LLM response with analysis
        """
        # Prepare trace summary
        trace_summary = self._summarize_trace(trace)

        system_prompt = (
            "You are an expert in signal analysis and oscilloscope data. "
            "Analyze the provided trace data and answer questions accurately. "
            "Provide specific, actionable insights."
        )

        messages = [
            {
                "role": "user",
                "content": f"Trace Summary:\n{trace_summary}\n\nQuestion: {question}",
            },
        ]

        return self.chat_completion(messages, system=system_prompt)

    def suggest_measurements(self, trace: Any) -> LLMResponse:
        """Suggest measurements based on trace characteristics.

        Args:
            trace: Trace object

        Returns:
            LLM response with measurement suggestions
        """
        trace_summary = self._summarize_trace(trace)

        system_prompt = (
            "You are an expert in signal analysis. Based on trace characteristics, "
            "suggest relevant measurements. Provide 3-5 specific measurement "
            "recommendations with brief explanations."
        )

        messages = [
            {
                "role": "user",
                "content": (
                    f"Trace Summary:\n{trace_summary}\n\n"
                    "What measurements would be most informative for this trace?"
                ),
            },
        ]

        response = self.chat_completion(messages, system=system_prompt)

        # Try to extract suggested commands from the response
        suggested_commands = self._extract_commands(response.answer)
        response.suggested_commands = suggested_commands

        return response

    def _summarize_trace(self, trace: Any) -> str:
        """Create a text summary of trace for LLM context.

        Args:
            trace: Trace object

        Returns:
            Text summary of trace characteristics
        """
        summary_parts = [f"Trace Type: {type(trace).__name__}"]

        # Extract metadata
        if hasattr(trace, "metadata"):
            meta = trace.metadata
            if hasattr(meta, "sample_rate"):
                summary_parts.append(f"Sample Rate: {meta.sample_rate:.2e} Hz")
            if hasattr(meta, "num_samples"):
                summary_parts.append(f"Number of Samples: {meta.num_samples:,}")
            if hasattr(meta, "duration"):
                summary_parts.append(f"Duration: {meta.duration:.6f} s")

        # Data statistics
        if hasattr(trace, "data"):
            import numpy as np

            data = trace.data
            summary_parts.extend(
                [
                    f"Mean: {np.mean(data):.6e}",
                    f"Std Dev: {np.std(data):.6e}",
                    f"Min: {np.min(data):.6e}",
                    f"Max: {np.max(data):.6e}",
                    f"Peak-to-Peak: {np.ptp(data):.6e}",
                ]
            )

        return "\n".join(summary_parts)

    def _extract_commands(self, text: str) -> list[str]:
        """Extract suggested TraceKit commands from LLM response.

        Args:
            text: LLM response text

        Returns:
            List of extracted command strings
        """
        commands = []
        # Look for common measurement names
        measurement_keywords = [
            "rise_time",
            "fall_time",
            "frequency",
            "period",
            "amplitude",
            "rms",
            "thd",
            "snr",
            "fft",
            "psd",
            "peak",
            "duty_cycle",
        ]

        text_lower = text.lower()
        for keyword in measurement_keywords:
            if keyword in text_lower:
                commands.append(f"measure {keyword}")

        return commands

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Send query to LLM with context.

        Args:
            prompt: User prompt
            context: Analysis context

        Returns:
            LLM response
        """
        context_str = json.dumps(context, indent=2)
        system_prompt = "You are a helpful assistant for signal analysis."
        messages = [
            {
                "role": "user",
                "content": f"Context:\n{context_str}\n\nQuery: {prompt}",
            },
        ]
        return self.chat_completion(messages, system=system_prompt)

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            Analysis response
        """
        return self.analyze_trace(trace, question)

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result

        Returns:
            Explanation text
        """
        system_prompt = (
            "You are an expert in signal measurement interpretation. "
            "Explain measurement results clearly and concisely."
        )
        messages = [
            {
                "role": "user",
                "content": f"Explain this measurement result: {measurement}",
            },
        ]
        response = self.chat_completion(messages, system=system_prompt)
        return response.answer


1315 """Local LLM client (mock implementation).""" 

1316 

1317 def __init__(self, config: LLMConfig): 

1318 self.config = config 

1319 

1320 def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse: 

1321 """Mock query implementation.""" 

1322 return LLMResponse( 

1323 answer="Local LLM not configured. This is a mock response.", 

1324 confidence=0.0, 

1325 suggested_commands=[], 

1326 metadata={"mock": True}, 

1327 ) 

1328 

1329 def analyze(self, trace: Any, question: str) -> LLMResponse: 

1330 """Mock analysis implementation.""" 

1331 # Simple heuristic-based responses 

1332 question_lower = question.lower() 

1333 

1334 if "protocol" in question_lower: 

1335 return LLMResponse( 

1336 answer="Unable to determine protocol without LLM. Try manual inspection.", 

1337 confidence=0.0, 

1338 suggested_commands=[ 

1339 "measure frequency", 

1340 "plot $trace", 

1341 ], 

1342 ) 

1343 

1344 return LLMResponse( 

1345 answer=f"Local LLM analysis not available. Question was: {question}", 

1346 confidence=0.0, 

1347 suggested_commands=["measure all"], 

1348 ) 

1349 

1350 def explain(self, measurement: Any) -> str: 

1351 """Mock explanation implementation.""" 

1352 return f"Measurement result: {measurement}. Local LLM explanation not available." 

1353 

1354 

def get_provider(name: str, **config_kwargs: Any) -> LLMClient:
    """Get LLM provider by name with unified interface.

    Factory function: builds an LLMConfig with sensible defaults and
    returns the matching client.

    Args:
        name: Provider name ('openai', 'anthropic', 'local')
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance

    Raises:
        LLMError: If provider unknown or configuration invalid

    Examples:
        >>> # Get OpenAI provider
        >>> client = get_provider('openai', model='gpt-4', api_key='...')
        >>> response = client.analyze(trace, "What is the frequency?")
        >>>
        >>> # Get Anthropic provider with rate limiting
        >>> client = get_provider('anthropic', model='claude-3-opus-20240229',
        ...                       requests_per_minute=30)
        >>> response = client.suggest_measurements(trace)
        >>>
        >>> # Get local provider (no API key needed)
        >>> client = get_provider('local')
        >>> response = client.analyze(trace, "Analyze this signal")
    """
    try:
        provider_enum = LLMProvider(name.lower())
    except ValueError:
        raise LLMError(  # noqa: B904
            f"Unknown provider: {name}. Available: {[p.value for p in LLMProvider]}"
        )

    # Build config with sensible defaults
    config = LLMConfig(
        provider=provider_enum,
        model=config_kwargs.get("model", "default"),
        api_key=config_kwargs.get("api_key"),
        base_url=config_kwargs.get("base_url"),
        privacy_mode=config_kwargs.get("privacy_mode", provider_enum == LLMProvider.LOCAL),
        timeout=config_kwargs.get("timeout", 30.0),
        max_retries=config_kwargs.get("max_retries", 3),
        requests_per_minute=config_kwargs.get("requests_per_minute", 60),
    )

    # Create appropriate client with graceful degradation
    try:
        if provider_enum == LLMProvider.OPENAI:
            return OpenAIClient(config)
        elif provider_enum == LLMProvider.ANTHROPIC:
            return AnthropicClient(config)
        elif provider_enum == LLMProvider.LOCAL:
            return LocalLLMClient(config)
        else:
            # Graceful degradation for providers that are not implemented
            raise LLMError(
                f"Provider {name} not yet implemented. "
                "Falling back to local provider is recommended."
            )
    except ImportError as e:
        # Graceful degradation when the provider package is unavailable
        raise LLMError(  # noqa: B904
            f"Provider {name} unavailable: {e}. "
            "Install the required package or use 'local' provider."
        )


# Global LLM integration instance
_global_llm: LLMIntegration | None = None


def get_llm() -> LLMIntegration:
    """Get global LLM integration instance.

    Returns:
        Global LLM integration instance
    """
    global _global_llm
    if _global_llm is None:
        _global_llm = LLMIntegration()
    return _global_llm


def configure(provider: str, model: str, **kwargs: Any) -> None:
    """Configure global LLM integration.

    Args:
        provider: Provider name
        model: Model identifier
        **kwargs: Additional configuration
    """
    llm = get_llm()
    llm.configure(provider, model, **kwargs)


def analyze(trace: Any, question: str) -> LLMResponse:
    """Analyze trace with LLM.

    Args:
        trace: Trace object
        question: Natural language question

    Returns:
        LLM response
    """
    llm = get_llm()
    return llm.analyze(trace, question)


def explain(measurement: Any) -> str:
    """Explain measurement with LLM.

    Args:
        measurement: Measurement result

    Returns:
        Explanation text
    """
    llm = get_llm()
    return llm.explain(measurement)


# ==============================================================================
# Client factory helpers: auto-selection and failover
# ==============================================================================


def get_client(provider: str | None = None, **config_kwargs: Any) -> LLMClient:
    """Get LLM client with optional auto-selection.

    Alias for get_provider() that also supports automatic provider
    selection when no provider is given.

    Args:
        provider: Provider name ('openai', 'anthropic', 'local'), or None for auto-select
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance

    Examples:
        >>> # Auto-select based on available API keys
        >>> client = get_client()
        >>>
        >>> # Explicit provider selection
        >>> client = get_client("openai", model="gpt-4")
    """
    if provider is not None:
        return get_provider(provider, **config_kwargs)

    # Auto-selection: try providers in preference order
    return get_client_auto(**config_kwargs)


def get_client_auto(**config_kwargs: Any) -> LLMClient:
    """Automatically select an available LLM provider.

    Checks for API keys in the environment and returns the first available
    provider:
    1. OpenAI (if OPENAI_API_KEY set)
    2. Anthropic (if ANTHROPIC_API_KEY set)
    3. Local (fallback, always available)

    Args:
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance for the first available provider

    Examples:
        >>> client = get_client_auto(model="gpt-4")  # Uses OpenAI if key available
    """
    # Check for OpenAI
    if os.environ.get("OPENAI_API_KEY"):
        try:
            return get_provider("openai", **config_kwargs)
        except LLMError:
            pass  # Fall through to next provider

    # Check for Anthropic
    if os.environ.get("ANTHROPIC_API_KEY"):
        try:
            return get_provider("anthropic", **config_kwargs)
        except LLMError:
            pass  # Fall through to next provider

    # Default to local
    return get_provider("local", **config_kwargs)


def get_client_with_failover(
    providers: list[str] | None = None, **config_kwargs: Any
) -> "FailoverLLMClient":
    """Get LLM client with automatic failover between providers.

    Tries each provider in order (e.g., OpenAI first, then Anthropic),
    falling back to the next on failure.

    Args:
        providers: List of provider names in preference order.
            Default: ["openai", "anthropic", "local"]
        **config_kwargs: Configuration parameters for providers

    Returns:
        FailoverLLMClient that tries providers in order

    Examples:
        >>> client = get_client_with_failover(
        ...     providers=["openai", "anthropic"],
        ...     model="gpt-4"
        ... )
        >>> response = client.chat_completion("Hello")  # Tries OpenAI, then Anthropic
    """
    if providers is None:
        providers = ["openai", "anthropic", "local"]

    return FailoverLLMClient(providers, **config_kwargs)


class FailoverLLMClient:
    """LLM client wrapper with automatic failover between providers.

    Attempts each provider in order until one succeeds. Useful for
    handling API outages or rate limiting gracefully.
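
    Examples:
        Illustrative: with only the always-available local provider, the
        stub response comes back with zero confidence.

        >>> client = FailoverLLMClient(["local"])
        >>> client.analyze(None, "What is the amplitude?").confidence
        0.0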

1583 """ 

1584 

1585 def __init__(self, providers: list[str], **config_kwargs: Any): 

1586 """Initialize failover client. 

1587 

1588 Args: 

1589 providers: List of provider names in preference order 

1590 **config_kwargs: Configuration parameters for providers 

1591 """ 

1592 self.providers = providers 

1593 self.config_kwargs = config_kwargs 

1594 self._clients: dict[str, LLMClient] = {} 

1595 self._last_successful_provider: str | None = None 

1596 

1597 def _get_or_create_client(self, provider: str) -> LLMClient | None: 

1598 """Get or create client for provider. 

1599 

1600 Args: 

1601 provider: Provider name 

1602 

1603 Returns: 

1604 LLM client or None if unavailable 

1605 """ 

1606 if provider not in self._clients: 

1607 try: 

1608 self._clients[provider] = get_provider(provider, **self.config_kwargs) 

1609 except LLMError: 

1610 return None 

1611 return self._clients.get(provider) 

1612 

1613 def _try_providers(self, operation: Callable[[LLMClient], Any]) -> Any: 

1614 """Try operation on each provider until one succeeds. 

1615 

1616 Args: 

1617 operation: Callable that takes a client and returns result 

1618 

1619 Returns: 

1620 Result from first successful provider 

1621 

1622 Raises: 

1623 LLMError: If all providers fail 

1624 """ 

1625 errors = [] 

1626 

1627 # Try last successful provider first for efficiency 

1628 if self._last_successful_provider: 

1629 reordered = [self._last_successful_provider] + [ 

1630 p for p in self.providers if p != self._last_successful_provider 

1631 ] 

1632 else: 

1633 reordered = self.providers 

1634 

1635 for provider in reordered: 

1636 client = self._get_or_create_client(provider) 

1637 if client is None: 

1638 errors.append(f"{provider}: not available") 

1639 continue 

1640 

1641 try: 

1642 result = operation(client) 

1643 self._last_successful_provider = provider 

1644 return result 

1645 except Exception as e: 

1646 errors.append(f"{provider}: {e}") 

1647 continue 

1648 

1649 raise LLMError(f"All providers failed: {'; '.join(errors)}") 

1650 

1651 def chat_completion( 

1652 self, 

1653 prompt: str, 

1654 model: str | None = None, 

1655 **kwargs: Any, 

1656 ) -> str: 

1657 """Send chat completion with failover. 

1658 

1659 Args: 

1660 prompt: User prompt 

1661 model: Model name (optional, uses config default) 

1662 **kwargs: Additional parameters 

1663 

1664 Returns: 

1665 Response text from first successful provider 

1666 """ 

1667 

1668 def operation(client: LLMClient) -> str: 

1669 if hasattr(client, "chat_completion"): 1669 ↛ 1670line 1669 didn't jump to line 1670 because the condition on line 1669 was never true

1670 messages = [{"role": "user", "content": prompt}] 

1671 response = client.chat_completion(messages, **kwargs) # type: ignore[ignore-without-code] 

1672 return response.answer # type: ignore[no-any-return] 

1673 else: 

1674 response = client.query(prompt, {}) 

1675 return response.answer 

1676 

1677 return self._try_providers(operation) # type: ignore[no-any-return] 

1678 

1679 def analyze_trace(self, trace_data: dict[str, Any]) -> dict[str, Any]: 

1680 """Analyze trace data with failover. 

1681 

1682 Args: 

1683 trace_data: Dictionary containing trace information 

1684 

1685 Returns: 

1686 Analysis results dictionary 

1687 """ 

1688 

1689 def operation(client: LLMClient) -> dict[str, Any]: 

1690 # Create mock trace object from dict 

1691 class DictTrace: 

1692 def __init__(self, data: dict[str, Any]): 

1693 self._data = data 

1694 for k, v in data.items(): 

1695 setattr(self, k, v) 

1696 

1697 trace = DictTrace(trace_data) 

1698 

1699 if hasattr(client, "analyze_trace"): 1699 ↛ 1700line 1699 didn't jump to line 1700 because the condition on line 1699 was never true

1700 response = client.analyze_trace(trace, "Analyze this signal") # type: ignore[ignore-without-code] 

1701 else: 

1702 response = client.analyze(trace, "Analyze this signal") 

1703 

1704 return { 

1705 "answer": response.answer, 

1706 "suggested_commands": response.suggested_commands, 

1707 "metadata": response.metadata, 

1708 } 

1709 

1710 return self._try_providers(operation) # type: ignore[no-any-return] 

1711 

1712 def suggest_measurements(self, signal_characteristics: dict[str, Any]) -> list[str]: 

1713 """Suggest measurements based on signal characteristics. 

1714 

1715 Args: 

1716 signal_characteristics: Dictionary describing the signal 

1717 

1718 Returns: 

1719 List of suggested measurement names 

1720 """ 

1721 

1722 def operation(client: LLMClient) -> list[str]: 

1723 # Create mock trace from characteristics 

1724 class CharTrace: 

1725 def __init__(self, chars: dict[str, Any]): 

1726 self.metadata = type("Meta", (), chars)() 

1727 self.data = None 

1728 

1729 trace = CharTrace(signal_characteristics) 

1730 

1731 if hasattr(client, "suggest_measurements"): 1731 ↛ 1732line 1731 didn't jump to line 1732 because the condition on line 1731 was never true

1732 response = client.suggest_measurements(trace) # type: ignore[ignore-without-code] 

1733 else: 

1734 response = client.analyze(trace, "What measurements should I perform?") 

1735 

1736 return response.suggested_commands # type: ignore[no-any-return] 

1737 

1738 return self._try_providers(operation) # type: ignore[no-any-return] 

1739 

1740 def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse: 

1741 """Send query with failover. 

1742 

1743 Args: 

1744 prompt: User prompt 

1745 context: Analysis context 

1746 

1747 Returns: 

1748 LLM response 

1749 """ 

1750 return self._try_providers(lambda c: c.query(prompt, context)) # type: ignore[no-any-return] 

1751 

1752 def analyze(self, trace: Any, question: str) -> LLMResponse: 

1753 """Analyze trace with failover. 

1754 

1755 Args: 

1756 trace: Trace object 

1757 question: Natural language question 

1758 

1759 Returns: 

1760 Analysis response 

1761 """ 

1762 return self._try_providers(lambda c: c.analyze(trace, question)) # type: ignore[no-any-return] 

1763 

1764 def explain(self, measurement: Any) -> str: 

1765 """Explain measurement with failover. 

1766 

1767 Args: 

1768 measurement: Measurement result 

1769 

1770 Returns: 

1771 Explanation text 

1772 """ 

1773 return self._try_providers(lambda c: c.explain(measurement)) # type: ignore[no-any-return] 

1774 

1775 

1776def is_provider_available(provider: str) -> bool: 

1777 """Check if a provider is available (API key set, package installed). 

1778 

1779 Check provider availability. 

1780 

1781 Args: 

1782 provider: Provider name to check 

1783 

1784 Returns: 

1785 True if provider can be initialized 

1786 

1787 Examples: 

1788 >>> if is_provider_available("openai"): 

1789 ... client = get_client("openai") 

1790 """ 

1791 if provider == "local": 

1792 return True 

1793 

1794 if provider == "openai": 

1795 if not os.environ.get("OPENAI_API_KEY"): 

1796 return False 

1797 try: 

1798 import openai # type: ignore[ignore-without-code] 

1799 

1800 return True 

1801 except ImportError: 

1802 return False 

1803 

1804 if provider == "anthropic": 

1805 if not os.environ.get("ANTHROPIC_API_KEY"): 1805 ↛ 1807line 1805 didn't jump to line 1807 because the condition on line 1805 was always true

1806 return False 

1807 try: 

1808 import anthropic # type: ignore[ignore-without-code] 

1809 

1810 return True 

1811 except ImportError: 

1812 return False 

1813 

1814 return False 

1815 

1816 

1817def list_available_providers() -> list[str]: 

1818 """List all currently available LLM providers. 

1819 

1820 Discover available providers. 

1821 

1822 Returns: 

1823 List of provider names that can be used 

1824 

1825 Examples: 

1826 >>> providers = list_available_providers() 

1827 >>> print(providers) # ['openai', 'local'] if OpenAI key is set 

1828 """ 

1829 return [provider.value for provider in LLMProvider if is_provider_available(provider.value)]