Coverage for src / tracekit / integrations / llm.py: 80%
555 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""LLM Integration for TraceKit.
3Provides hooks for Large Language Model integration to enable natural language
4analysis and assistance.
7Examples:
8 Basic usage with auto-selection:
10 >>> from tracekit.integrations import llm
11 >>> client = llm.get_client() # Auto-selects available provider
12 >>> response = client.chat_completion("What is signal rise time?")
14 Provider-specific usage:
16 >>> client = llm.get_client("openai", model="gpt-4")
17 >>> analysis = client.analyze_trace({"sample_rate": 1e9, "mean": 0.5})
19 With failover:
21 >>> client = llm.get_client_with_failover(
22 ... providers=["openai", "anthropic", "local"]
23 ... )
24"""
26import hashlib
27import json
28import os
29import time
30from collections.abc import Callable
31from dataclasses import dataclass, field
32from enum import Enum
33from threading import Lock
34from typing import Any, Protocol
36from tracekit.core.exceptions import TraceKitError
# ==============================================================================
# Cost Constants (API-020: Cost Tracking)
# ==============================================================================

# Pricing in USD per 1K tokens (approximate, as of 2024).
# The "default" entry is the fallback rate for models that are not listed.
TOKEN_COSTS: dict[str, dict[str, float]] = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "gpt-4o": {"input": 0.005, "output": 0.015},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
    "claude-3-opus": {"input": 0.015, "output": 0.075},
    "claude-3-opus-20240229": {"input": 0.015, "output": 0.075},
    "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-sonnet-20240229": {"input": 0.003, "output": 0.015},
    "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
    "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
    "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
    "default": {"input": 0.001, "output": 0.002},
}


@dataclass
class CostTracker:
    """Accumulates token usage and estimated cost across API requests.

    All counter updates and reads happen under an internal lock.

    Attributes:
        total_input_tokens: Total input tokens used across all requests
        total_output_tokens: Total output tokens used across all requests
        total_cost: Total estimated cost in USD
        request_count: Number of API requests made
    """

    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost: float = 0.0
    request_count: int = 0
    # The lock is bookkeeping, not state: keep it out of repr() and ==.
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    @staticmethod
    def _rates_for(model: str) -> dict[str, float]:
        """Return the per-1K-token rates that apply to ``model``.

        Lookup order: exact name, then the longest listed name that ``model``
        extends with a ``-suffix`` (so a dated id such as ``gpt-4-0613`` is
        priced as ``gpt-4``), then the ``"default"`` entry.
        """
        exact = TOKEN_COSTS.get(model)
        if exact is not None:
            return exact

        # Only real strings can be prefix-matched; anything else gets the default.
        if isinstance(model, str):
            families = [
                name
                for name in TOKEN_COSTS
                if name != "default" and model.startswith(f"{name}-")
            ]
            if families:
                # Longest match wins: "gpt-4o-mini-…" must not be priced as "gpt-4o".
                return TOKEN_COSTS[max(families, key=len)]

        return TOKEN_COSTS["default"]

    def record(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Record token usage and return estimated cost.

        Args:
            model: Model name for cost lookup
            input_tokens: Number of input/prompt tokens
            output_tokens: Number of output/completion tokens

        Returns:
            Estimated cost in USD for this request
        """
        rates = self._rates_for(model)
        cost = input_tokens / 1000 * rates["input"] + output_tokens / 1000 * rates["output"]

        with self._lock:
            self.total_input_tokens += input_tokens
            self.total_output_tokens += output_tokens
            self.total_cost += cost
            self.request_count += 1

        return cost

    def reset(self) -> None:
        """Reset all tracking counters."""
        with self._lock:
            self.total_input_tokens = 0
            self.total_output_tokens = 0
            self.total_cost = 0.0
            self.request_count = 0

    def get_summary(self) -> dict[str, Any]:
        """Get summary of usage statistics.

        Returns:
            Dictionary with token totals, total cost (rounded to 6 decimals),
            request count and average cost per request (0.0 when no request
            has been recorded).
        """
        with self._lock:
            requests = self.request_count
            average = round(self.total_cost / requests, 6) if requests > 0 else 0.0
            return {
                "total_input_tokens": self.total_input_tokens,
                "total_output_tokens": self.total_output_tokens,
                "total_tokens": self.total_input_tokens + self.total_output_tokens,
                "total_cost_usd": round(self.total_cost, 6),
                "request_count": requests,
                "avg_cost_per_request": average,
            }
131class ResponseCache:
132 """Simple LRU cache for LLM responses.
134 Caches responses based on prompt hash to avoid repeated API calls
135 for identical queries. Thread-safe implementation.
136 """
138 def __init__(self, max_size: int = 100, ttl_seconds: float = 3600.0):
139 """Initialize response cache.
141 Args:
142 max_size: Maximum number of cached responses
143 ttl_seconds: Time-to-live for cache entries in seconds
144 """
145 self.max_size = max_size
146 self.ttl_seconds = ttl_seconds
147 self._cache: dict[str, tuple[Any, float]] = {}
148 self._lock = Lock()
150 def _make_key(self, prompt: str, model: str, **kwargs: Any) -> str:
151 """Create cache key from request parameters.
153 Args:
154 prompt: The prompt text
155 model: Model name
156 **kwargs: Additional parameters affecting response
158 Returns:
159 Hash key for cache lookup
160 """
161 key_data = json.dumps(
162 {"prompt": prompt, "model": model, "kwargs": sorted(kwargs.items())}, sort_keys=True
163 )
164 return hashlib.sha256(key_data.encode()).hexdigest()
166 def get(self, prompt: str, model: str, **kwargs: Any) -> Any | None:
167 """Get cached response if available and not expired.
169 Args:
170 prompt: The prompt text
171 model: Model name
172 **kwargs: Additional parameters
174 Returns:
175 Cached response or None if not found/expired
176 """
177 key = self._make_key(prompt, model, **kwargs)
179 with self._lock:
180 if key in self._cache:
181 response, timestamp = self._cache[key]
182 if time.time() - timestamp < self.ttl_seconds:
183 return response
184 # Expired entry
185 del self._cache[key]
186 return None
188 def set(self, prompt: str, model: str, response: Any, **kwargs: Any) -> None:
189 """Cache a response.
191 Args:
192 prompt: The prompt text
193 model: Model name
194 response: Response to cache
195 **kwargs: Additional parameters
196 """
197 key = self._make_key(prompt, model, **kwargs)
199 with self._lock:
200 # Evict oldest entries if at capacity
201 while len(self._cache) >= self.max_size:
202 oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
203 del self._cache[oldest_key]
205 self._cache[key] = (response, time.time())
207 def clear(self) -> None:
208 """Clear all cached entries."""
209 with self._lock:
210 self._cache.clear()
212 @property
213 def size(self) -> int:
214 """Current number of cached entries."""
215 with self._lock:
216 return len(self._cache)
# Process-wide instances shared by every client in this module.
_global_cost_tracker = CostTracker()
_global_response_cache = ResponseCache()
def get_cost_tracker() -> CostTracker:
    """Get global cost tracker instance.

    Returns:
        The module-level CostTracker used for monitoring API costs
    """
    return _global_cost_tracker
