Coverage for mcp_bridge/tools/model_invoke.py: 29%

486 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2026-01-10 00:20 -0500

1""" 

2Model invocation tools for Gemini and OpenAI. 

3 

4These tools use OAuth tokens from the token store to authenticate 

5API requests to external model providers. 

6""" 

7 

8import asyncio 

9import base64 

10import json as json_module 

11import logging 

12import os 

13import time 

14import uuid 

15 

16from mcp_bridge.config.rate_limits import get_rate_limiter 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21def _summarize_prompt(prompt: str, max_length: int = 120) -> str: 

22 """ 

23 Generate a short summary of the prompt for logging. 

24 

25 Args: 

26 prompt: The full prompt text 

27 max_length: Maximum characters to include in summary 

28 

29 Returns: 

30 Truncated prompt suitable for logging (single line, max_length chars) 

31 """ 

32 if not prompt: 

33 return "(empty prompt)" 

34 

35 # Normalize whitespace: collapse newlines and multiple spaces 

36 clean = " ".join(prompt.split()) 

37 

38 if len(clean) <= max_length: 

39 return clean 

40 

41 return clean[:max_length] + "..." 

42 

43 

# In-process cache of Codex system prompts fetched from GitHub,
# keyed by model name (see _fetch_codex_instructions).
_CODEX_INSTRUCTIONS_CACHE = {}
# Pinned release tag of the openai/codex repository the prompt files
# are fetched from.
_CODEX_INSTRUCTIONS_RELEASE_TAG = "rust-v0.77.0"  # Update as needed

47 

48 

async def _fetch_codex_instructions(model: str = "gpt-5.2-codex") -> str:
    """Download the official Codex system prompt for *model* from GitHub.

    Results are memoized in ``_CODEX_INSTRUCTIONS_CACHE`` so each prompt
    file is fetched at most once per process. On any fetch error a minimal
    fallback instruction string is returned instead of raising.
    """
    import httpx

    if model in _CODEX_INSTRUCTIONS_CACHE:
        return _CODEX_INSTRUCTIONS_CACHE[model]

    # Each supported model ships its own prompt file in the codex repo.
    prompt_files = {
        "gpt-5.2-codex": "gpt-5.2-codex_prompt.md",
        "gpt-5.1-codex": "gpt_5_codex_prompt.md",
        "gpt-5.1-codex-max": "gpt_5_codex_max_prompt.md",
    }
    filename = prompt_files.get(model, "gpt-5.2-codex_prompt.md")
    url = (
        "https://raw.githubusercontent.com/openai/codex/"
        f"{_CODEX_INSTRUCTIONS_RELEASE_TAG}/codex-rs/core/{filename}"
    )

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
            text = response.text
            _CODEX_INSTRUCTIONS_CACHE[model] = text
            return text
    except Exception as e:
        logger.error(f"Failed to fetch Codex instructions: {e}")
        # Minimal fallback so callers still get usable instructions.
        return "You are Codex, based on GPT-5. You are running as a coding agent."

80 

81 

# Model name mapping: user-friendly names -> Antigravity API model IDs.
# Per API spec: https://github.com/NoeFabris/opencode-antigravity-auth/blob/main/docs/ANTIGRAVITY_API_SPEC.md
# VERIFIED GEMINI MODELS (as of 2026-01):
#   gemini-3-flash, gemini-3-pro-high, gemini-3-pro-low
# NOTE: Claude models should use Anthropic API directly, NOT Antigravity.
GEMINI_MODEL_MAP = {
    # Verified Antigravity Gemini models map to themselves.
    "gemini-3-pro-low": "gemini-3-pro-low",
    "gemini-3-pro-high": "gemini-3-pro-high",
    "gemini-3-flash": "gemini-3-flash",
    # Convenience aliases; bare "gemini" defaults to the low-effort pro model.
    "gemini-flash": "gemini-3-flash",
    "gemini-pro": "gemini-3-pro-low",
    "gemini-3-pro": "gemini-3-pro-low",
    "gemini": "gemini-3-pro-low",
    # Legacy 2.0-era names redirected onto current Antigravity models.
    "gemini-2.0-flash": "gemini-3-pro-low",
    "gemini-2.0-flash-001": "gemini-3-pro-low",
    "gemini-2.0-pro": "gemini-3-pro-low",
    "gemini-2.0-pro-exp": "gemini-3-pro-high",
}

103 

104 

def resolve_gemini_model(model: str) -> str:
    """Map a user-friendly model name to the actual Antigravity API model ID.

    Names not present in GEMINI_MODEL_MAP are passed through unchanged.
    """
    return GEMINI_MODEL_MAP.get(model, model)

108 

109 

110import httpx 

111from tenacity import ( 

112 retry, 

113 retry_if_exception, 

114 stop_after_attempt, 

115 wait_exponential, 

116) 

117 

118from ..auth.oauth import ( 

119 ANTIGRAVITY_DEFAULT_PROJECT_ID, 

120 ANTIGRAVITY_ENDPOINTS, 

121 ANTIGRAVITY_HEADERS, 

122) 

123from ..auth.oauth import ( 

124 refresh_access_token as gemini_refresh, 

125) 

126from ..auth.openai_oauth import refresh_access_token as openai_refresh 

127from ..auth.token_store import TokenStore 

128from ..hooks.manager import get_hook_manager 

129 

# ========================
# SESSION & HTTP MANAGEMENT
# ========================

# Session cache for thinking-signature persistence across multi-turn
# conversations. Key: conversation_key (or "default"); value: a stable
# session UUID string (see _get_session_id).
_SESSION_CACHE: dict[str, str] = {}

# Lazily-created pooled HTTP client, shared across requests for
# connection reuse (see _get_http_client).
_HTTP_CLIENT: httpx.AsyncClient | None = None

