Coverage for mcp_bridge/tools/model_invoke.py: 29%
486 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2Model invocation tools for Gemini and OpenAI.
4These tools use OAuth tokens from the token store to authenticate
5API requests to external model providers.
6"""
8import asyncio
9import base64
10import json as json_module
11import logging
12import os
13import time
14import uuid
16from mcp_bridge.config.rate_limits import get_rate_limiter
18logger = logging.getLogger(__name__)
21def _summarize_prompt(prompt: str, max_length: int = 120) -> str:
22 """
23 Generate a short summary of the prompt for logging.
25 Args:
26 prompt: The full prompt text
27 max_length: Maximum characters to include in summary
29 Returns:
30 Truncated prompt suitable for logging (single line, max_length chars)
31 """
32 if not prompt:
33 return "(empty prompt)"
35 # Normalize whitespace: collapse newlines and multiple spaces
36 clean = " ".join(prompt.split())
38 if len(clean) <= max_length:
39 return clean
41 return clean[:max_length] + "..."
# Cache for Codex instructions (fetched from GitHub).
# Keyed by model name; populated lazily by _fetch_codex_instructions().
_CODEX_INSTRUCTIONS_CACHE: dict[str, str] = {}
# Pinned openai/codex release tag the prompt files are fetched from.
_CODEX_INSTRUCTIONS_RELEASE_TAG = "rust-v0.77.0"  # Update as needed
async def _fetch_codex_instructions(model: str = "gpt-5.2-codex") -> str:
    """Return the official Codex system instructions for *model*.

    Downloads the prompt file from the openai/codex GitHub release pinned by
    ``_CODEX_INSTRUCTIONS_RELEASE_TAG`` and memoizes the result per model.
    On any fetch failure a minimal hard-coded fallback string is returned.
    """
    import httpx

    cached = _CODEX_INSTRUCTIONS_CACHE.get(model)
    if cached is not None:
        return cached

    # Pick the prompt file shipped for this model (default: gpt-5.2-codex).
    filenames = {
        "gpt-5.2-codex": "gpt-5.2-codex_prompt.md",
        "gpt-5.1-codex": "gpt_5_codex_prompt.md",
        "gpt-5.1-codex-max": "gpt_5_codex_max_prompt.md",
    }
    chosen = filenames.get(model, "gpt-5.2-codex_prompt.md")
    url = f"https://raw.githubusercontent.com/openai/codex/{_CODEX_INSTRUCTIONS_RELEASE_TAG}/codex-rs/core/{chosen}"

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, timeout=30.0)
            resp.raise_for_status()
            text = resp.text
    except Exception as e:
        logger.error(f"Failed to fetch Codex instructions: {e}")
        # Return basic fallback instructions
        return "You are Codex, based on GPT-5. You are running as a coding agent."

    _CODEX_INSTRUCTIONS_CACHE[model] = text
    return text
# Model name mapping: user-friendly names -> Antigravity API model IDs
# Per API spec: https://github.com/NoeFabris/opencode-antigravity-auth/blob/main/docs/ANTIGRAVITY_API_SPEC.md
# VERIFIED GEMINI MODELS (as of 2026-01):
#   - gemini-3-flash, gemini-3-pro-high, gemini-3-pro-low
# NOTE: Claude models should use Anthropic API directly, NOT Antigravity
GEMINI_MODEL_MAP = {
    # Antigravity verified Gemini models (pass-through)
    "gemini-3-pro-low": "gemini-3-pro-low",
    "gemini-3-pro-high": "gemini-3-pro-high",
    "gemini-3-flash": "gemini-3-flash",
    # Aliases for convenience
    "gemini-flash": "gemini-3-flash",
    "gemini-pro": "gemini-3-pro-low",
    "gemini-3-pro": "gemini-3-pro-low",
    "gemini": "gemini-3-pro-low",  # Default gemini alias
    # Legacy mappings (redirect to Antigravity models)
    "gemini-2.0-flash": "gemini-3-pro-low",
    "gemini-2.0-flash-001": "gemini-3-pro-low",
    "gemini-2.0-pro": "gemini-3-pro-low",
    "gemini-2.0-pro-exp": "gemini-3-pro-high",
}
def resolve_gemini_model(model: str) -> str:
    """Map a user-friendly Gemini name onto its Antigravity API model ID.

    Unknown names are passed through unchanged so callers may supply raw
    API model IDs directly.
    """
    try:
        return GEMINI_MODEL_MAP[model]
    except KeyError:
        return model
110import httpx
111from tenacity import (
112 retry,
113 retry_if_exception,
114 stop_after_attempt,
115 wait_exponential,
116)
118from ..auth.oauth import (
119 ANTIGRAVITY_DEFAULT_PROJECT_ID,
120 ANTIGRAVITY_ENDPOINTS,
121 ANTIGRAVITY_HEADERS,
122)
123from ..auth.oauth import (
124 refresh_access_token as gemini_refresh,
125)
126from ..auth.openai_oauth import refresh_access_token as openai_refresh
127from ..auth.token_store import TokenStore
128from ..hooks.manager import get_hook_manager
# ========================
# SESSION & HTTP MANAGEMENT
# ========================

# Session cache for thinking signature persistence across multi-turn conversations.
# Key: conversation_key (or "default"), Value: session UUID string.
_SESSION_CACHE: dict[str, str] = {}

# Pooled HTTP client for connection reuse (created lazily by _get_http_client).
_HTTP_CLIENT: httpx.AsyncClient | None = None

# Per-model semaphores for async rate limiting (uses config from ~/.stravinsky/config.json)
_GEMINI_SEMAPHORES: dict[str, asyncio.Semaphore] = {}
def _get_gemini_rate_limit(model: str) -> int:
    """Look up the configured concurrency limit for a Gemini model.

    Reads from ~/.stravinsky/config.json via the shared rate limiter,
    falling back to the "_default" entry (or 5) when the model has no
    explicit limit.

    Args:
        model: Gemini model name (e.g., "gemini-3-flash", "gemini-3-pro-high")

    Returns:
        Configured concurrency limit for this model
    """
    limiter = get_rate_limiter()
    # NOTE(review): relies on the limiter's private _normalize_model/_limits
    # members; consider exposing a public accessor on the rate limiter.
    key = limiter._normalize_model(model)
    limits = limiter._limits
    return limits.get(key, limits.get("_default", 5))
def _get_gemini_semaphore(model: str) -> asyncio.Semaphore:
    """Return the per-model asyncio semaphore used for rate limiting.

    A semaphore is created lazily (one per model) with the limit from
    ~/.stravinsky/config.json, e.g.::

        {
            "rate_limits": {
                "gemini-3-flash": 15,
                "gemini-3-pro-high": 8
            }
        }

    Args:
        model: Gemini model name

    Returns:
        asyncio.Semaphore with the configured limit for this model
    """
    sem = _GEMINI_SEMAPHORES.get(model)
    if sem is None:
        limit = _get_gemini_rate_limit(model)
        sem = asyncio.Semaphore(limit)
        _GEMINI_SEMAPHORES[model] = sem
        logger.info(f"[RateLimit] Created semaphore for {model} with limit {limit}")
    return sem
def _get_session_id(conversation_key: str | None = None) -> str:
    """Return a stable session UUID for thinking-signature caching.

    Per Antigravity API, session IDs must persist across multi-turn calls to
    keep the thinking-signature cache valid; minting a new UUID per call
    breaks it, so IDs are memoized per conversation key.

    Args:
        conversation_key: Optional key to scope the session (e.g., per-agent).

    Returns:
        Stable session UUID for this conversation.
    """
    import uuid as uuid_module  # Local import workaround

    key = conversation_key or "default"
    try:
        return _SESSION_CACHE[key]
    except KeyError:
        fresh = str(uuid_module.uuid4())
        _SESSION_CACHE[key] = fresh
        return fresh
def clear_session_cache() -> None:
    """Drop all cached session IDs (used for thinking recovery on error).

    After a thinking/signature API error, clearing the cache forces the next
    request to start a fresh session via _get_session_id().
    """
    _SESSION_CACHE.clear()
async def _get_http_client() -> httpx.AsyncClient:
    """Return the shared pooled HTTP client, creating it on first use.

    A single module-level client is reused across requests so connection
    pools survive between calls; a closed client is transparently replaced.
    """
    global _HTTP_CLIENT
    needs_new = _HTTP_CLIENT is None or _HTTP_CLIENT.is_closed
    if needs_new:
        _HTTP_CLIENT = httpx.AsyncClient(timeout=120.0)
    return _HTTP_CLIENT
230def _extract_gemini_response(data: dict) -> str:
231 """
232 Extract text from Gemini response, handling thinking blocks.
234 Per Antigravity API, responses may contain:
235 - text: Regular response text
236 - thought: Thinking block content (when thinkingConfig enabled)
237 - thoughtSignature: Signature for caching (ignored)
239 Args:
240 data: Raw API response JSON
242 Returns:
243 Extracted text, with thinking blocks formatted as <thinking>...</thinking>
244 """
245 try:
246 # Unwrap the outer "response" envelope if present
247 inner_response = data.get("response", data)
248 candidates = inner_response.get("candidates", [])
250 if not candidates:
251 return "No response generated"
253 content = candidates[0].get("content", {})
254 parts = content.get("parts", [])
256 if not parts:
257 return "No response parts"
259 text_parts = []
260 thinking_parts = []
262 for part in parts:
263 if "thought" in part:
264 thinking_parts.append(part["thought"])
265 elif "text" in part:
266 text_parts.append(part["text"])
267 # Skip thoughtSignature parts
269 # Combine results
270 result = "".join(text_parts)
272 # Prepend thinking blocks if present
273 if thinking_parts:
274 thinking_content = "".join(thinking_parts)
275 result = f"<thinking>\n{thinking_content}\n</thinking>\n\n{result}"
277 return result if result.strip() else "No response generated"
279 except (KeyError, IndexError, TypeError) as e:
280 return f"Error parsing response: {e}"
async def _ensure_valid_token(token_store: TokenStore, provider: str) -> str:
    """
    Get a valid access token for *provider*, refreshing it if needed.

    A refresh is attempted when the stored token expires within a 5-minute
    buffer; the rotated token is persisted back into the store.

    Args:
        token_store: Token store holding OAuth credentials
        provider: Provider name ("gemini" or "openai")

    Returns:
        Valid access token

    Raises:
        ValueError: If not authenticated, the provider is unknown, or the
            refresh request fails.
    """
    # Check if token needs refresh (with 5 minute buffer)
    if token_store.needs_refresh(provider, buffer_seconds=300):
        token = token_store.get_token(provider)

        if not token or not token.get("refresh_token"):
            raise ValueError(
                f"Not authenticated with {provider}. "
                f"Run: python -m mcp_bridge.auth.cli login {provider}"
            )

        # Resolve the refresher BEFORE the try block so an unknown provider
        # is reported as such instead of being re-wrapped as a misleading
        # "Token refresh failed" error (bug fix).
        if provider == "gemini":
            refresh = gemini_refresh
        elif provider == "openai":
            refresh = openai_refresh
        else:
            raise ValueError(f"Unknown provider: {provider}")

        try:
            result = refresh(token["refresh_token"])

            # Persist the rotated token; keep the old refresh_token when the
            # provider does not return a new one.
            token_store.set_token(
                provider=provider,
                access_token=result.access_token,
                refresh_token=result.refresh_token or token["refresh_token"],
                expires_at=time.time() + result.expires_in,
            )

            return result.access_token
        except Exception as e:
            # Chain the cause so the original failure stays in the traceback.
            raise ValueError(
                f"Token refresh failed: {e}. Run: python -m mcp_bridge.auth.cli login {provider}"
            ) from e

    access_token = token_store.get_access_token(provider)
    if not access_token:
        raise ValueError(
            f"Not authenticated with {provider}. "
            f"Run: python -m mcp_bridge.auth.cli login {provider}"
        )

    return access_token
def is_retryable_exception(e: Exception) -> bool:
    """Return True only for retryable HTTP failures (server-side 5xx).

    429 (Rate Limit) errors should fail fast - retrying makes the problem worse
    by adding more requests to an already exhausted quota. The semaphore prevents
    these in the first place, but if one slips through, we shouldn't retry.
    """
    if not isinstance(e, httpx.HTTPStatusError):
        return False
    status = e.response.status_code
    return 500 <= status < 600
async def _invoke_gemini_with_api_key(
    api_key: str,
    prompt: str,
    model: str = "gemini-3-flash",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
    image_path: str | None = None,
) -> str:
    """
    Invoke Gemini using API key authentication (google-genai library).

    This is an alternative to OAuth authentication that uses the official
    google-genai Python library with a simple API key.

    Args:
        api_key: Gemini API key (from GEMINI_API_KEY or GOOGLE_API_KEY env var)
        prompt: The prompt to send to Gemini
        model: Gemini model to use (e.g., "gemini-2.0-flash-exp")
        temperature: Sampling temperature (0.0-2.0)
        max_tokens: Maximum tokens in response
        thinking_budget: Tokens reserved for internal reasoning (if supported)
        image_path: Optional path to image/PDF for vision analysis

    Returns:
        The model's response text.

    Raises:
        ImportError: If google-genai library is not installed
        ValueError: If API request fails
    """
    try:
        from google import genai
    except ImportError:
        raise ImportError(
            "google-genai library not installed. Install with: pip install google-genai"
        )

    # Map stravinsky model names to google-genai model names
    # google-genai uses different model naming (e.g., "gemini-2.0-flash-exp")
    model_map = {
        "gemini-3-flash": "gemini-2.0-flash-exp",
        "gemini-3-pro-low": "gemini-2.0-flash-exp",
        "gemini-3-pro-high": "gemini-exp-1206",  # Experimental model
        "gemini-flash": "gemini-2.0-flash-exp",
        "gemini-pro": "gemini-2.0-flash-exp",
        "gemini-3-pro": "gemini-2.0-flash-exp",
        "gemini": "gemini-2.0-flash-exp",
    }
    genai_model = model_map.get(model, model)  # Pass through if not in map

    try:
        # Initialize client with API key
        client = genai.Client(api_key=api_key)

        # Build generation config
        config = {
            "temperature": temperature,
            "max_output_tokens": max_tokens,
        }

        # Add thinking budget if supported (experimental feature)
        if thinking_budget > 0:
            config["thinking_config"] = {
                "thinking_budget": thinking_budget,
            }

        # Build contents - text prompt plus optional image
        contents = [prompt]

        # Add image data for vision analysis
        if image_path:
            from pathlib import Path

            image_file = Path(image_path)
            if image_file.exists():
                # google-genai supports direct file path or base64
                # For simplicity, use the file path directly
                # NOTE(review): this appends a pathlib.Path object; assumes the
                # google-genai SDK accepts Path entries in `contents` — confirm.
                contents.append(image_file)
                logger.info(f"[API_KEY] Added vision data: {image_path}")

        # Generate content
        response = client.models.generate_content(
            model=genai_model,
            contents=contents,
            config=config,
        )

        # Extract text from response
        if hasattr(response, 'text'):
            return response.text
        elif hasattr(response, 'candidates') and response.candidates:
            # Fallback: extract from candidates
            candidate = response.candidates[0]
            if hasattr(candidate, 'content'):
                parts = candidate.content.parts
                text_parts = [part.text for part in parts if hasattr(part, 'text')]
                return "".join(text_parts) if text_parts else "No response generated"

        return "No response generated"

    except Exception as e:
        # Any SDK/network failure is surfaced uniformly as ValueError.
        logger.error(f"API key authentication failed: {e}")
        raise ValueError(f"Gemini API key request failed: {e}")
@retry(
    stop=stop_after_attempt(2),  # Reduced from 5 to 2 attempts
    wait=wait_exponential(multiplier=2, min=10, max=120),  # Longer waits: 10s → 20s → 40s
    retry=retry_if_exception(is_retryable_exception),
    before_sleep=lambda retry_state: logger.info(
        f"Server error, retrying in {retry_state.next_action.sleep} seconds..."
    ),
)
async def invoke_gemini(
    token_store: TokenStore,
    prompt: str,
    model: str = "gemini-3-flash",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
    image_path: str | None = None,
) -> str:
    """
    Invoke a Gemini model with the given prompt.

    Supports two authentication methods (API key takes precedence):
    1. API Key: Set GEMINI_API_KEY or GOOGLE_API_KEY in environment
    2. OAuth: Use Google OAuth via Antigravity (requires stravinsky-auth login gemini)

    Supports vision API for image/PDF analysis when image_path is provided.

    Args:
        token_store: Token store for OAuth credentials
        prompt: The prompt to send to Gemini
        model: Gemini model to use
        temperature: Sampling temperature (0.0-2.0)
        max_tokens: Maximum tokens in response
        thinking_budget: Tokens reserved for internal reasoning
        image_path: Optional path to image/PDF for vision analysis (token optimization)

    Returns:
        The model's response text.

    Raises:
        ValueError: If not authenticated with Gemini
        httpx.HTTPStatusError: If API request fails
    """
    # NOTE(review): leftover debug log from diagnosing a uuid-import issue.
    logger.info(f"[DEBUG] invoke_gemini called, uuid module check: {uuid}")
    # Execute pre-model invoke hooks
    params = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "thinking_budget": thinking_budget,
        "token_store": token_store,  # Pass for hooks that need model access
        "provider": "gemini",  # Identify which provider is being called
    }
    hook_manager = get_hook_manager()
    params = await hook_manager.execute_pre_model_invoke(params)

    # Update local variables from possibly modified params
    prompt = params["prompt"]
    model = params["model"]
    temperature = params["temperature"]
    max_tokens = params["max_tokens"]
    thinking_budget = params["thinking_budget"]

    # Extract agent context for logging (may be passed via params or original call)
    agent_context = params.get("agent_context", {})
    agent_type = agent_context.get("agent_type", "direct")
    task_id = agent_context.get("task_id", "")
    description = agent_context.get("description", "")
    prompt_summary = _summarize_prompt(prompt)

    # Log with agent context and prompt summary
    logger.info(f"[{agent_type}] → {model}: {prompt_summary}")

    # Check for API key authentication (takes precedence over OAuth)
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows when Gemini is invoked with auth method
    import sys
    task_info = f" task={task_id}" if task_id else ""
    desc_info = f" | {description}" if description else ""
    auth_method = "API" if api_key else "OAuth"
    print(f"🔮 GEMINI ({auth_method}): {model} | agent={agent_type}{task_info}{desc_info}", file=sys.stderr)

    if api_key:
        logger.info(f"[{agent_type}] Using API key authentication (GEMINI_API_KEY)")
        # Rate limit concurrent requests (configurable via ~/.stravinsky/config.json)
        semaphore = _get_gemini_semaphore(model)
        async with semaphore:
            return await _invoke_gemini_with_api_key(
                api_key=api_key,
                prompt=prompt,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_budget=thinking_budget,
                image_path=image_path,
            )

    # Fallback to OAuth authentication (Antigravity)
    logger.info(f"[{agent_type}] Using OAuth authentication (Antigravity)")
    # Rate limit concurrent Gemini requests (configurable via ~/.stravinsky/config.json)
    semaphore = _get_gemini_semaphore(model)
    async with semaphore:
        access_token = await _ensure_valid_token(token_store, "gemini")

        # Resolve user-friendly model name to actual API model ID
        api_model = resolve_gemini_model(model)

        # Use persistent session ID for thinking signature caching
        session_id = _get_session_id()
        project_id = os.getenv("STRAVINSKY_ANTIGRAVITY_PROJECT_ID", ANTIGRAVITY_DEFAULT_PROJECT_ID)

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            **ANTIGRAVITY_HEADERS,  # Include Antigravity headers
        }

        # Build inner request payload
        # Per API spec: contents must include role ("user" or "model")

        # Build parts list - text prompt plus optional image
        parts = [{"text": prompt}]

        # Add image data for vision analysis (token optimization for multimodal)
        if image_path:
            import base64
            from pathlib import Path

            image_file = Path(image_path)
            if image_file.exists():
                # Determine MIME type from the file extension (default: PNG)
                suffix = image_file.suffix.lower()
                mime_types = {
                    ".png": "image/png",
                    ".jpg": "image/jpeg",
                    ".jpeg": "image/jpeg",
                    ".gif": "image/gif",
                    ".webp": "image/webp",
                    ".pdf": "application/pdf",
                }
                mime_type = mime_types.get(suffix, "image/png")

                # Read and base64 encode
                image_data = base64.b64encode(image_file.read_bytes()).decode("utf-8")

                # Add inline image data for Gemini Vision API
                parts.append({
                    "inlineData": {
                        "mimeType": mime_type,
                        "data": image_data,
                    }
                })
                logger.info(f"[multimodal] Added vision data: {image_path} ({mime_type})")

        inner_payload = {
            "contents": [{"role": "user", "parts": parts}],
            "generationConfig": {
                "temperature": temperature,
                "maxOutputTokens": max_tokens,
            },
            "sessionId": session_id,
        }

        # Add thinking budget if supported by model/API
        if thinking_budget > 0:
            # For Gemini 2.0+ Thinking models
            # Per Antigravity API: use "thinkingBudget", NOT "tokenLimit"
            inner_payload["generationConfig"]["thinkingConfig"] = {
                "includeThoughts": True,
                "thinkingBudget": thinking_budget,
            }

        # Wrap request body per reference implementation
        try:
            import uuid as uuid_module  # Local import workaround for MCP context issue
            request_id = f"invoke-{uuid_module.uuid4()}"
        except Exception as e:
            logger.error(f"UUID IMPORT FAILED: {e}")
            raise RuntimeError(f"CUSTOM ERROR: UUID import failed: {e}")

        wrapped_payload = {
            "project": project_id,
            "model": api_model,
            "userAgent": "antigravity",
            "requestId": request_id,
            "request": inner_payload,
        }

        # Get pooled HTTP client for connection reuse
        client = await _get_http_client()

        # Try endpoints in fallback order with thinking recovery.
        # Outer loop: up to max_retries passes (for thinking-error recovery).
        # Inner loop: each configured Antigravity endpoint in order.
        response = None
        last_error = None
        max_retries = 2  # For thinking recovery

        for retry_attempt in range(max_retries):
            for endpoint in ANTIGRAVITY_ENDPOINTS:
                # Reference uses: {endpoint}/v1internal:generateContent (NOT /models/{model})
                api_url = f"{endpoint}/v1internal:generateContent"

                try:
                    response = await client.post(
                        api_url,
                        headers=headers,
                        json=wrapped_payload,
                        timeout=120.0,
                    )

                    # 401/403 might be endpoint-specific, try next endpoint
                    if response.status_code in (401, 403):
                        logger.warning(
                            f"[Gemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                        )
                        last_error = Exception(f"{response.status_code} from {endpoint}")
                        continue

                    # Check for thinking-related errors that need recovery
                    if response.status_code in (400, 500):
                        error_text = response.text.lower()
                        if "thinking" in error_text or "signature" in error_text:
                            logger.warning(
                                "[Gemini] Thinking error detected, clearing session cache and retrying"
                            )
                            clear_session_cache()
                            # Update session ID for retry
                            wrapped_payload["request"]["sessionId"] = _get_session_id()
                            last_error = Exception(f"Thinking error: {response.text[:200]}")
                            break  # Break inner loop to retry with new session

                    # If we got a non-retryable response (success or 4xx client error), use it
                    if response.status_code < 500 and response.status_code != 429:
                        break

                except httpx.TimeoutException as e:
                    last_error = e
                    continue
                except Exception as e:
                    last_error = e
                    continue
            else:
                # for-else: inner loop completed without break - no thinking
                # recovery needed, so stop the outer retry loop as well.
                break

            # If we broke out of inner loop for thinking recovery, continue outer retry loop.
            # NOTE(review): a plain 400/500 without thinking keywords also lands
            # here and triggers another outer pass — confirm that is intended.
            if response and response.status_code in (400, 500):
                continue
            break

        if response is None:
            # FALLBACK: Try Claude sonnet-4.5 for agents that support it
            agent_context = params.get("agent_context", {})
            agent_type = agent_context.get("agent_type", "unknown")

            if agent_type in ("dewey", "explore", "document_writer", "multimodal"):
                logger.warning(f"[{agent_type}] Gemini failed, falling back to Claude sonnet-4.5")
                try:
                    import subprocess
                    # Shell out to the local `claude` CLI as a best-effort fallback.
                    fallback_result = subprocess.run(
                        ["claude", "-p", prompt, "--model", "sonnet", "--output-format", "text"],
                        capture_output=True,
                        text=True,
                        timeout=120,
                        cwd=os.getcwd(),
                    )
                    if fallback_result.returncode == 0 and fallback_result.stdout.strip():
                        return fallback_result.stdout.strip()
                except Exception as fallback_error:
                    logger.error(f"Fallback to Claude also failed: {fallback_error}")

            raise ValueError(f"All Antigravity endpoints failed: {last_error}")

        response.raise_for_status()
        data = response.json()

        # Extract text from response using thinking-aware parser
        return _extract_gemini_response(data)
740# ========================
741# AGENTIC FUNCTION CALLING
742# ========================
# Tool definitions for background agents.
# Gemini function-declaration schema: a single tool group holding the four
# local filesystem/search tools that _execute_tool() knows how to run.
AGENT_TOOLS = [
    {
        "functionDeclarations": [
            {
                "name": "read_file",
                "description": "Read the contents of a file. Returns the file contents as text.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Absolute or relative path to the file",
                        }
                    },
                    "required": ["path"],
                },
            },
            {
                "name": "list_directory",
                "description": "List files and directories in a path",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "Directory path to list"}
                    },
                    "required": ["path"],
                },
            },
            {
                "name": "grep_search",
                "description": "Search for a pattern in files using ripgrep. Returns matching lines with file paths and line numbers.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "pattern": {"type": "string", "description": "The search pattern (regex)"},
                        "path": {"type": "string", "description": "Directory or file to search in"},
                    },
                    "required": ["pattern", "path"],
                },
            },
            {
                "name": "write_file",
                "description": "Write content to a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "Path to the file to write"},
                        "content": {
                            "type": "string",
                            "description": "Content to write to the file",
                        },
                    },
                    "required": ["path", "content"],
                },
            },
        ]
    }
]
805def _execute_tool(name: str, args: dict) -> str:
806 """Execute a tool and return the result."""
807 import subprocess
808 from pathlib import Path
810 try:
811 if name == "read_file":
812 path = Path(args["path"])
813 if not path.exists():
814 return f"Error: File not found: {path}"
815 return path.read_text()
817 elif name == "list_directory":
818 path = Path(args["path"])
819 if not path.exists():
820 return f"Error: Directory not found: {path}"
821 entries = []
822 for entry in path.iterdir():
823 entry_type = "DIR" if entry.is_dir() else "FILE"
824 entries.append(f"[{entry_type}] {entry.name}")
825 return "\n".join(entries) if entries else "(empty directory)"
827 elif name == "grep_search":
828 pattern = args["pattern"]
829 search_path = args["path"]
830 result = subprocess.run(
831 ["rg", "--json", "-m", "50", pattern, search_path],
832 capture_output=True,
833 text=True,
834 timeout=30,
835 )
836 if result.returncode == 0:
837 return result.stdout[:10000] # Limit output size
838 elif result.returncode == 1:
839 return "No matches found"
840 else:
841 return f"Search error: {result.stderr}"
843 elif name == "write_file":
844 path = Path(args["path"])
845 path.parent.mkdir(parents=True, exist_ok=True)
846 path.write_text(args["content"])
847 return f"Successfully wrote {len(args['content'])} bytes to {path}"
849 else:
850 return f"Unknown tool: {name}"
852 except Exception as e:
853 return f"Tool error: {str(e)}"
async def _invoke_gemini_agentic_with_api_key(
    api_key: str,
    prompt: str,
    model: str = "gemini-3-flash",
    max_turns: int = 10,
    timeout: int = 120,
) -> str:
    """
    Invoke Gemini with function calling using API key authentication (google-genai library).

    This implements a multi-turn agentic loop:
    1. Send prompt with tool definitions
    2. If model returns FunctionCall, execute the tool
    3. Send FunctionResponse back to model
    4. Repeat until model returns text or max_turns reached

    Args:
        api_key: Gemini API key (from GEMINI_API_KEY or GOOGLE_API_KEY env var)
        prompt: The task prompt
        model: Gemini model to use
        max_turns: Maximum number of tool-use turns
        timeout: Request timeout in seconds (currently unused by google-genai)

    Returns:
        Final text response from the model

    Raises:
        ImportError: If google-genai library is not installed
        ValueError: If API request fails
    """
    # USER-VISIBLE NOTIFICATION (stderr) - Shows agentic mode with API key
    import sys
    print(f"🔮 GEMINI (API/Agentic): {model} | max_turns={max_turns}", file=sys.stderr)

    try:
        from google import genai
        from google.genai import types
    except ImportError:
        raise ImportError(
            "google-genai library not installed. Install with: pip install google-genai"
        )

    # Map stravinsky model names to google-genai model names
    model_map = {
        "gemini-3-flash": "gemini-2.0-flash-exp",
        "gemini-3-pro-low": "gemini-2.0-flash-exp",
        "gemini-3-pro-high": "gemini-exp-1206",
        "gemini-flash": "gemini-2.0-flash-exp",
        "gemini-pro": "gemini-2.0-flash-exp",
        "gemini-3-pro": "gemini-2.0-flash-exp",
        "gemini": "gemini-2.0-flash-exp",
    }
    genai_model = model_map.get(model, model)

    # Initialize client with API key
    client = genai.Client(api_key=api_key)

    # Convert AGENT_TOOLS to google-genai format
    # google-genai expects tools as a list of Tool objects containing function_declarations
    function_declarations = []
    for tool_group in AGENT_TOOLS:
        for func_decl in tool_group.get("functionDeclarations", []):
            function_declarations.append(
                types.FunctionDeclaration(
                    name=func_decl["name"],
                    description=func_decl["description"],
                    parameters=func_decl["parameters"],
                )
            )

    # Wrap function declarations in a Tool object
    tools = [types.Tool(function_declarations=function_declarations)]

    # Initialize conversation with user message
    contents = [types.Content(role="user", parts=[types.Part(text=prompt)])]

    for turn in range(max_turns):
        try:
            # Generate content with tools
            response = client.models.generate_content(
                model=genai_model,
                contents=contents,
                config=types.GenerateContentConfig(
                    tools=tools,
                    temperature=0.7,
                    max_output_tokens=8192,
                ),
            )

            # Check if response has function calls
            if not response.candidates or not response.candidates[0].content.parts:
                return "No response generated"

            parts = response.candidates[0].content.parts
            function_calls = []
            text_parts = []

            # Partition the reply: tool invocations vs. plain text parts.
            for part in parts:
                if part.function_call:
                    function_calls.append(part.function_call)
                elif part.text:
                    text_parts.append(part.text)

            # If no function calls, return text response
            if not function_calls:
                result = "".join(text_parts)
                return result if result.strip() else "Task completed"

            # Execute function calls and prepare responses
            function_responses = []
            for func_call in function_calls:
                func_name = func_call.name
                func_args = dict(func_call.args) if func_call.args else {}

                logger.info(f"[AgenticGemini] Turn {turn + 1}: Executing {func_name}")
                result = _execute_tool(func_name, func_args)

                function_responses.append(
                    types.Part(
                        function_response=types.FunctionResponse(
                            name=func_name,
                            response={"result": result},
                        )
                    )
                )

            # Add model's response to conversation
            contents.append(response.candidates[0].content)

            # Add function responses to conversation
            contents.append(
                types.Content(
                    role="user",
                    parts=function_responses,
                )
            )

        except Exception as e:
            # Any turn failure aborts the whole loop as ValueError.
            logger.error(f"[AgenticGemini] Error in turn {turn + 1}: {e}")
            raise ValueError(f"Gemini API key request failed: {e}")

    return "Max turns reached without final response"
async def invoke_gemini_agentic(
    token_store: TokenStore,
    prompt: str,
    model: str = "gemini-3-flash",
    max_turns: int = 10,
    timeout: int = 120,
) -> str:
    """
    Invoke Gemini with function calling for agentic tasks.

    This function implements a multi-turn agentic loop:
    1. Send prompt with tool definitions
    2. If model returns FunctionCall, execute the tool
    3. Send FunctionResponse back to model
    4. Repeat until model returns text or max_turns reached

    Supports two authentication methods (API key takes precedence):
    1. API Key: Set GEMINI_API_KEY or GOOGLE_API_KEY in environment
    2. OAuth: Use Google OAuth via Antigravity (requires stravinsky-auth login gemini)

    Args:
        token_store: Token store for OAuth credentials
        prompt: The task prompt
        model: Gemini model to use
        max_turns: Maximum number of tool-use turns
        timeout: Per-request timeout in seconds

    Returns:
        Final text response from the model

    Raises:
        ValueError: If every Antigravity endpoint fails (OAuth path)
    """
    # Check for API key authentication (takes precedence over OAuth)
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if api_key:
        logger.info("[AgenticGemini] Using API key authentication (GEMINI_API_KEY)")
        return await _invoke_gemini_agentic_with_api_key(
            api_key=api_key,
            prompt=prompt,
            model=model,
            max_turns=max_turns,
            timeout=timeout,
        )

    # Fallback to OAuth authentication (Antigravity)
    logger.info("[AgenticGemini] Using OAuth authentication (Antigravity)")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows agentic mode with OAuth
    import sys

    print(f"🔮 GEMINI (OAuth/Agentic): {model} | max_turns={max_turns}", file=sys.stderr)

    access_token = await _ensure_valid_token(token_store, "gemini")
    api_model = resolve_gemini_model(model)

    # Use persistent session ID for this conversation
    session_id = _get_session_id(conversation_key="agentic")

    # Project ID from environment or default
    project_id = os.getenv("STRAVINSKY_ANTIGRAVITY_PROJECT_ID", ANTIGRAVITY_DEFAULT_PROJECT_ID)

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        **ANTIGRAVITY_HEADERS,
    }

    # Initialize conversation
    contents = [{"role": "user", "parts": [{"text": prompt}]}]

    # Get pooled HTTP client for connection reuse
    client = await _get_http_client()

    for turn in range(max_turns):
        # Build inner request payload (what goes inside "request" wrapper)
        inner_payload = {
            "contents": contents,
            "tools": AGENT_TOOLS,
            "generationConfig": {
                "temperature": 0.7,
                "maxOutputTokens": 8192,
            },
            "sessionId": session_id,
        }

        # Wrap request body per reference implementation (request.ts wrapRequestBody()).
        # FIX: use the module-level `uuid` import; the previous per-turn
        # `import uuid as uuid_module` was redundant.
        wrapped_payload = {
            "project": project_id,
            "model": api_model,
            "userAgent": "antigravity",
            "requestId": f"agent-{uuid.uuid4()}",
            "request": inner_payload,
        }

        # Try endpoints in fallback order
        response = None
        last_error = None

        for endpoint in ANTIGRAVITY_ENDPOINTS:
            # Reference uses: {endpoint}/v1internal:generateContent (NOT /models/{model})
            api_url = f"{endpoint}/v1internal:generateContent"

            try:
                response = await client.post(
                    api_url,
                    headers=headers,
                    json=wrapped_payload,
                    timeout=float(timeout),
                )

                # 401/403 might be endpoint-specific, try next endpoint
                if response.status_code in (401, 403):
                    logger.warning(
                        f"[AgenticGemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                    )
                    last_error = Exception(f"{response.status_code} from {endpoint}")
                    continue

                # If we got a non-retryable response (success or 4xx client error), use it
                if response.status_code < 500 and response.status_code != 429:
                    break

                logger.warning(
                    f"[AgenticGemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                )

            except httpx.TimeoutException as e:
                last_error = e
                logger.warning(f"[AgenticGemini] Endpoint {endpoint} timed out, trying next")
                continue
            except Exception as e:
                last_error = e
                logger.warning(f"[AgenticGemini] Endpoint {endpoint} failed: {e}, trying next")
                continue

        if response is None:
            # FIX: chain the underlying cause so the original failure is preserved
            raise ValueError(f"All Antigravity endpoints failed: {last_error}") from last_error

        response.raise_for_status()
        data = response.json()

        # Extract response - unwrap outer "response" envelope if present
        inner_response = data.get("response", data)
        candidates = inner_response.get("candidates", [])
        if not candidates:
            return "No response generated"

        content = candidates[0].get("content", {})
        parts = content.get("parts", [])

        if not parts:
            return "No response parts"

        # Check for function call. FIX: collect ALL text parts instead of keeping
        # only the last one, matching the API-key variant of this loop which
        # joins text parts before returning.
        function_call = None
        text_parts = []

        for part in parts:
            if "functionCall" in part:
                function_call = part["functionCall"]
                break
            elif "text" in part:
                text_parts.append(part["text"])

        if function_call:
            # Execute the function
            func_name = function_call.get("name")
            func_args = function_call.get("args", {})

            logger.info(f"[AgenticGemini] Turn {turn + 1}: Executing {func_name}")
            result = _execute_tool(func_name, func_args)

            # Add model's response and function result to conversation
            contents.append({"role": "model", "parts": [{"functionCall": function_call}]})
            contents.append(
                {
                    "role": "user",
                    "parts": [
                        {"functionResponse": {"name": func_name, "response": {"result": result}}}
                    ],
                }
            )
        else:
            # No function call: return the joined text response (consistent with
            # the API-key variant's "empty means task completed" convention)
            text_response = "".join(text_parts)
            return text_response if text_response.strip() else "Task completed"

    return "Max turns reached without final response"
@retry(
    stop=stop_after_attempt(2),  # Reduced from 5 to 2 attempts
    wait=wait_exponential(multiplier=2, min=10, max=120),  # Longer waits: 10s → 20s → 40s
    retry=retry_if_exception(is_retryable_exception),
    before_sleep=lambda retry_state: logger.info(
        f"Server error, retrying in {retry_state.next_action.sleep} seconds..."
    ),
)
async def invoke_openai(
    token_store: TokenStore,
    prompt: str,
    model: str = "gpt-5.2-codex",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
) -> str:
    """
    Invoke an OpenAI model with the given prompt.

    Uses the ChatGPT backend Codex Responses endpoint with SSE streaming,
    replicating the opencode-openai-codex-auth plugin behavior.

    Args:
        token_store: Token store for API key
        prompt: The prompt to send to OpenAI
        model: OpenAI model to use
        temperature: Sampling temperature (0.0-2.0)
        max_tokens: Maximum tokens in response
        thinking_budget: If > 0, requests "high" reasoning effort instead of "medium"

    Returns:
        The model's response text.

    Raises:
        ValueError: If not authenticated with OpenAI or the API returns an error
        httpx.HTTPStatusError: If API request fails
    """
    # Execute pre-model invoke hooks
    params = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "thinking_budget": thinking_budget,
        "token_store": token_store,  # Pass for hooks that need model access
        "provider": "openai",  # Identify which provider is being called
    }
    hook_manager = get_hook_manager()
    params = await hook_manager.execute_pre_model_invoke(params)

    # Update local variables from possibly modified params.
    # NOTE(review): temperature/max_tokens are hook-modifiable but are not
    # forwarded in the request payload below — confirm whether this endpoint
    # supports them before wiring them in.
    prompt = params["prompt"]
    model = params["model"]
    temperature = params["temperature"]
    max_tokens = params["max_tokens"]
    thinking_budget = params["thinking_budget"]

    # Extract agent context for logging (may be passed via params or original call)
    agent_context = params.get("agent_context", {})
    agent_type = agent_context.get("agent_type", "direct")
    task_id = agent_context.get("task_id", "")
    description = agent_context.get("description", "")
    prompt_summary = _summarize_prompt(prompt)

    # Log with agent context and prompt summary
    logger.info(f"[{agent_type}] → {model}: {prompt_summary}")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows when OpenAI is invoked
    import sys
    task_info = f" task={task_id}" if task_id else ""
    desc_info = f" | {description}" if description else ""
    print(f"🧠 OPENAI: {model} | agent={agent_type}{task_info}{desc_info}", file=sys.stderr)

    access_token = await _ensure_valid_token(token_store, "openai")
    logger.info("[invoke_openai] Got access token")

    # ChatGPT Backend API - Uses Codex Responses endpoint
    # Replicates opencode-openai-codex-auth plugin behavior
    api_url = "https://chatgpt.com/backend-api/codex/responses"

    # Extract account ID from the JWT payload (second dot-separated segment,
    # base64url with padding restored)
    logger.info("[invoke_openai] Extracting account ID from JWT")
    try:
        parts = access_token.split(".")
        payload_b64 = parts[1]
        padding = 4 - len(payload_b64) % 4
        if padding != 4:
            payload_b64 += "=" * padding
        jwt_payload = json_module.loads(base64.urlsafe_b64decode(payload_b64))
        account_id = jwt_payload.get("https://api.openai.com/auth", {}).get("chatgpt_account_id")
    except Exception as e:
        # Best-effort: a missing account ID just omits the header below
        logger.error(f"Failed to extract account ID from JWT: {e}")
        account_id = None

    # Fetch official Codex instructions from GitHub
    instructions = await _fetch_codex_instructions(model)

    # Headers matching opencode-openai-codex-auth plugin
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",  # SSE stream
        "openai-beta": "responses=experimental",
        "openai-originator": "codex_cli_rs",
    }

    if account_id:
        headers["x-openai-account-id"] = account_id

    # Request body matching opencode transformation
    payload = {
        "model": model,
        "store": False,  # Required by ChatGPT backend
        "stream": True,  # Always stream (handler converts to non-stream if needed)
        "instructions": instructions,
        "input": [{"role": "user", "content": prompt}],
        "reasoning": {"effort": "high" if thinking_budget > 0 else "medium", "summary": "auto"},
        "text": {"verbosity": "medium"},
        "include": ["reasoning.encrypted_content"],
    }

    # Stream the response and collect text
    text_chunks = []

    logger.info(f"[invoke_openai] Calling {api_url} with model {model}")
    logger.info(f"[invoke_openai] Payload keys: {list(payload.keys())}")
    logger.info(f"[invoke_openai] Instructions length: {len(instructions)}")

    try:
        async with httpx.AsyncClient() as client, client.stream(
            "POST", api_url, headers=headers, json=payload, timeout=120.0
        ) as response:
            logger.info(f"[invoke_openai] Response status: {response.status_code}")
            if response.status_code == 401:
                raise ValueError(
                    "OpenAI authentication failed. Run: stravinsky-auth login openai"
                )

            if response.status_code >= 400:
                error_body = await response.aread()
                error_text = error_body.decode("utf-8")
                logger.error(f"OpenAI API error {response.status_code}: {error_text}")
                logger.error(f"Request payload was: {payload}")
                # FIX: never log the raw bearer token — redact Authorization
                safe_headers = {**headers, "Authorization": "Bearer ***redacted***"}
                logger.error(f"Request headers were: {safe_headers}")
                raise ValueError(f"OpenAI API error {response.status_code}: {error_text}")

            # Parse SSE stream for text deltas
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data_json = line[6:]  # Remove "data: " prefix
                    try:
                        data = json_module.loads(data_json)
                        event_type = data.get("type")

                        # Extract text deltas from SSE stream
                        if event_type == "response.output_text.delta":
                            delta = data.get("delta", "")
                            text_chunks.append(delta)

                    except json_module.JSONDecodeError:
                        pass  # Skip malformed JSON
                    except Exception as e:
                        logger.warning(f"Error processing SSE event: {e}")

        # Return collected text
        result = "".join(text_chunks)
        if not result:
            return "No response generated"
        return result

    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error: {e}")
        raise
    except ValueError:
        # FIX: propagate our own ValueErrors (auth failure, API error) unchanged;
        # the previous broad catch re-wrapped them and obscured the message
        raise
    except Exception as e:
        logger.error(f"Unexpected error in invoke_openai: {e}")
        # FIX: chain the cause so the original traceback is preserved
        raise ValueError(f"Failed to invoke OpenAI: {e}") from e