def get_response_cache() -> ResponseCache:
    """Get global response cache instance.

    Returns:
        The module-level ResponseCache used for caching LLM responses
    """
    return _global_response_cache
class LLMProvider(Enum):
    """Supported LLM providers.

    Each value is the lowercase provider name used for lookup by string.
    """

    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"
    CUSTOM = "custom"
class AnalysisHook(Enum):
    """Hook points for LLM integration."""

    BEFORE_ANALYSIS = "before_analysis"
    AFTER_ANALYSIS = "after_analysis"
    ON_ERROR = "on_error"
class RateLimiter:
    """Rate limiter for API requests.

    Enforces a minimum interval of ``60 / requests_per_minute`` seconds
    between consecutive requests; callers block in ``acquire()`` until the
    interval has elapsed.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Initialize rate limiter.

        Args:
            requests_per_minute: Maximum requests allowed per minute; a value
                <= 0 disables rate limiting
        """
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute if requests_per_minute > 0 else 0
        self.last_request_time = 0.0
        self.lock = Lock()

    def acquire(self) -> None:
        """Wait if necessary to respect rate limit."""
        if self.requests_per_minute <= 0:
            return  # No rate limiting

        # The sleep happens while holding the lock, so concurrent callers are
        # released one at a time, each spaced by the minimum interval.
        with self.lock:
            # Clamp at zero: if the wall clock was set backwards the raw
            # difference is negative, which would otherwise turn into a sleep
            # far longer than min_interval.
            elapsed = max(0.0, time.time() - self.last_request_time)
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_request_time = time.time()
@dataclass
class LLMConfig:
    """Configuration for LLM integration.

    The defaults describe a local, privacy-preserving setup.

    Attributes:
        provider: LLM provider to use
        model: Model identifier (e.g., 'gpt-4', 'claude-3-opus')
        api_key: API key for cloud providers (optional)
        base_url: Custom API endpoint (for local/custom providers)
        privacy_mode: If True, no data sent to cloud (local only)
        timeout: Request timeout in seconds
        max_retries: Maximum retry attempts for failed requests
        requests_per_minute: Rate limit for API requests (API-020)
        enable_cache: If True, cache responses for repeated queries (API-020)
        track_costs: If True, track token usage and costs (API-020)
    """

    provider: LLMProvider = LLMProvider.LOCAL
    model: str = "default"
    api_key: str | None = None
    base_url: str | None = None
    privacy_mode: bool = True
    timeout: float = 30.0
    max_retries: int = 3
    requests_per_minute: int = 60
    enable_cache: bool = False
    track_costs: bool = True