# One asyncio.Semaphore per Gemini model for concurrency rate limiting;
# limits come from ~/.stravinsky/config.json (see _get_gemini_semaphore).
_GEMINI_SEMAPHORES: dict[str, asyncio.Semaphore] = {}

143 

144 

def _get_gemini_rate_limit(model: str) -> int:
    """Return the configured concurrency limit for a Gemini model.

    Reads from ~/.stravinsky/config.json via the shared rate limiter,
    falling back to the "_default" entry (or 5) when the model has no
    explicit limit.

    Args:
        model: Gemini model name (e.g. "gemini-3-flash").

    Returns:
        Configured concurrency limit for this model.
    """
    limiter = get_rate_limiter()
    # NOTE(review): relies on the limiter's private helpers/attributes;
    # a public accessor on the rate limiter would be cleaner.
    key = limiter._normalize_model(model)
    limits = limiter._limits
    return limits.get(key, limits.get("_default", 5))

161 

162 

def _get_gemini_semaphore(model: str) -> asyncio.Semaphore:
    """Return the per-model semaphore used to rate-limit Gemini calls.

    One semaphore is created lazily per model, sized from the config
    (customizable in ~/.stravinsky/config.json under "rate_limits",
    e.g. {"gemini-3-flash": 15, "gemini-3-pro-high": 8}).

    Args:
        model: Gemini model name.

    Returns:
        asyncio.Semaphore with the configured limit for this model.
    """
    semaphore = _GEMINI_SEMAPHORES.get(model)
    if semaphore is None:
        limit = _get_gemini_rate_limit(model)
        semaphore = asyncio.Semaphore(limit)
        _GEMINI_SEMAPHORES[model] = semaphore
        logger.info(f"[RateLimit] Created semaphore for {model} with limit {limit}")
    return semaphore

187 

188 

def _get_session_id(conversation_key: str | None = None) -> str:
    """Return a persistent session ID for thinking-signature caching.

    Per the Antigravity API, session IDs must stay stable across
    multi-turn exchanges to keep the thinking-signature cache valid;
    generating a fresh UUID per call would break it.

    Args:
        conversation_key: Optional key to scope the session (e.g. per agent);
            falls back to "default" when omitted.

    Returns:
        A stable session UUID string for this conversation.
    """
    import uuid as uuid_module  # Local import workaround

    key = conversation_key or "default"
    if key in _SESSION_CACHE:
        return _SESSION_CACHE[key]
    session_id = str(uuid_module.uuid4())
    _SESSION_CACHE[key] = session_id
    return session_id

208 

209 

def clear_session_cache() -> None:
    """Drop all cached session IDs (used for thinking recovery on error)."""
    _SESSION_CACHE.clear()

213 

214 

async def _get_http_client() -> httpx.AsyncClient:
    """Return the shared pooled HTTP client, creating it on first use.

    A single long-lived client keeps connection pools warm across
    requests; a closed client is transparently replaced.
    """
    global _HTTP_CLIENT
    client = _HTTP_CLIENT
    if client is None or client.is_closed:
        client = httpx.AsyncClient(timeout=120.0)
        _HTTP_CLIENT = client
    return client

226 

227 

228 

229 

230def _extract_gemini_response(data: dict) -> str: 

231 """ 

232 Extract text from Gemini response, handling thinking blocks. 

233 

234 Per Antigravity API, responses may contain: 

235 - text: Regular response text 

236 - thought: Thinking block content (when thinkingConfig enabled) 

237 - thoughtSignature: Signature for caching (ignored) 

238 

239 Args: 

240 data: Raw API response JSON 

241 

242 Returns: 

243 Extracted text, with thinking blocks formatted as <thinking>...</thinking> 

244 """ 

245 try: 

246 # Unwrap the outer "response" envelope if present 

247 inner_response = data.get("response", data) 

248 candidates = inner_response.get("candidates", []) 

249 

250 if not candidates: 

251 return "No response generated" 

252 

253 content = candidates[0].get("content", {}) 

254 parts = content.get("parts", []) 

255 

256 if not parts: 

257 return "No response parts" 

258 

259 text_parts = [] 

260 thinking_parts = [] 

261 

262 for part in parts: 

263 if "thought" in part: 

264 thinking_parts.append(part["thought"]) 

265 elif "text" in part: 

266 text_parts.append(part["text"]) 

267 # Skip thoughtSignature parts 

268 

269 # Combine results 

270 result = "".join(text_parts) 

271 

272 # Prepend thinking blocks if present 

273 if thinking_parts: 

274 thinking_content = "".join(thinking_parts) 

275 result = f"<thinking>\n{thinking_content}\n</thinking>\n\n{result}" 

276 

277 return result if result.strip() else "No response generated" 

278 

279 except (KeyError, IndexError, TypeError) as e: 

280 return f"Error parsing response: {e}" 

281 

282 

async def _ensure_valid_token(token_store: TokenStore, provider: str) -> str:
    """
    Get a valid access token for *provider*, refreshing it if needed.

    Args:
        token_store: Token store holding OAuth credentials per provider.
        provider: Provider name ("gemini" or "openai").

    Returns:
        Valid access token.

    Raises:
        ValueError: If the provider is unknown, the user is not
            authenticated, or the refresh request fails.
    """
    # Refresh proactively when the token expires within a 5 minute buffer.
    if token_store.needs_refresh(provider, buffer_seconds=300):
        token = token_store.get_token(provider)

        if not token or not token.get("refresh_token"):
            raise ValueError(
                f"Not authenticated with {provider}. "
                f"Run: python -m mcp_bridge.auth.cli login {provider}"
            )

        # Dispatch BEFORE the try block: previously the "Unknown provider"
        # ValueError was swallowed by the broad except below and re-wrapped
        # as a misleading "Token refresh failed" error.
        if provider == "gemini":
            refresh = gemini_refresh
        elif provider == "openai":
            refresh = openai_refresh
        else:
            raise ValueError(f"Unknown provider: {provider}")

        try:
            result = refresh(token["refresh_token"])

            # Persist the new token; keep the old refresh token when the
            # provider did not rotate it.
            token_store.set_token(
                provider=provider,
                access_token=result.access_token,
                refresh_token=result.refresh_token or token["refresh_token"],
                expires_at=time.time() + result.expires_in,
            )

            return result.access_token
        except Exception as e:
            # Chain the original exception so the root cause stays visible
            # in tracebacks instead of being discarded.
            raise ValueError(
                f"Token refresh failed: {e}. Run: python -m mcp_bridge.auth.cli login {provider}"
            ) from e

    access_token = token_store.get_access_token(provider)
    if not access_token:
        raise ValueError(
            f"Not authenticated with {provider}. "
            f"Run: python -m mcp_bridge.auth.cli login {provider}"
        )

    return access_token

337 

338 

def is_retryable_exception(e: Exception) -> bool:
    """Return True only for retryable HTTP errors (5xx; never 429).

    Rate-limit (429) errors must fail fast: retrying piles more requests
    onto an already exhausted quota. The per-model semaphore prevents most
    of them up front, but any that slip through are not retried either.
    """
    if not isinstance(e, httpx.HTTPStatusError):
        return False
    # Retry server errors only.
    return 500 <= e.response.status_code < 600

351 

352 

async def _invoke_gemini_with_api_key(
    api_key: str,
    prompt: str,
    model: str = "gemini-3-flash",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
    image_path: str | None = None,
) -> str:
    """
    Invoke Gemini using API key authentication (google-genai library).

    This is an alternative to OAuth authentication that uses the official
    google-genai Python library with a simple API key.

    Args:
        api_key: Gemini API key (from GEMINI_API_KEY or GOOGLE_API_KEY env var)
        prompt: The prompt to send to Gemini
        model: Gemini model to use (e.g., "gemini-2.0-flash-exp")
        temperature: Sampling temperature (0.0-2.0)
        max_tokens: Maximum tokens in response
        thinking_budget: Tokens reserved for internal reasoning (if supported)
        image_path: Optional path to image/PDF for vision analysis

    Returns:
        The model's response text.

    Raises:
        ImportError: If google-genai library is not installed
        ValueError: If API request fails
    """
    try:
        from google import genai
    except ImportError as e:
        # Chain the original import error so the real cause stays visible.
        raise ImportError(
            "google-genai library not installed. Install with: pip install google-genai"
        ) from e

    # Map stravinsky model names to google-genai model names
    # google-genai uses different model naming (e.g., "gemini-2.0-flash-exp")
    model_map = {
        "gemini-3-flash": "gemini-2.0-flash-exp",
        "gemini-3-pro-low": "gemini-2.0-flash-exp",
        "gemini-3-pro-high": "gemini-exp-1206",  # Experimental model
        "gemini-flash": "gemini-2.0-flash-exp",
        "gemini-pro": "gemini-2.0-flash-exp",
        "gemini-3-pro": "gemini-2.0-flash-exp",
        "gemini": "gemini-2.0-flash-exp",
    }
    genai_model = model_map.get(model, model)  # Pass through if not in map

    try:
        # Initialize client with API key
        client = genai.Client(api_key=api_key)

        # Build generation config
        config = {
            "temperature": temperature,
            "max_output_tokens": max_tokens,
        }

        # Add thinking budget if supported (experimental feature)
        if thinking_budget > 0:
            config["thinking_config"] = {
                "thinking_budget": thinking_budget,
            }

        # Build contents - text prompt plus optional image
        contents = [prompt]

        # Add image data for vision analysis
        if image_path:
            from pathlib import Path

            image_file = Path(image_path)
            if image_file.exists():
                # google-genai supports direct file path or base64;
                # for simplicity, use the file path directly.
                contents.append(image_file)
                logger.info(f"[API_KEY] Added vision data: {image_path}")

        # Generate content
        response = client.models.generate_content(
            model=genai_model,
            contents=contents,
            config=config,
        )

        # Prefer the convenience accessor, but fall through to candidates
        # when it is missing or empty: response.text can be None (e.g. for
        # tool-only candidates), and returning it would violate "-> str".
        text = getattr(response, "text", None)
        if text:
            return text

        if getattr(response, "candidates", None):
            # Fallback: extract from candidates
            candidate = response.candidates[0]
            if hasattr(candidate, "content"):
                parts = candidate.content.parts
                text_parts = [part.text for part in parts if hasattr(part, "text")]
                return "".join(text_parts) if text_parts else "No response generated"

        return "No response generated"

    except Exception as e:
        logger.error(f"API key authentication failed: {e}")
        raise ValueError(f"Gemini API key request failed: {e}") from e

457 

458 