def estimate_tokens(text: str) -> int:
    """Estimate token count for text (API-019: token counting).

    Uses an approximate ratio of 4 characters per token; the actual count
    varies by model.

    Args:
        text: Input text to estimate tokens for

    Returns:
        Estimated token count, never less than 1 (even for empty text)
    """
    chars_per_token = 4
    return max(1, len(text) // chars_per_token)
335@dataclass
336class LLMResponse:
337 """Response from LLM query.
339 Attributes:
340 answer: Main text response
341 confidence: Confidence score (0-1) if available
342 suggested_commands: List of suggested TraceKit commands
343 metadata: Additional metadata from LLM
344 raw_response: Raw response data for debugging
345 estimated_cost: Estimated cost in USD for this request (API-020)
346 cached: Whether this response was served from cache (API-020)
347 """
349 answer: str
350 confidence: float | None = None
351 suggested_commands: list[str] = field(default_factory=list)
352 metadata: dict[str, Any] = field(default_factory=dict)
353 raw_response: dict[str, Any] | None = None
354 estimated_cost: float = 0.0
355 cached: bool = False
class LLMClient(Protocol):
    """Structural interface that every provider client must satisfy."""

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Send query to LLM.

        Args:
            prompt: User prompt
            context: Analysis context (trace metadata, etc.)
        """
        ...

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Args:
            trace: Trace object
            question: Natural language question
        """
        ...

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result
        """
        ...
class LLMError(TraceKitError):
    """LLM integration error."""
class LLMIntegration:
    """LLM integration manager.

    Owns the active configuration, lazily creates the matching provider
    client, and runs registered callbacks around each analysis.
    """

    def __init__(self, config: LLMConfig | None = None):
        """Initialize LLM integration.

        Args:
            config: LLM configuration (defaults to privacy mode)
        """
        self.config = config or LLMConfig()
        self._client: LLMClient | None = None
        # One callback list per hook point.
        self._hooks: dict[AnalysisHook, list[Callable]] = {  # type: ignore[type-arg]
            hook: [] for hook in AnalysisHook
        }

    def configure(
        self, provider: str, model: str, api_key: str | None = None, **kwargs: Any
    ) -> None:
        """Configure LLM provider.

        Replaces the current configuration and discards any existing client so
        the next request builds a new one.

        Args:
            provider: Provider name ('openai', 'anthropic', 'local', 'custom')
            model: Model identifier
            api_key: API key for cloud providers
            **kwargs: Additional configuration options. Recognized keys:
                base_url, privacy_mode, timeout, max_retries,
                requests_per_minute, enable_cache, track_costs.
                privacy_mode defaults to True only for the local provider.

        Raises:
            LLMError: If provider is unknown
        """
        try:
            provider_enum = LLMProvider(provider.lower())
        except ValueError:
            raise LLMError(f"Unknown provider: {provider}") from None

        self.config = LLMConfig(
            provider=provider_enum,
            model=model,
            api_key=api_key,
            base_url=kwargs.get("base_url"),
            privacy_mode=kwargs.get("privacy_mode", provider_enum == LLMProvider.LOCAL),
            timeout=kwargs.get("timeout", 30.0),
            max_retries=kwargs.get("max_retries", 3),
            requests_per_minute=kwargs.get("requests_per_minute", 60),
            # Forward the remaining LLMConfig options too, so they are not
            # silently reset to their defaults.
            enable_cache=kwargs.get("enable_cache", False),
            track_costs=kwargs.get("track_costs", True),
        )

        # Reset client to force reinitialization
        self._client = None

    def _get_client(self) -> LLMClient:
        """Get or create LLM client.

        Returns:
            LLM client instance
        """
        if self._client is None:
            self._client = self._create_client()
        return self._client

    def _create_client(self) -> LLMClient:
        """Create LLM client based on configuration.

        Returns:
            LLM client instance

        Raises:
            LLMError: If client cannot be created
        """
        provider = self.config.provider
        if provider == LLMProvider.OPENAI:
            return self._create_openai_client()
        if provider == LLMProvider.ANTHROPIC:
            return self._create_anthropic_client()
        if provider == LLMProvider.LOCAL:
            return self._create_local_client()
        raise LLMError(f"Provider not implemented: {provider.value}")

    def _create_openai_client(self) -> LLMClient:
        """Create OpenAI client.

        Returns:
            OpenAI client

        Raises:
            LLMError: If OpenAI package not available or configuration invalid
        """
        try:
            import openai  # type: ignore[import-not-found]  # noqa: F401
        except ImportError:
            raise LLMError(
                "OpenAI package not installed. Install with: pip install openai"
            ) from None

        # OpenAIClient also accepts the key from the environment, so accept
        # it here as well instead of rejecting a valid setup.
        if not (self.config.api_key or os.environ.get("OPENAI_API_KEY")):
            raise LLMError("OpenAI API key required")

        if self.config.privacy_mode:
            raise LLMError("Privacy mode not compatible with OpenAI (cloud provider)")

        return OpenAIClient(self.config)

    def _create_anthropic_client(self) -> LLMClient:
        """Create Anthropic client.

        Returns:
            Anthropic client

        Raises:
            LLMError: If Anthropic package not available or configuration invalid
        """
        try:
            import anthropic  # type: ignore[import-not-found]  # noqa: F401
        except ImportError:
            raise LLMError(
                "Anthropic package not installed. Install with: pip install anthropic"
            ) from None

        # AnthropicClient also accepts the key from the environment.
        if not (self.config.api_key or os.environ.get("ANTHROPIC_API_KEY")):
            raise LLMError("Anthropic API key required")

        if self.config.privacy_mode:
            raise LLMError("Privacy mode not compatible with Anthropic (cloud provider)")

        return AnthropicClient(self.config)

    def _create_local_client(self) -> LLMClient:
        """Create local LLM client.

        Returns:
            Local client (mock/stub for now)
        """
        return LocalLLMClient(self.config)

    def register_hook(self, hook: AnalysisHook, callback: Callable) -> None:  # type: ignore[type-arg]
        """Register callback for analysis hook.

        Args:
            hook: Hook point
            callback: Callback function
        """
        self._hooks[hook].append(callback)

    def trigger_hook(self, hook: AnalysisHook, *args: Any, **kwargs: Any) -> None:
        """Trigger all callbacks for a hook.

        A failing callback is reported on stdout and does not stop the
        remaining callbacks.

        Args:
            hook: Hook point
            *args: Positional arguments for callbacks
            **kwargs: Keyword arguments for callbacks
        """
        # Iterate over a snapshot so a callback may register further hooks.
        for callback in tuple(self._hooks[hook]):
            try:
                callback(*args, **kwargs)
            except Exception as e:
                # Don't let hook errors break analysis
                print(f"Warning: Hook {hook.value} failed: {e}")

    def prepare_context(self, trace: Any) -> dict[str, Any]:
        """Prepare trace metadata for LLM context.

        In privacy mode only a truncated SHA-256 of the sample data is
        included; otherwise summary statistics of the data are included.

        Args:
            trace: Trace object

        Returns:
            Context dictionary with trace metadata
        """
        context: dict[str, Any] = {"type": type(trace).__name__}

        # Extract common metadata; attributes the metadata lacks become None.
        if hasattr(trace, "metadata"):
            meta = trace.metadata
            for attr in ("sample_rate", "num_samples", "duration"):
                context[attr] = getattr(meta, attr, None)

        if hasattr(trace, "data"):
            if self.config.privacy_mode:
                # Hash of the data for change detection without sending data
                data_bytes = trace.data.tobytes()
                context["data_hash"] = hashlib.sha256(data_bytes).hexdigest()[:16]
            else:
                import numpy as np

                data = trace.data
                context["statistics"] = {
                    "mean": float(np.mean(data)),
                    "std": float(np.std(data)),
                    "min": float(np.min(data)),
                    "max": float(np.max(data)),
                }

        return context

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Runs the BEFORE_ANALYSIS hook, asks the client, then runs
        AFTER_ANALYSIS on success or ON_ERROR on failure.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            LLM response with answer and suggestions

        Raises:
            LLMError: If analysis fails
        """
        self.trigger_hook(AnalysisHook.BEFORE_ANALYSIS, trace, question)

        try:
            client = self._get_client()
            response = client.analyze(trace, question)
            self.trigger_hook(AnalysisHook.AFTER_ANALYSIS, trace, response)
            return response
        except Exception as e:
            self.trigger_hook(AnalysisHook.ON_ERROR, trace, question, e)
            # Chain the original exception so its traceback is preserved.
            raise LLMError(f"LLM analysis failed: {e}") from e

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result to explain

        Returns:
            Explanation text
        """
        return self._get_client().explain(measurement)
# Provider-specific client implementations
class OpenAIClient:
    """OpenAI client implementation.

    Provides:
    - chat_completion() with retry logic
    - analyze_trace() for trace analysis
    - suggest_measurements() for measurement recommendations
    - Error handling for API failures, rate limits, timeouts
    - API key from the configuration or the OPENAI_API_KEY environment variable
    """

    # Measurement names recognized in free-text LLM answers.
    _MEASUREMENT_KEYWORDS = (
        "rise_time",
        "fall_time",
        "frequency",
        "period",
        "amplitude",
        "rms",
        "thd",
        "snr",
        "fft",
        "psd",
        "peak",
        "duty_cycle",
    )

    def __init__(self, config: LLMConfig):
        """Initialize OpenAI client.

        Args:
            config: LLM configuration

        Raises:
            LLMError: If openai package not available or no API key is found
        """
        self.config = config
        self.rate_limiter = RateLimiter(config.requests_per_minute)

        # Import lazily so the package is only required when this client is used.
        try:
            import openai  # type: ignore[import-not-found]
        except ImportError:
            raise LLMError(
                "OpenAI package not installed. Install with: pip install openai"
            ) from None
        self._openai = openai

        # Get API key from config or environment
        api_key = config.api_key or os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise LLMError(
                "OpenAI API key required. Set OPENAI_API_KEY environment variable "
                "or pass api_key to configure()"
            )

        self.client = self._openai.OpenAI(api_key=api_key, timeout=config.timeout)

    def chat_completion(self, messages: list[dict[str, str]], **kwargs: Any) -> LLMResponse:
        """Send chat completion request with retry logic.

        Rate-limit errors are retried with exponential backoff (1s, 2s, 4s, ...);
        timeouts and other API errors are retried after 1 second. Any other
        exception fails immediately.

        Args:
            messages: List of message dicts with 'role' and 'content'
            **kwargs: Additional parameters for OpenAI API

        Returns:
            LLM response with answer and metadata

        Raises:
            LLMError: If API request fails after retries
        """
        self.rate_limiter.acquire()

        # Always make at least one attempt, even if max_retries is 0 or negative.
        attempts = max(1, self.config.max_retries)
        last_exception: Exception | None = None

        for attempt in range(attempts):
            final_attempt = attempt == attempts - 1
            try:
                response = self.client.chat.completions.create(
                    model=self.config.model, messages=messages, **kwargs
                )
                # Parsing stays inside the try block so a malformed response
                # is also reported as an LLMError.
                return self._to_llm_response(response)

            except self._openai.RateLimitError as e:
                last_exception = e
                if final_attempt:
                    raise LLMError(f"OpenAI rate limit exceeded: {e}") from e
                # Exponential backoff for rate limits
                time.sleep(2**attempt)

            except self._openai.APITimeoutError as e:
                last_exception = e
                if final_attempt:
                    raise LLMError(f"OpenAI request timeout: {e}") from e
                time.sleep(1)

            except self._openai.APIError as e:
                last_exception = e
                if final_attempt:
                    raise LLMError(f"OpenAI API error: {e}") from e
                time.sleep(1)

            except Exception as e:
                raise LLMError(f"OpenAI request failed: {e}") from e

        # Defensive: every iteration above returns, raises, or retries.
        raise LLMError(
            f"OpenAI request failed after {self.config.max_retries} retries: {last_exception}"
        )

    def _to_llm_response(self, response: Any) -> LLMResponse:
        """Convert a raw chat-completion response into an LLMResponse.

        Also records token usage with the global cost tracker when
        ``track_costs`` is enabled.
        """
        first_choice = response.choices[0]
        usage = response.usage
        input_tokens = usage.prompt_tokens if usage else 0
        output_tokens = usage.completion_tokens if usage else 0

        estimated_cost = 0.0
        if self.config.track_costs:
            estimated_cost = _global_cost_tracker.record(
                response.model, input_tokens, output_tokens
            )

        return LLMResponse(
            answer=first_choice.message.content or "",
            confidence=None,  # OpenAI doesn't provide confidence scores
            suggested_commands=[],
            metadata={
                "model": response.model,
                "usage": {
                    "prompt_tokens": input_tokens,
                    "completion_tokens": output_tokens,
                    "total_tokens": usage.total_tokens if usage else 0,
                },
                "finish_reason": first_choice.finish_reason,
            },
            raw_response={
                "id": response.id,
                "created": response.created,
            },
            estimated_cost=estimated_cost,
        )

    def analyze_trace(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with question.

        Sends a text summary of the trace together with the question.

        Args:
            trace: Trace object
            question: Natural language question about the trace

        Returns:
            LLM response with analysis
        """
        trace_summary = self._summarize_trace(trace)

        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert in signal analysis and oscilloscope data. "
                    "Analyze the provided trace data and answer questions accurately. "
                    "Provide specific, actionable insights."
                ),
            },
            {
                "role": "user",
                "content": f"Trace Summary:\n{trace_summary}\n\nQuestion: {question}",
            },
        ]

        return self.chat_completion(messages)

    def suggest_measurements(self, trace: Any) -> LLMResponse:
        """Suggest measurements based on trace characteristics.

        Args:
            trace: Trace object

        Returns:
            LLM response with measurement suggestions; ``suggested_commands``
            is filled with the measurement names found in the answer
        """
        trace_summary = self._summarize_trace(trace)

        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert in signal analysis. Based on trace characteristics, "
                    "suggest relevant measurements. Provide 3-5 specific measurement recommendations "
                    "with brief explanations."
                ),
            },
            {
                "role": "user",
                "content": f"Trace Summary:\n{trace_summary}\n\nWhat measurements would be most informative for this trace?",
            },
        ]

        response = self.chat_completion(messages)
        response.suggested_commands = self._extract_commands(response.answer)
        return response

    def _summarize_trace(self, trace: Any) -> str:
        """Create a text summary of trace for LLM context.

        Metadata fields that are missing or None are left out of the summary.

        Args:
            trace: Trace object

        Returns:
            Text summary of trace characteristics
        """
        summary_parts = [f"Trace Type: {type(trace).__name__}"]

        meta = getattr(trace, "metadata", None)
        if meta is not None:
            sample_rate = getattr(meta, "sample_rate", None)
            if sample_rate is not None:
                summary_parts.append(f"Sample Rate: {sample_rate:.2e} Hz")
            num_samples = getattr(meta, "num_samples", None)
            if num_samples is not None:
                summary_parts.append(f"Number of Samples: {num_samples:,}")
            duration = getattr(meta, "duration", None)
            if duration is not None:
                summary_parts.append(f"Duration: {duration:.6f} s")

        # Data statistics
        if hasattr(trace, "data"):
            import numpy as np

            data = trace.data
            summary_parts.extend(
                [
                    f"Mean: {np.mean(data):.6e}",
                    f"Std Dev: {np.std(data):.6e}",
                    f"Min: {np.min(data):.6e}",
                    f"Max: {np.max(data):.6e}",
                    f"Peak-to-Peak: {np.ptp(data):.6e}",
                ]
            )

        return "\n".join(summary_parts)

    def _extract_commands(self, text: str) -> list[str]:
        """Extract suggested TraceKit commands from LLM response.

        A measurement is recognized only as a whole word (so "terms" does not
        count as "rms"), with an optional trailing "s". Multi-word names match
        whether written with an underscore, hyphen, space or nothing between
        the parts ("rise_time", "rise-time", "rise time", "risetime").

        Args:
            text: LLM response text

        Returns:
            List of ``"measure <name>"`` strings, in keyword order
        """
        import re

        text_lower = text.lower()
        commands = []
        for keyword in self._MEASUREMENT_KEYWORDS:
            name = r"[\s_-]?".join(re.escape(part) for part in keyword.split("_"))
            whole_word = rf"(?<![a-z0-9_]){name}s?(?![a-z0-9_])"
            if re.search(whole_word, text_lower):
                commands.append(f"measure {keyword}")
        return commands

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Send query to LLM with context.

        Args:
            prompt: User prompt
            context: Analysis context; serialized as indented JSON

        Returns:
            LLM response
        """
        context_str = json.dumps(context, indent=2)
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant for signal analysis.",
            },
            {
                "role": "user",
                "content": f"Context:\n{context_str}\n\nQuery: {prompt}",
            },
        ]
        return self.chat_completion(messages)

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with natural language question.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            Analysis response
        """
        return self.analyze_trace(trace, question)

    def explain(self, measurement: Any) -> str:
        """Explain a measurement result.

        Args:
            measurement: Measurement result

        Returns:
            Explanation text
        """
        messages = [
            {
                "role": "system",
                "content": "You are an expert in signal measurement interpretation. Explain measurement results clearly and concisely.",
            },
            {
                "role": "user",
                "content": f"Explain this measurement result: {measurement}",
            },
        ]
        return self.chat_completion(messages).answer
970class AnthropicClient:
971 """Anthropic client implementation.
973 Full implementation.:
974 - chat_completion() with retry logic
975 - analyze_trace() for trace analysis
976 - suggest_measurements() for measurement recommendations
977 - API key from ANTHROPIC_API_KEY environment variable
978 """
980 def __init__(self, config: LLMConfig):
981 """Initialize Anthropic client.
983 Args:
984 config: LLM configuration
986 Raises:
987 LLMError: If anthropic package not available
988 """
989 self.config = config
990 self.rate_limiter = RateLimiter(config.requests_per_minute)
992 # Import and initialize Anthropic client
993 try:
994 import anthropic # type: ignore[ignore-without-code]
996 self._anthropic = anthropic
997 except ImportError:
998 raise LLMError( # noqa: B904
999 "Anthropic package not installed. Install with: pip install anthropic"
1000 )
1002 # Get API key from config or environment
1003 api_key = config.api_key or os.environ.get("ANTHROPIC_API_KEY")
1004 if not api_key:
1005 raise LLMError(
1006 "Anthropic API key required. Set ANTHROPIC_API_KEY environment variable "
1007 "or pass api_key to configure()"
1008 )
1010 # Initialize Anthropic client
1011 self.client = self._anthropic.Anthropic(api_key=api_key, timeout=config.timeout)
1013 def chat_completion(
1014 self, messages: list[dict[str, str]], system: str | None = None, **kwargs: Any
1015 ) -> LLMResponse:
1016 """Send chat completion request with retry logic.
1018 Full implementation with retry logic.
1020 Args:
1021 messages: List of message dicts with 'role' and 'content'
1022 system: System prompt (optional)
1023 **kwargs: Additional parameters for Anthropic API
1025 Returns:
1026 LLM response with answer and metadata
1028 Raises:
1029 LLMError: If API request fails after retries
1030 """
1031 self.rate_limiter.acquire()
1033 # Convert messages format (filter out system messages for Anthropic)
1034 user_messages = []
1035 system_message = system
1036 for msg in messages:
1037 if msg["role"] == "system" and not system_message:
1038 system_message = msg["content"]
1039 elif msg["role"] in ["user", "assistant"]: 1039 ↛ 1036line 1039 didn't jump to line 1036 because the condition on line 1039 was always true
1040 user_messages.append(msg)
1042 last_exception = None
1043 for attempt in range(self.config.max_retries): 1043 ↛ 1123line 1043 didn't jump to line 1123 because the loop on line 1043 didn't complete
1044 try:
1045 # Build request parameters
1046 request_params = {
1047 "model": self.config.model,
1048 "messages": user_messages,
1049 "max_tokens": kwargs.get("max_tokens", 1024),
1050 }
1051 if system_message:
1052 request_params["system"] = system_message
1054 # Add any additional kwargs
1055 for key in ["temperature", "top_p", "top_k"]:
1056 if key in kwargs: 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true
1057 request_params[key] = kwargs[key]
1059 response = self.client.messages.create(**request_params)
1061 # Extract response content
1062 answer = ""
1063 for block in response.content:
1064 if hasattr(block, "text"): 1064 ↛ 1063line 1064 didn't jump to line 1063 because the condition on line 1064 was always true
1065 answer += block.text
1067 # Track costs./API-020
1068 input_tokens = response.usage.input_tokens
1069 output_tokens = response.usage.output_tokens
1070 estimated_cost = 0.0
1072 if self.config.track_costs: 1072 ↛ 1077line 1072 didn't jump to line 1077 because the condition on line 1072 was always true
1073 estimated_cost = _global_cost_tracker.record(
1074 response.model, input_tokens, output_tokens
1075 )
1077 return LLMResponse(
1078 answer=answer,
1079 confidence=None, # Anthropic doesn't provide confidence scores
1080 suggested_commands=[],
1081 metadata={
1082 "model": response.model,
1083 "usage": {
1084 "input_tokens": input_tokens,
1085 "output_tokens": output_tokens,
1086 },
1087 "stop_reason": response.stop_reason,
1088 },
1089 raw_response={
1090 "id": response.id,
1091 "type": response.type,
1092 },
1093 estimated_cost=estimated_cost,
1094 )
1096 except self._anthropic.RateLimitError as e:
1097 last_exception = e
1098 if attempt < self.config.max_retries - 1:
1099 # Exponential backoff for rate limits
1100 wait_time = 2**attempt
1101 time.sleep(wait_time)
1102 continue
1103 raise LLMError(f"Anthropic rate limit exceeded: {e}") # noqa: B904
1105 except self._anthropic.APITimeoutError as e:
1106 last_exception = e
1107 if attempt < self.config.max_retries - 1: 1107 ↛ 1110line 1107 didn't jump to line 1110 because the condition on line 1107 was always true
1108 time.sleep(1)
1109 continue
1110 raise LLMError(f"Anthropic request timeout: {e}") # noqa: B904
1112 except self._anthropic.APIError as e:
1113 last_exception = e
1114 if attempt < self.config.max_retries - 1:
1115 time.sleep(1)
1116 continue
1117 raise LLMError(f"Anthropic API error: {e}") # noqa: B904
1119 except Exception as e:
1120 last_exception = e
1121 raise LLMError(f"Anthropic request failed: {e}") # noqa: B904
1123 raise LLMError(
1124 f"Anthropic request failed after {self.config.max_retries} retries: {last_exception}"
1125 )
def analyze_trace(self, trace: Any, question: str) -> LLMResponse:
    """Answer a natural-language question about a trace via Anthropic.

    The trace is condensed to text with ``_summarize_trace`` and sent,
    together with the question, as a single user message.

    Args:
        trace: Trace object to analyze
        question: Natural language question about the trace

    Returns:
        LLM response with analysis
    """
    summary = self._summarize_trace(trace)
    user_turn = {
        "role": "user",
        "content": f"Trace Summary:\n{summary}\n\nQuestion: {question}",
    }

    # The expert persona travels as the system prompt, not as a chat turn
    return self.chat_completion(
        [user_turn],
        system=(
            "You are an expert in signal analysis and oscilloscope data. "
            "Analyze the provided trace data and answer questions accurately. "
            "Provide specific, actionable insights."
        ),
    )