@retry(
    stop=stop_after_attempt(2),  # Reduced from 5 to 2 attempts
    wait=wait_exponential(multiplier=2, min=10, max=120),  # Longer waits: 10s → 20s → 40s
    retry=retry_if_exception(is_retryable_exception),  # 5xx only, never 429
    before_sleep=lambda retry_state: logger.info(
        f"Server error, retrying in {retry_state.next_action.sleep} seconds..."
    ),
)
async def invoke_gemini(
    token_store: TokenStore,
    prompt: str,
    model: str = "gemini-3-flash",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
    image_path: str | None = None,
) -> str:
    """
    Invoke a Gemini model with the given prompt.

    Supports two authentication methods (API key takes precedence):
    1. API Key: Set GEMINI_API_KEY or GOOGLE_API_KEY in environment
    2. OAuth: Use Google OAuth via Antigravity (requires stravinsky-auth login gemini)

    Supports vision API for image/PDF analysis when image_path is provided.

    Args:
        token_store: Token store for OAuth credentials
        prompt: The prompt to send to Gemini
        model: Gemini model to use
        temperature: Sampling temperature (0.0-2.0)
        max_tokens: Maximum tokens in response
        thinking_budget: Tokens reserved for internal reasoning
        image_path: Optional path to image/PDF for vision analysis (token optimization)

    Returns:
        The model's response text.

    Raises:
        ValueError: If not authenticated with Gemini
        httpx.HTTPStatusError: If API request fails
    """
    logger.info(f"[DEBUG] invoke_gemini called, uuid module check: {uuid}")
    # Execute pre-model invoke hooks; hooks may rewrite any of these params.
    params = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "thinking_budget": thinking_budget,
        "token_store": token_store,  # Pass for hooks that need model access
        "provider": "gemini",  # Identify which provider is being called
    }
    hook_manager = get_hook_manager()
    params = await hook_manager.execute_pre_model_invoke(params)

    # Update local variables from possibly modified params
    prompt = params["prompt"]
    model = params["model"]
    temperature = params["temperature"]
    max_tokens = params["max_tokens"]
    thinking_budget = params["thinking_budget"]

    # Extract agent context for logging (may be passed via params or original call)
    agent_context = params.get("agent_context", {})
    agent_type = agent_context.get("agent_type", "direct")
    task_id = agent_context.get("task_id", "")
    description = agent_context.get("description", "")
    prompt_summary = _summarize_prompt(prompt)

    # Log with agent context and prompt summary
    logger.info(f"[{agent_type}] → {model}: {prompt_summary}")

    # Check for API key authentication (takes precedence over OAuth)
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows when Gemini is invoked with auth method
    import sys
    task_info = f" task={task_id}" if task_id else ""
    desc_info = f" | {description}" if description else ""
    auth_method = "API" if api_key else "OAuth"
    print(f"🔮 GEMINI ({auth_method}): {model} | agent={agent_type}{task_info}{desc_info}", file=sys.stderr)

    if api_key:
        logger.info(f"[{agent_type}] Using API key authentication (GEMINI_API_KEY)")
        # Rate limit concurrent requests (configurable via ~/.stravinsky/config.json)
        semaphore = _get_gemini_semaphore(model)
        async with semaphore:
            return await _invoke_gemini_with_api_key(
                api_key=api_key,
                prompt=prompt,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                thinking_budget=thinking_budget,
                image_path=image_path,
            )

    # Fallback to OAuth authentication (Antigravity)
    logger.info(f"[{agent_type}] Using OAuth authentication (Antigravity)")
    # Rate limit concurrent Gemini requests (configurable via ~/.stravinsky/config.json)
    semaphore = _get_gemini_semaphore(model)
    async with semaphore:
        access_token = await _ensure_valid_token(token_store, "gemini")

        # Resolve user-friendly model name to actual API model ID
        api_model = resolve_gemini_model(model)

        # Use persistent session ID for thinking signature caching
        session_id = _get_session_id()
        project_id = os.getenv("STRAVINSKY_ANTIGRAVITY_PROJECT_ID", ANTIGRAVITY_DEFAULT_PROJECT_ID)

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            **ANTIGRAVITY_HEADERS,  # Include Antigravity headers
        }

        # Build inner request payload.
        # Per API spec: contents must include role ("user" or "model").

        # Build parts list - text prompt plus optional image
        parts = [{"text": prompt}]

        # Add image data for vision analysis (token optimization for multimodal)
        if image_path:
            import base64
            from pathlib import Path

            image_file = Path(image_path)
            if image_file.exists():
                # Determine MIME type from the file extension (PNG fallback).
                suffix = image_file.suffix.lower()
                mime_types = {
                    ".png": "image/png",
                    ".jpg": "image/jpeg",
                    ".jpeg": "image/jpeg",
                    ".gif": "image/gif",
                    ".webp": "image/webp",
                    ".pdf": "application/pdf",
                }
                mime_type = mime_types.get(suffix, "image/png")

                # Read and base64 encode
                image_data = base64.b64encode(image_file.read_bytes()).decode("utf-8")

                # Add inline image data for Gemini Vision API
                parts.append({
                    "inlineData": {
                        "mimeType": mime_type,
                        "data": image_data,
                    }
                })
                logger.info(f"[multimodal] Added vision data: {image_path} ({mime_type})")

        inner_payload = {
            "contents": [{"role": "user", "parts": parts}],
            "generationConfig": {
                "temperature": temperature,
                "maxOutputTokens": max_tokens,
            },
            "sessionId": session_id,
        }

        # Add thinking budget if supported by model/API
        if thinking_budget > 0:
            # For Gemini 2.0+ Thinking models.
            # Per Antigravity API: use "thinkingBudget", NOT "tokenLimit".
            inner_payload["generationConfig"]["thinkingConfig"] = {
                "includeThoughts": True,
                "thinkingBudget": thinking_budget,
            }

        # Wrap request body per reference implementation
        try:
            import uuid as uuid_module  # Local import workaround for MCP context issue

            request_id = f"invoke-{uuid_module.uuid4()}"
        except Exception as e:
            logger.error(f"UUID IMPORT FAILED: {e}")
            raise RuntimeError(f"CUSTOM ERROR: UUID import failed: {e}")

        wrapped_payload = {
            "project": project_id,
            "model": api_model,
            "userAgent": "antigravity",
            "requestId": request_id,
            "request": inner_payload,
        }

        # Get pooled HTTP client for connection reuse
        client = await _get_http_client()

        # Try endpoints in fallback order with thinking recovery.
        # Outer loop: up to max_retries passes (a pass restarts after a
        # thinking-signature error clears the session cache).
        # Inner loop: walks the endpoint fallback list.
        response = None
        last_error = None
        max_retries = 2  # For thinking recovery

        for retry_attempt in range(max_retries):
            for endpoint in ANTIGRAVITY_ENDPOINTS:
                # Reference uses: {endpoint}/v1internal:generateContent (NOT /models/{model})
                api_url = f"{endpoint}/v1internal:generateContent"

                try:
                    response = await client.post(
                        api_url,
                        headers=headers,
                        json=wrapped_payload,
                        timeout=120.0,
                    )

                    # 401/403 might be endpoint-specific, try next endpoint
                    if response.status_code in (401, 403):
                        logger.warning(
                            f"[Gemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                        )
                        last_error = Exception(f"{response.status_code} from {endpoint}")
                        continue

                    # Check for thinking-related errors that need recovery
                    if response.status_code in (400, 500):
                        error_text = response.text.lower()
                        if "thinking" in error_text or "signature" in error_text:
                            logger.warning(
                                "[Gemini] Thinking error detected, clearing session cache and retrying"
                            )
                            clear_session_cache()
                            # Update session ID for retry
                            wrapped_payload["request"]["sessionId"] = _get_session_id()
                            last_error = Exception(f"Thinking error: {response.text[:200]}")
                            break  # Break inner loop to retry with new session

                    # If we got a non-retryable response (success or 4xx client error), use it
                    if response.status_code < 500 and response.status_code != 429:
                        break

                except httpx.TimeoutException as e:
                    last_error = e
                    continue
                except Exception as e:
                    last_error = e
                    continue
            else:
                # Inner loop completed without break - no thinking recovery needed
                break

            # If we broke out of inner loop for thinking recovery, continue outer retry loop
            if response and response.status_code in (400, 500):
                continue
            break

        if response is None:
            # FALLBACK: Try Claude sonnet-4.5 for agents that support it
            agent_context = params.get("agent_context", {})
            agent_type = agent_context.get("agent_type", "unknown")

            if agent_type in ("dewey", "explore", "document_writer", "multimodal"):
                logger.warning(f"[{agent_type}] Gemini failed, falling back to Claude sonnet-4.5")
                try:
                    import subprocess
                    fallback_result = subprocess.run(
                        ["claude", "-p", prompt, "--model", "sonnet", "--output-format", "text"],
                        capture_output=True,
                        text=True,
                        timeout=120,
                        cwd=os.getcwd(),
                    )
                    if fallback_result.returncode == 0 and fallback_result.stdout.strip():
                        return fallback_result.stdout.strip()
                except Exception as fallback_error:
                    logger.error(f"Fallback to Claude also failed: {fallback_error}")

            raise ValueError(f"All Antigravity endpoints failed: {last_error}")

        response.raise_for_status()
        data = response.json()

        # Extract text from response using thinking-aware parser
        return _extract_gemini_response(data)

738 

739 

740# ======================== 

741# AGENTIC FUNCTION CALLING 

742# ======================== 

743 

# Tool definitions for background agents, in the Gemini function-calling
# schema (one tool group containing four functionDeclarations). Executed
# locally via _execute_tool.
AGENT_TOOLS = [
    {
        "functionDeclarations": [
            # Read a file's full contents as text.
            {
                "name": "read_file",
                "description": "Read the contents of a file. Returns the file contents as text.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Absolute or relative path to the file",
                        }
                    },
                    "required": ["path"],
                },
            },
            # List the entries of a directory.
            {
                "name": "list_directory",
                "description": "List files and directories in a path",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "Directory path to list"}
                    },
                    "required": ["path"],
                },
            },
            # Regex search via ripgrep.
            {
                "name": "grep_search",
                "description": "Search for a pattern in files using ripgrep. Returns matching lines with file paths and line numbers.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "pattern": {"type": "string", "description": "The search pattern (regex)"},
                        "path": {"type": "string", "description": "Directory or file to search in"},
                    },
                    "required": ["pattern", "path"],
                },
            },
            # Write (create or overwrite) a file.
            {
                "name": "write_file",
                "description": "Write content to a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "Path to the file to write"},
                        "content": {
                            "type": "string",
                            "description": "Content to write to the file",
                        },
                    },
                    "required": ["path", "content"],
                },
            },
        ]
    }
]

803 

804 

805def _execute_tool(name: str, args: dict) -> str: 

806 """Execute a tool and return the result.""" 

807 import subprocess 

808 from pathlib import Path 

809 

810 try: 

811 if name == "read_file": 

812 path = Path(args["path"]) 

813 if not path.exists(): 

814 return f"Error: File not found: {path}" 

815 return path.read_text() 

816 

817 elif name == "list_directory": 

818 path = Path(args["path"]) 

819 if not path.exists(): 

820 return f"Error: Directory not found: {path}" 

821 entries = [] 

822 for entry in path.iterdir(): 

823 entry_type = "DIR" if entry.is_dir() else "FILE" 

824 entries.append(f"[{entry_type}] {entry.name}") 

825 return "\n".join(entries) if entries else "(empty directory)" 

826 

827 elif name == "grep_search": 

828 pattern = args["pattern"] 

829 search_path = args["path"] 

830 result = subprocess.run( 

831 ["rg", "--json", "-m", "50", pattern, search_path], 

832 capture_output=True, 

833 text=True, 

834 timeout=30, 

835 ) 

836 if result.returncode == 0: 

837 return result.stdout[:10000] # Limit output size 

838 elif result.returncode == 1: 

839 return "No matches found" 

840 else: 

841 return f"Search error: {result.stderr}" 

842 

843 elif name == "write_file": 

844 path = Path(args["path"]) 

845 path.parent.mkdir(parents=True, exist_ok=True) 

846 path.write_text(args["content"]) 