def suggest_measurements(self, trace: Any) -> LLMResponse:
    """Ask the model which measurements suit the given trace.

    The reply is scanned with ``_extract_commands`` and the commands found
    are stored on the response before it is returned.

    Args:
        trace: Trace object

    Returns:
        LLM response with measurement suggestions
    """
    summary = self._summarize_trace(trace)
    user_turn = {
        "role": "user",
        "content": f"Trace Summary:\n{summary}\n\nWhat measurements would be most informative for this trace?",
    }

    reply = self.chat_completion(
        [user_turn],
        system=(
            "You are an expert in signal analysis. Based on trace characteristics, "
            "suggest relevant measurements. Provide 3-5 specific measurement recommendations "
            "with brief explanations."
        ),
    )

    # Turn the free-text answer into command suggestions
    reply.suggested_commands = self._extract_commands(reply.answer)
    return reply
1191 def _summarize_trace(self, trace: Any) -> str:
1192 """Create a text summary of trace for LLM context.
1194 Args:
1195 trace: Trace object
1197 Returns:
1198 Text summary of trace characteristics
1199 """
1200 summary_parts = [f"Trace Type: {type(trace).__name__}"]
1202 # Extract metadata
1203 if hasattr(trace, "metadata"): 1203 ↛ 1213line 1203 didn't jump to line 1213 because the condition on line 1203 was always true
1204 meta = trace.metadata
1205 if hasattr(meta, "sample_rate"): 1205 ↛ 1207line 1205 didn't jump to line 1207 because the condition on line 1205 was always true
1206 summary_parts.append(f"Sample Rate: {meta.sample_rate:.2e} Hz")
1207 if hasattr(meta, "num_samples"): 1207 ↛ 1209line 1207 didn't jump to line 1209 because the condition on line 1207 was always true
1208 summary_parts.append(f"Number of Samples: {meta.num_samples:,}")
1209 if hasattr(meta, "duration"): 1209 ↛ 1213line 1209 didn't jump to line 1213 because the condition on line 1209 was always true
1210 summary_parts.append(f"Duration: {meta.duration:.6f} s")
1212 # Data statistics
1213 if hasattr(trace, "data"): 1213 ↛ 1227line 1213 didn't jump to line 1227 because the condition on line 1213 was always true
1214 import numpy as np
1216 data = trace.data
1217 summary_parts.extend(
1218 [
1219 f"Mean: {np.mean(data):.6e}",
1220 f"Std Dev: {np.std(data):.6e}",
1221 f"Min: {np.min(data):.6e}",
1222 f"Max: {np.max(data):.6e}",
1223 f"Peak-to-Peak: {np.ptp(data):.6e}",
1224 ]
1225 )
1227 return "\n".join(summary_parts)
1229 def _extract_commands(self, text: str) -> list[str]:
1230 """Extract suggested TraceKit commands from LLM response.
1232 Args:
1233 text: LLM response text
1235 Returns:
1236 List of extracted command strings
1237 """
1238 commands = []
1239 # Look for common measurement names
1240 measurement_keywords = [
1241 "rise_time",
1242 "fall_time",
1243 "frequency",
1244 "period",
1245 "amplitude",
1246 "rms",
1247 "thd",
1248 "snr",
1249 "fft",
1250 "psd",
1251 "peak",
1252 "duty_cycle",
1253 ]
1255 text_lower = text.lower()
1256 for keyword in measurement_keywords:
1257 if keyword in text_lower:
1258 commands.append(f"measure {keyword}")
1260 return commands
def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
    """Send a prompt to the model together with JSON-encoded context.

    Args:
        prompt: User prompt
        context: Analysis context; serialized with ``json.dumps``

    Returns:
        LLM response
    """
    rendered_context = json.dumps(context, indent=2)
    user_turn = {
        "role": "user",
        "content": f"Context:\n{rendered_context}\n\nQuery: {prompt}",
    }
    return self.chat_completion(
        [user_turn],
        system="You are a helpful assistant for signal analysis.",
    )