847 return f"Successfully wrote {len(args['content'])} bytes to {path}" 

848 

849 else: 

850 return f"Unknown tool: {name}" 

851 

852 except Exception as e: 

853 return f"Tool error: {str(e)}" 

854 

855 

async def _invoke_gemini_agentic_with_api_key(
    api_key: str,
    prompt: str,
    model: str = "gemini-3-flash",
    max_turns: int = 10,
    timeout: int = 120,
) -> str:
    """
    Invoke Gemini with function calling using API key authentication (google-genai library).

    This implements a multi-turn agentic loop:
    1. Send prompt with tool definitions
    2. If model returns FunctionCall, execute the tool
    3. Send FunctionResponse back to model
    4. Repeat until model returns text or max_turns reached

    Args:
        api_key: Gemini API key (from GEMINI_API_KEY or GOOGLE_API_KEY env var)
        prompt: The task prompt
        model: Gemini model to use
        max_turns: Maximum number of tool-use turns
        timeout: Request timeout in seconds (currently unused by google-genai)

    Returns:
        Final text response from the model

    Raises:
        ImportError: If google-genai library is not installed
        ValueError: If API request fails
    """
    # USER-VISIBLE NOTIFICATION (stderr) - Shows agentic mode with API key
    import sys
    print(f"🔮 GEMINI (API/Agentic): {model} | max_turns={max_turns}", file=sys.stderr)

    try:
        from google import genai
        from google.genai import types
    except ImportError:
        raise ImportError(
            "google-genai library not installed. Install with: pip install google-genai"
        )

    # Map stravinsky model names to google-genai model names
    model_map = {
        "gemini-3-flash": "gemini-2.0-flash-exp",
        "gemini-3-pro-low": "gemini-2.0-flash-exp",
        "gemini-3-pro-high": "gemini-exp-1206",
        "gemini-flash": "gemini-2.0-flash-exp",
        "gemini-pro": "gemini-2.0-flash-exp",
        "gemini-3-pro": "gemini-2.0-flash-exp",
        "gemini": "gemini-2.0-flash-exp",
    }
    genai_model = model_map.get(model, model)

    # Initialize client with API key
    client = genai.Client(api_key=api_key)

    # Convert AGENT_TOOLS to google-genai format.
    # google-genai expects tools as a list of Tool objects containing
    # function_declarations.
    function_declarations = []
    for tool_group in AGENT_TOOLS:
        for func_decl in tool_group.get("functionDeclarations", []):
            function_declarations.append(
                types.FunctionDeclaration(
                    name=func_decl["name"],
                    description=func_decl["description"],
                    parameters=func_decl["parameters"],
                )
            )

    # Wrap function declarations in a Tool object
    tools = [types.Tool(function_declarations=function_declarations)]

    # Initialize conversation with user message
    contents = [types.Content(role="user", parts=[types.Part(text=prompt)])]

    for turn in range(max_turns):
        try:
            # Generate content with tools
            response = client.models.generate_content(
                model=genai_model,
                contents=contents,
                config=types.GenerateContentConfig(
                    tools=tools,
                    temperature=0.7,
                    max_output_tokens=8192,
                ),
            )

            # Check if response has function calls
            if not response.candidates or not response.candidates[0].content.parts:
                return "No response generated"

            parts = response.candidates[0].content.parts
            function_calls = []
            text_parts = []

            for part in parts:
                if part.function_call:
                    function_calls.append(part.function_call)
                elif part.text:
                    text_parts.append(part.text)

            # If no function calls, the model is done - return its text response
            if not function_calls:
                result = "".join(text_parts)
                return result if result.strip() else "Task completed"

            # Execute function calls locally and prepare the responses to
            # send back to the model on the next turn
            function_responses = []
            for func_call in function_calls:
                func_name = func_call.name
                func_args = dict(func_call.args) if func_call.args else {}

                logger.info(f"[AgenticGemini] Turn {turn + 1}: Executing {func_name}")
                result = _execute_tool(func_name, func_args)

                function_responses.append(
                    types.Part(
                        function_response=types.FunctionResponse(
                            name=func_name,
                            response={"result": result},
                        )
                    )
                )

            # Add model's response to conversation
            contents.append(response.candidates[0].content)

            # Add function responses to conversation
            contents.append(
                types.Content(
                    role="user",
                    parts=function_responses,
                )
            )

        except Exception as e:
            logger.error(f"[AgenticGemini] Error in turn {turn + 1}: {e}")
            raise ValueError(f"Gemini API key request failed: {e}")

    return "Max turns reached without final response"

998 

999 

async def invoke_gemini_agentic(
    token_store: TokenStore,
    prompt: str,
    model: str = "gemini-3-flash",
    max_turns: int = 10,
    timeout: int = 120,
) -> str:
    """
    Invoke Gemini with function calling for agentic tasks.

    This function implements a multi-turn agentic loop:
    1. Send prompt with tool definitions
    2. If model returns FunctionCall, execute the tool
    3. Send FunctionResponse back to model
    4. Repeat until model returns text or max_turns reached

    Supports two authentication methods (API key takes precedence):
    1. API Key: Set GEMINI_API_KEY or GOOGLE_API_KEY in environment
    2. OAuth: Use Google OAuth via Antigravity (requires stravinsky-auth login gemini)

    Args:
        token_store: Token store for OAuth credentials
        prompt: The task prompt
        model: Gemini model to use
        max_turns: Maximum number of tool-use turns
        timeout: Request timeout in seconds (applied per HTTP request)

    Returns:
        Final text response from the model, or a fallback status string
        ("No response generated", "No response parts", "Task completed",
        "Max turns reached without final response")

    Raises:
        ValueError: If every Antigravity endpoint fails to respond
        httpx.HTTPStatusError: If the selected endpoint returns an error status
    """
    # Check for API key authentication (takes precedence over OAuth)
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if api_key:
        logger.info("[AgenticGemini] Using API key authentication (GEMINI_API_KEY)")
        return await _invoke_gemini_agentic_with_api_key(
            api_key=api_key,
            prompt=prompt,
            model=model,
            max_turns=max_turns,
            timeout=timeout,
        )

    # Fallback to OAuth authentication (Antigravity)
    logger.info("[AgenticGemini] Using OAuth authentication (Antigravity)")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows agentic mode with OAuth
    import sys
    print(f"🔮 GEMINI (OAuth/Agentic): {model} | max_turns={max_turns}", file=sys.stderr)

    access_token = await _ensure_valid_token(token_store, "gemini")
    api_model = resolve_gemini_model(model)

    # Use persistent session ID for this conversation
    session_id = _get_session_id(conversation_key="agentic")

    # Project ID from environment or default
    project_id = os.getenv("STRAVINSKY_ANTIGRAVITY_PROJECT_ID", ANTIGRAVITY_DEFAULT_PROJECT_ID)

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        **ANTIGRAVITY_HEADERS,
    }

    # Initialize conversation
    contents = [{"role": "user", "parts": [{"text": prompt}]}]

    # Get pooled HTTP client for connection reuse
    client = await _get_http_client()

    for turn in range(max_turns):
        # Build inner request payload (what goes inside "request" wrapper)
        inner_payload = {
            "contents": contents,
            "tools": AGENT_TOOLS,
            "generationConfig": {
                "temperature": 0.7,
                "maxOutputTokens": 8192,
            },
            "sessionId": session_id,
        }

        # Wrap request body per reference implementation
        # From request.ts wrapRequestBody()
        # NOTE: uses the module-level `uuid` import; the previous per-turn
        # `import uuid as uuid_module` inside this loop was redundant.
        wrapped_payload = {
            "project": project_id,
            "model": api_model,
            "userAgent": "antigravity",
            "requestId": f"agent-{uuid.uuid4()}",
            "request": inner_payload,
        }

        # Try endpoints in fallback order
        response = None
        last_error = None

        for endpoint in ANTIGRAVITY_ENDPOINTS:
            # Reference uses: {endpoint}/v1internal:generateContent (NOT /models/{model})
            api_url = f"{endpoint}/v1internal:generateContent"

            try:
                response = await client.post(
                    api_url,
                    headers=headers,
                    json=wrapped_payload,
                    timeout=float(timeout),
                )

                # 401/403 might be endpoint-specific, try next endpoint
                if response.status_code in (401, 403):
                    logger.warning(
                        f"[AgenticGemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                    )
                    last_error = Exception(f"{response.status_code} from {endpoint}")
                    continue

                # If we got a non-retryable response (success or 4xx client error), use it
                if response.status_code < 500 and response.status_code != 429:
                    break

                logger.warning(
                    f"[AgenticGemini] Endpoint {endpoint} returned {response.status_code}, trying next"
                )

            except httpx.TimeoutException as e:
                last_error = e
                logger.warning(f"[AgenticGemini] Endpoint {endpoint} timed out, trying next")
                continue
            except Exception as e:
                last_error = e
                logger.warning(f"[AgenticGemini] Endpoint {endpoint} failed: {e}, trying next")
                continue

        # response is only None if every endpoint raised before assignment;
        # a non-None error response falls through to raise_for_status below.
        if response is None:
            raise ValueError(f"All Antigravity endpoints failed: {last_error}")

        response.raise_for_status()
        data = response.json()

        # Extract response - unwrap outer "response" envelope if present
        inner_response = data.get("response", data)
        candidates = inner_response.get("candidates", [])
        if not candidates:
            return "No response generated"

        content = candidates[0].get("content", {})
        parts = content.get("parts", [])

        if not parts:
            return "No response parts"

        # Check for function call (only the first one per turn is executed)
        function_call = None
        text_response = None

        for part in parts:
            if "functionCall" in part:
                function_call = part["functionCall"]
                break
            elif "text" in part:
                text_response = part["text"]

        if function_call:
            # Execute the function
            func_name = function_call.get("name")
            func_args = function_call.get("args", {})

            logger.info(f"[AgenticGemini] Turn {turn + 1}: Executing {func_name}")
            result = _execute_tool(func_name, func_args)

            # Add model's response and function result to conversation
            contents.append({"role": "model", "parts": [{"functionCall": function_call}]})
            contents.append(
                {
                    "role": "user",
                    "parts": [
                        {"functionResponse": {"name": func_name, "response": {"result": result}}}
                    ],
                }
            )
        else:
            # No function call, return text response
            return text_response or "Task completed"

    return "Max turns reached without final response"

1187 

1188 

@retry(
    stop=stop_after_attempt(2),  # Reduced from 5 to 2 attempts
    wait=wait_exponential(multiplier=2, min=10, max=120),  # Exponential backoff clamped to [10s, 120s]; with 2 attempts at most one sleep occurs
    retry=retry_if_exception(is_retryable_exception),
    before_sleep=lambda retry_state: logger.info(
        f"Server error, retrying in {retry_state.next_action.sleep} seconds..."
    ),
)
async def invoke_openai(
    token_store: TokenStore,
    prompt: str,
    model: str = "gpt-5.2-codex",
    temperature: float = 0.7,
    max_tokens: int = 4096,
    thinking_budget: int = 0,
) -> str:
    """
    Invoke an OpenAI model via the ChatGPT backend Codex Responses endpoint.

    Authenticates with an OAuth access token from the token store, streams the
    SSE response, and returns the concatenated output text deltas.

    Args:
        token_store: Token store providing the OpenAI OAuth access token.
        prompt: The prompt to send to OpenAI.
        model: OpenAI model to use.
        temperature: Sampling temperature (0.0-2.0). NOTE(review): accepted and
            exposed to pre-invoke hooks, but not currently forwarded in the
            request payload — confirm whether the backend supports it.
        max_tokens: Maximum tokens in response. NOTE(review): accepted but not
            currently forwarded in the request payload.
        thinking_budget: If > 0, requests "high" reasoning effort instead of
            the default "medium".

    Returns:
        The model's response text, or "No response generated" if the stream
        produced no text deltas.

    Raises:
        ValueError: If not authenticated with OpenAI, or the API returns an
            error status, or any unexpected failure occurs (wrapped).
        httpx.HTTPStatusError: If the underlying HTTP request fails.
    """
    # Execute pre-model invoke hooks; hooks may rewrite any of these params.
    params = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "thinking_budget": thinking_budget,
        "token_store": token_store,  # Pass for hooks that need model access
        "provider": "openai",  # Identify which provider is being called
    }
    hook_manager = get_hook_manager()
    params = await hook_manager.execute_pre_model_invoke(params)

    # Update local variables from possibly modified params
    prompt = params["prompt"]
    model = params["model"]
    temperature = params["temperature"]
    max_tokens = params["max_tokens"]
    thinking_budget = params["thinking_budget"]

    # Extract agent context for logging (may be passed via params or original call)
    agent_context = params.get("agent_context", {})
    agent_type = agent_context.get("agent_type", "direct")
    task_id = agent_context.get("task_id", "")
    description = agent_context.get("description", "")
    prompt_summary = _summarize_prompt(prompt)

    # Log with agent context and prompt summary
    logger.info(f"[{agent_type}] → {model}: {prompt_summary}")

    # USER-VISIBLE NOTIFICATION (stderr) - Shows when OpenAI is invoked
    import sys
    task_info = f" task={task_id}" if task_id else ""
    desc_info = f" | {description}" if description else ""
    print(f"🧠 OPENAI: {model} | agent={agent_type}{task_info}{desc_info}", file=sys.stderr)

    access_token = await _ensure_valid_token(token_store, "openai")
    logger.info("[invoke_openai] Got access token")

    # ChatGPT Backend API - Uses Codex Responses endpoint
    # Replicates opencode-openai-codex-auth plugin behavior
    api_url = "https://chatgpt.com/backend-api/codex/responses"

    # Extract the ChatGPT account ID from the JWT's payload segment
    # (base64url; padding restored before decoding). Best-effort: on any
    # failure we proceed without the x-openai-account-id header.
    logger.info("[invoke_openai] Extracting account ID from JWT")
    try:
        parts = access_token.split(".")
        payload_b64 = parts[1]
        padding = 4 - len(payload_b64) % 4
        if padding != 4:
            payload_b64 += "=" * padding
        jwt_payload = json_module.loads(base64.urlsafe_b64decode(payload_b64))
        account_id = jwt_payload.get("https://api.openai.com/auth", {}).get("chatgpt_account_id")
    except Exception as e:
        logger.error(f"Failed to extract account ID from JWT: {e}")
        account_id = None

    # Fetch official Codex instructions from GitHub (cached after first fetch)
    instructions = await _fetch_codex_instructions(model)

    # Headers matching opencode-openai-codex-auth plugin
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",  # SSE stream
        "openai-beta": "responses=experimental",
        "openai-originator": "codex_cli_rs",
    }

    if account_id:
        headers["x-openai-account-id"] = account_id

    # Request body matching opencode transformation
    payload = {
        "model": model,
        "store": False,  # Required by ChatGPT backend
        "stream": True,  # Always stream (handler converts to non-stream if needed)
        "instructions": instructions,
        "input": [{"role": "user", "content": prompt}],
        "reasoning": {"effort": "high" if thinking_budget > 0 else "medium", "summary": "auto"},
        "text": {"verbosity": "medium"},
        "include": ["reasoning.encrypted_content"],
    }

    # Stream the response and collect text
    text_chunks = []

    logger.info(f"[invoke_openai] Calling {api_url} with model {model}")
    logger.info(f"[invoke_openai] Payload keys: {list(payload.keys())}")
    logger.info(f"[invoke_openai] Instructions length: {len(instructions)}")

    try:
        async with httpx.AsyncClient() as client, client.stream(
            "POST", api_url, headers=headers, json=payload, timeout=120.0
        ) as response:
            logger.info(f"[invoke_openai] Response status: {response.status_code}")
            if response.status_code == 401:
                raise ValueError(
                    "OpenAI authentication failed. Run: stravinsky-auth login openai"
                )

            if response.status_code >= 400:
                # Read the full error body before raising so logs are actionable.
                error_body = await response.aread()
                error_text = error_body.decode("utf-8")
                logger.error(f"OpenAI API error {response.status_code}: {error_text}")
                logger.error(f"Request payload was: {payload}")
                logger.error(f"Request headers were: {headers}")
                raise ValueError(f"OpenAI API error {response.status_code}: {error_text}")

            # Parse SSE stream for text deltas
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data_json = line[6:]  # Remove "data: " prefix
                    try:
                        data = json_module.loads(data_json)
                        event_type = data.get("type")

                        # Extract text deltas from SSE stream; all other
                        # event types (reasoning, completion markers) are ignored.
                        if event_type == "response.output_text.delta":
                            delta = data.get("delta", "")
                            text_chunks.append(delta)

                    except json_module.JSONDecodeError:
                        pass  # Skip malformed JSON
                    except Exception as e:
                        logger.warning(f"Error processing SSE event: {e}")

        # Return collected text
        result = "".join(text_chunks)
        if not result:
            return "No response generated"
        return result

    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error in invoke_openai: {e}")
        raise ValueError(f"Failed to invoke OpenAI: {e}")