def analyze(self, trace: Any, question: str) -> LLMResponse:
    """Analyze a trace; thin alias that forwards to ``analyze_trace``.

    Args:
        trace: Trace object
        question: Natural language question

    Returns:
        Analysis response
    """
    analysis = self.analyze_trace(trace, question)
    return analysis
def explain(self, measurement: Any) -> str:
    """Ask the model for a plain-language explanation of a measurement.

    Args:
        measurement: Measurement result; interpolated into the prompt as text

    Returns:
        Explanation text (the ``answer`` of the model's response)
    """
    user_turn = {
        "role": "user",
        "content": f"Explain this measurement result: {measurement}",
    }
    reply = self.chat_completion(
        [user_turn],
        system="You are an expert in signal measurement interpretation. Explain measurement results clearly and concisely.",
    )
    return reply.answer
class LocalLLMClient:
    """Offline stand-in for an LLM client (mock implementation).

    Nothing is sent anywhere; every method returns a canned reply with a
    confidence of 0.0 where a confidence is reported.
    """

    def __init__(self, config: LLMConfig):
        # Stored only; none of the mock methods below consult it
        self.config = config

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Return a fixed placeholder response; prompt and context are ignored."""
        return LLMResponse(
            answer="Local LLM not configured. This is a mock response.",
            confidence=0.0,
            suggested_commands=[],
            metadata={"mock": True},
        )

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Return a keyword-driven canned analysis; the trace is not inspected."""
        asks_about_protocol = "protocol" in question.lower()

        if not asks_about_protocol:
            return LLMResponse(
                answer=f"Local LLM analysis not available. Question was: {question}",
                confidence=0.0,
                suggested_commands=["measure all"],
            )

        # Protocol questions get a hint towards manual inspection
        return LLMResponse(
            answer="Unable to determine protocol without LLM. Try manual inspection.",
            confidence=0.0,
            suggested_commands=[
                "measure frequency",
                "plot $trace",
            ],
        )

    def explain(self, measurement: Any) -> str:
        """Echo the measurement back with a note that no explanation exists."""
        return f"Measurement result: {measurement}. Local LLM explanation not available."
def get_provider(name: str, **config_kwargs: Any) -> LLMClient:
    """Build the client for a named LLM provider.

    The name is matched case-insensitively against ``LLMProvider``. A
    configuration is assembled from the recognized keyword arguments
    (``model``, ``api_key``, ``base_url``, ``privacy_mode``, ``timeout``,
    ``max_retries``, ``requests_per_minute``) and handed to the matching
    client class.

    Args:
        name: Provider name ('openai', 'anthropic', 'local')
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance

    Raises:
        LLMError: If provider unknown or configuration invalid

    Examples:
        >>> # Get OpenAI provider
        >>> client = get_provider('openai', model='gpt-4', api_key='...')
        >>> response = client.analyze(trace, "What is the frequency?")
        >>>
        >>> # Get Anthropic provider with rate limiting
        >>> client = get_provider('anthropic', model='claude-3-opus-20240229',
        ...                       requests_per_minute=30)
        >>> response = client.suggest_measurements(trace)
        >>>
        >>> # Get local provider (no API key needed)
        >>> client = get_provider('local')
        >>> response = client.analyze(trace, "Analyze this signal")
    """
    try:
        selected = LLMProvider(name.lower())
    except ValueError:
        known = [p.value for p in LLMProvider]
        raise LLMError(f"Unknown provider: {name}. Available: {known}")  # noqa: B904

    # Build config with sensible defaults; privacy mode defaults to on only
    # for the local provider
    option = config_kwargs.get
    config = LLMConfig(
        provider=selected,
        model=option("model", "default"),
        api_key=option("api_key"),
        base_url=option("base_url"),
        privacy_mode=option("privacy_mode", selected == LLMProvider.LOCAL),
        timeout=option("timeout", 30.0),
        max_retries=option("max_retries", 3),
        requests_per_minute=option("requests_per_minute", 60),
    )

    client_classes = {
        LLMProvider.OPENAI: OpenAIClient,
        LLMProvider.ANTHROPIC: AnthropicClient,
        LLMProvider.LOCAL: LocalLLMClient,
    }

    # Create appropriate client with graceful degradation
    try:
        client_cls = client_classes.get(selected)
        if client_cls is None:
            # Enum member exists but has no client class behind it
            raise LLMError(
                f"Provider {name} not yet implemented. "
                "Falling back to local provider is recommended."
            )
        return client_cls(config)
    except ImportError as e:
        # Surface an ImportError raised during client construction as LLMError
        raise LLMError(  # noqa: B904
            f"Provider {name} unavailable: {e}. "
            "Install the required package or use 'local' provider."
        )
# Module-wide LLM integration instance, created on first call to get_llm()
_global_llm: LLMIntegration | None = None


def get_llm() -> LLMIntegration:
    """Return the shared LLM integration, creating it on first use.

    Returns:
        Global LLM integration instance
    """
    global _global_llm
    if _global_llm is not None:
        return _global_llm

    _global_llm = LLMIntegration()
    return _global_llm
def configure(provider: str, model: str, **kwargs: Any) -> None:
    """Apply provider settings to the global LLM integration.

    Args:
        provider: Provider name
        model: Model identifier
        **kwargs: Additional configuration, forwarded unchanged
    """
    get_llm().configure(provider, model, **kwargs)
def analyze(trace: Any, question: str) -> LLMResponse:
    """Analyze a trace through the global LLM integration.

    Args:
        trace: Trace object
        question: Natural language question

    Returns:
        LLM response
    """
    return get_llm().analyze(trace, question)
def explain(measurement: Any) -> str:
    """Explain a measurement through the global LLM integration.

    Args:
        measurement: Measurement result

    Returns:
        Explanation text
    """
    return get_llm().explain(measurement)
# ==============================================================================
# Client Selection and Failover
# ==============================================================================
def get_client(provider: str | None = None, **config_kwargs: Any) -> LLMClient:
    """Get an LLM client, auto-selecting a provider when none is named.

    With an explicit provider this behaves like ``get_provider()``; with
    ``provider=None`` the choice is delegated to ``get_client_auto()``.

    Args:
        provider: Provider name ('openai', 'anthropic', 'local'), or None for auto-select
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance

    Examples:
        >>> # Auto-select based on available API keys
        >>> client = get_client()
        >>>
        >>> # Explicit provider selection
        >>> client = get_client("openai", model="gpt-4")
    """
    if provider is None:
        # Auto-selection: try providers in preference order
        return get_client_auto(**config_kwargs)

    return get_provider(provider, **config_kwargs)
def get_client_auto(**config_kwargs: Any) -> LLMClient:
    """Pick the first usable LLM provider automatically.

    Hosted providers are considered in this order, each only when its API
    key environment variable is set to a non-empty value:
        1. OpenAI (OPENAI_API_KEY)
        2. Anthropic (ANTHROPIC_API_KEY)
    A candidate whose client creation raises ``LLMError`` is skipped. If no
    hosted provider works out, the local provider is returned.

    Args:
        **config_kwargs: Configuration parameters for the provider

    Returns:
        LLM client instance for the first available provider

    Examples:
        >>> client = get_client_auto(model="gpt-4")  # Uses OpenAI if key available
    """
    hosted_candidates = (
        ("OPENAI_API_KEY", "openai"),
        ("ANTHROPIC_API_KEY", "anthropic"),
    )

    for env_var, provider_name in hosted_candidates:
        if not os.environ.get(env_var):
            continue
        try:
            return get_provider(provider_name, **config_kwargs)
        except LLMError:
            continue  # Fall through to next provider

    # Default to local
    return get_provider("local", **config_kwargs)
def get_client_with_failover(
    providers: list[str] | None = None, **config_kwargs: Any
) -> "FailoverLLMClient":
    """Build a client that fails over between providers automatically.

    Args:
        providers: List of provider names in preference order.
            Default: ["openai", "anthropic", "local"]
        **config_kwargs: Configuration parameters for providers

    Returns:
        FailoverLLMClient that tries providers in order

    Examples:
        >>> client = get_client_with_failover(
        ...     providers=["openai", "anthropic"],
        ...     model="gpt-4"
        ... )
        >>> response = client.chat_completion("Hello")  # Tries OpenAI, then Anthropic
    """
    preference_order = ["openai", "anthropic", "local"] if providers is None else providers
    return FailoverLLMClient(preference_order, **config_kwargs)
class FailoverLLMClient:
    """LLM client wrapper with automatic failover between providers.

    Every call is attempted against the configured providers until one
    succeeds. The provider that answered most recently is tried first on the
    next call; the rest follow in their configured order. Useful for
    handling API outages or rate limiting gracefully.
    """

    def __init__(self, providers: list[str], **config_kwargs: Any):
        """Initialize failover client.

        Args:
            providers: List of provider names in preference order
            **config_kwargs: Configuration parameters, passed to every
                provider when its client is created
        """
        self.providers = providers
        self.config_kwargs = config_kwargs
        # Clients are created lazily and cached by provider name
        self._clients: dict[str, LLMClient] = {}
        self._last_successful_provider: str | None = None

    def _get_or_create_client(self, provider: str) -> LLMClient | None:
        """Get or create client for provider.

        A provider whose creation fails is not cached, so it is retried on
        the next call.

        Args:
            provider: Provider name

        Returns:
            LLM client or None if unavailable
        """
        if provider not in self._clients:
            try:
                self._clients[provider] = get_provider(provider, **self.config_kwargs)
            except LLMError:
                return None
        return self._clients.get(provider)

    def _try_providers(self, operation: Callable[[LLMClient], Any]) -> Any:
        """Try operation on each provider until one succeeds.

        Args:
            operation: Callable that takes a client and returns result

        Returns:
            Result from first successful provider

        Raises:
            LLMError: If all providers fail; the message lists each
                provider's failure
        """
        errors = []

        # Try last successful provider first for efficiency
        if self._last_successful_provider:
            reordered = [self._last_successful_provider] + [
                p for p in self.providers if p != self._last_successful_provider
            ]
        else:
            reordered = self.providers

        for provider in reordered:
            client = self._get_or_create_client(provider)
            if client is None:
                errors.append(f"{provider}: not available")
                continue

            try:
                result = operation(client)
                self._last_successful_provider = provider
                return result
            except Exception as e:
                # Any failure counts as "this provider is down"; move on
                errors.append(f"{provider}: {e}")

        raise LLMError(f"All providers failed: {'; '.join(errors)}")

    def chat_completion(
        self,
        prompt: str,
        model: str | None = None,
        **kwargs: Any,
    ) -> str:
        """Send chat completion with failover.

        Clients that expose ``chat_completion`` receive the prompt as a
        single user message; other clients are called through
        ``query(prompt, {})``.

        Args:
            prompt: User prompt
            model: Model name. Currently accepted but not forwarded; each
                client uses the model from its own configuration.
            **kwargs: Additional parameters, forwarded to ``chat_completion``

        Returns:
            Response text from first successful provider
        """

        def operation(client: LLMClient) -> str:
            if hasattr(client, "chat_completion"):
                messages = [{"role": "user", "content": prompt}]
                response = client.chat_completion(messages, **kwargs)  # type: ignore[ignore-without-code]
                return response.answer  # type: ignore[no-any-return]
            else:
                response = client.query(prompt, {})
                return response.answer

        return self._try_providers(operation)  # type: ignore[no-any-return]

    def analyze_trace(self, trace_data: dict[str, Any]) -> dict[str, Any]:
        """Analyze trace data with failover.

        Args:
            trace_data: Dictionary containing trace information

        Returns:
            Dictionary with the response's ``answer``, ``suggested_commands``
            and ``metadata``
        """

        def operation(client: LLMClient) -> dict[str, Any]:
            # Create mock trace object from dict: every key becomes an attribute.
            # NOTE(review): AnthropicClient._summarize_trace in this file reads
            # only ``trace.metadata`` and ``trace.data``, so other keys (e.g.
            # "sample_rate", "mean") do not appear to reach the prompt - confirm
            # whether that is intended.
            class DictTrace:
                def __init__(self, data: dict[str, Any]):
                    self._data = data
                    for k, v in data.items():
                        setattr(self, k, v)

            trace = DictTrace(trace_data)

            if hasattr(client, "analyze_trace"):
                response = client.analyze_trace(trace, "Analyze this signal")  # type: ignore[ignore-without-code]
            else:
                response = client.analyze(trace, "Analyze this signal")

            return {
                "answer": response.answer,
                "suggested_commands": response.suggested_commands,
                "metadata": response.metadata,
            }

        return self._try_providers(operation)  # type: ignore[no-any-return]

    def suggest_measurements(self, signal_characteristics: dict[str, Any]) -> list[str]:
        """Suggest measurements based on signal characteristics.

        Args:
            signal_characteristics: Dictionary describing the signal; its
                entries are exposed as attributes of the trace's ``metadata``

        Returns:
            List of suggested measurement names
        """

        def operation(client: LLMClient) -> list[str]:
            # Create mock trace from characteristics
            class CharTrace:
                def __init__(self, chars: dict[str, Any]):
                    self.metadata = type("Meta", (), chars)()
                    # Deliberately no ``data`` attribute: there are no samples,
                    # and _summarize_trace in this file computes numpy
                    # statistics on ``trace.data`` whenever the attribute
                    # exists, which raises for a None placeholder.

            trace = CharTrace(signal_characteristics)

            if hasattr(client, "suggest_measurements"):
                response = client.suggest_measurements(trace)  # type: ignore[ignore-without-code]
            else:
                response = client.analyze(trace, "What measurements should I perform?")

            return response.suggested_commands  # type: ignore[no-any-return]

        return self._try_providers(operation)  # type: ignore[no-any-return]

    def query(self, prompt: str, context: dict[str, Any]) -> LLMResponse:
        """Send query with failover.

        Args:
            prompt: User prompt
            context: Analysis context

        Returns:
            LLM response
        """
        return self._try_providers(lambda c: c.query(prompt, context))  # type: ignore[no-any-return]

    def analyze(self, trace: Any, question: str) -> LLMResponse:
        """Analyze trace with failover.

        Args:
            trace: Trace object
            question: Natural language question

        Returns:
            Analysis response
        """
        return self._try_providers(lambda c: c.analyze(trace, question))  # type: ignore[no-any-return]

    def explain(self, measurement: Any) -> str:
        """Explain measurement with failover.

        Args:
            measurement: Measurement result

        Returns:
            Explanation text
        """
        return self._try_providers(lambda c: c.explain(measurement))  # type: ignore[no-any-return]
def is_provider_available(provider: str) -> bool:
    """Check if a provider is available (API key set, package installed).

    The name is compared case-insensitively, matching how ``get_provider``
    lowercases provider names. The local provider is always available. A
    hosted provider is available when its API key environment variable is
    set to a non-empty value and its Python package can be imported.

    Args:
        provider: Provider name to check

    Returns:
        True if provider can be initialized; False otherwise, including for
        unknown names

    Examples:
        >>> if is_provider_available("openai"):
        ...     client = get_client("openai")
    """
    import importlib

    normalized = provider.lower()
    if normalized == "local":
        return True

    # provider name -> (API key environment variable, importable package)
    requirements = {
        "openai": ("OPENAI_API_KEY", "openai"),
        "anthropic": ("ANTHROPIC_API_KEY", "anthropic"),
    }
    requirement = requirements.get(normalized)
    if requirement is None:
        return False

    env_var, package = requirement
    if not os.environ.get(env_var):
        return False

    try:
        importlib.import_module(package)
    except ImportError:
        return False
    return True
def list_available_providers() -> list[str]:
    """List the LLM providers that can be used right now.

    Every ``LLMProvider`` member is checked with ``is_provider_available``;
    the names that pass are returned in enum order.

    Returns:
        List of provider names that can be used

    Examples:
        >>> providers = list_available_providers()
        >>> print(providers)  # ['openai', 'local'] if OpenAI key is set
    """
    usable = []
    for member in LLMProvider:
        provider_name = member.value
        if is_provider_available(provider_name):
            usable.append(provider_name)
    return usable