# rye:signed:2026-04-07T03:32:53Z:bb499e7bf397a37e2a30a90d42082bc6f5e6ca6ce02096f66382b8a3e3b9d990:0YPPYp0xqAH-Sx7YUYAocwLUdin9hT8ceHDPCByGSB07ygiB_bJe80WkXD57zrrGGQYbxGpV6gJwXp4JpgbtAA:6ea18199041a1ea8
"""
http_provider.py: ProviderAdapter for LLM HTTP API calls.

Makes HTTP calls to LLM providers (Anthropic, OpenAI, etc.) using httpx directly.
Auth, env resolution, retries, and SSE streaming are handled inline.

This adapter handles:
1. Formatting messages/tools into params the provider tool expects
2. Parsing the API response using the provider YAML's response_schema config
3. Converting messages using the provider YAML's message_schema config
4. Assembling streaming events using the provider YAML's stream_schema config

All provider-specific behavior is driven by YAML schemas — no hardcoded format handlers.
"""

# Module-level tool metadata. Not read by the code visible here; presumably
# consumed by the rye tool loader/registry — NOTE(review): confirm consumers.
__version__ = "2.0.0"
__tool_type__ = "python"
__category__ = "rye/agent/threads/adapters"
__tool_description__ = "HTTP provider adapter for LLM API calls"

import json
import logging
import os
import re as _re
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import httpx

from .provider_adapter import ProviderAdapter

# Module logger; used e.g. to report failed provider HTTP calls when RYE_DEBUG is set.
logger = logging.getLogger(__name__)


@dataclass
class _HttpResult:
    """Lightweight result object matching the interface _raise_on_error expects."""
    success: bool
    status_code: int
    body: Any = None
    headers: Dict[str, str] = field(default_factory=dict)
    error: Optional[str] = None


class _ReturnSink:
    """In-memory sink that buffers SSE event strings for response assembly."""

    def __init__(self):
        self._events: List[str] = []

    async def write(self, event: str) -> None:
        self._events.append(event)

    async def close(self) -> None:
        pass

    def get_events(self) -> List[str]:
        return self._events


def _resolve_env(value: str) -> str:
    """Resolve ``${VAR}`` and ``${VAR:-default}`` placeholders in a string."""
    def _sub(m):
        var = m.group(1)
        default = m.group(3)  # may be None
        return os.environ.get(var, default if default is not None else "")
    return _re.sub(r"\$\{(\w+)(:-(.*?))?\}", _sub, value)


class HttpProvider(ProviderAdapter):
    """Provider that sends LLM calls directly to the provider's HTTP API (via httpx).

    Fully data-driven: response parsing, message conversion, and stream assembly
    are all configured via provider YAML schemas (response_schema, message_schema,
    stream_schema). No provider-specific code paths.

    Args:
        model: Resolved model ID (e.g., "claude-3-5-haiku-20241022")
        provider_config: Full provider YAML config dict
        dispatcher: ToolDispatcher instance for dispatching tool calls
        provider_item_id: Tool item_id for the provider (e.g., "rye/agent/providers/anthropic")
    """

    def __init__(
        self,
        model: str,
        provider_config: Dict,
        dispatcher: Any,
        provider_item_id: str,
    ):
        """Store provider settings and derive the output-token cap."""
        super().__init__(model, provider_config)
        self._dispatcher = dispatcher
        self._provider_item_id = provider_item_id
        self._tool_use = provider_config.get("tool_use", {})
        self._http_config = provider_config.get("config", {})

        # Output-token cap precedence: the model's pricing entry, then the
        # provider-wide default, then a hard fallback of 16384.
        per_model = provider_config.get("pricing", {}).get(model, {})
        provider_default = provider_config.get("max_output_tokens", 16384)
        self._max_output_tokens = per_model.get("max_output_tokens", provider_default)

    @property
    def supports_streaming(self) -> bool:
        return True

    @property
    def _response_format(self) -> str:
        stream_schema = self._tool_use.get("stream_schema", {})
        return stream_schema.get("stream_mode", "content_blocks")

    # ── Utilities ──────────────────────────────────────────────────────

    def _resolve_path(self, obj: Any, path: str) -> Any:
        """Navigate nested dicts/lists via dot-separated path.

        Example: _resolve_path(data, "choices.0.message") navigates
        data["choices"][0]["message"].
        """
        if not path:
            return obj
        for key in path.split("."):
            if obj is None:
                return None
            if isinstance(obj, list):
                try:
                    obj = obj[int(key)]
                except (IndexError, ValueError):
                    return None
            elif isinstance(obj, dict):
                obj = obj.get(key)
            else:
                return None
        return obj

    def _detect_block(self, block: dict, detect_config: dict) -> bool:
        """Check if a content block matches a detection rule.

        Supports two modes:
        - field/value: block["type"] == "text"
        - key presence: "text" in block
        """
        if not detect_config:
            return False
        if "field" in detect_config:
            return block.get(detect_config["field"]) == detect_config["value"]
        if "key" in detect_config:
            return detect_config["key"] in block
        return False

    def _wrap_text_block(self, text: str, mode: str) -> Any:
        """Wrap text content per content_wrap mode."""
        if mode == "blocks_array":
            return {"type": "text", "text": text}
        elif mode == "parts_array":
            return {"text": text}
        return text

    # ── Message Conversion ─────────────────────────────────────────────

    def _convert_messages(self, messages: List[Dict], system_prompt: str = "") -> List[Dict]:
        """Convert runner message format to provider format using message_schema.

        Handles three concerns driven by YAML config:
        1. Tool result messages → provider-specific format (grouped or individual)
        2. Assistant messages with tool_calls → reconstructed with provider block format
        3. Regular messages → role-mapped and content-wrapped if needed
        4. System prompt → prepended as system role message (when system_mode is "message")
        """
        schema = self._tool_use.get("message_schema", {})
        role_map = schema.get("role_map", {"user": "user", "assistant": "assistant"})
        content_key = schema.get("content_key", "content")
        content_wrap = schema.get("content_wrap", "string")
        tr_config = schema.get("tool_result", {})
        tc_template = schema.get("tool_call_block_template", {})

        tr_role = tr_config.get("role", "user")
        tr_wrap = tr_config.get("wrap_mode", "content_blocks")
        tr_template = tr_config.get("block_template", {})
        tr_error_field = tr_config.get("error_field")

        converted = []
        pending_results = []
        # Map tool_call_id → name for providers that need name in tool results (Gemini)
        tc_name_map: Dict[str, str] = {}

        def flush_results():
            nonlocal pending_results
            if pending_results:
                converted.append({"role": tr_role, content_key: pending_results})
                pending_results = []

        for msg in messages:
            role = msg.get("role", "")

            if role == "tool":
                tc_id = msg.get("tool_call_id", "")
                tool_name = msg.get("tool_name", msg.get("name", ""))
                if not tool_name:
                    tool_name = tc_name_map.get(tc_id, "")
                data = {
                    "tool_call_id": tc_id,
                    "tool_name": tool_name,
                    "content": msg.get("content", ""),
                }
                block = self._apply_template(tr_template, data)
                if msg.get("is_error") and tr_error_field:
                    block[tr_error_field] = True

                if tr_wrap == "content_blocks":
                    pending_results.append(block)
                elif tr_wrap == "direct":
                    result_msg = {"role": tr_role}
                    result_msg.update(block)
                    converted.append(result_msg)
                elif tr_wrap == "parts":
                    converted.append({"role": tr_role, content_key: [block]})

            elif role == "assistant" and msg.get("tool_calls"):
                flush_results()
                assistant_role = role_map.get("assistant", "assistant")
                for tc in msg["tool_calls"]:
                    tc_name_map[tc["id"]] = tc["name"]

                if content_wrap == "string":
                    # OpenAI: tool_calls as top-level array on the message
                    assistant_msg = {
                        "role": assistant_role,
                        "content": msg.get("content") or None,
                    }
                    tc_list = []
                    for tc in msg["tool_calls"]:
                        tc_data = {
                            "id": tc["id"],
                            "name": tc["name"],
                            "input": tc["input"],
                            "input_json": json.dumps(tc["input"])
                            if isinstance(tc["input"], dict)
                            else str(tc["input"]),
                        }
                        tc_list.append(self._apply_template(tc_template, tc_data))
                    assistant_msg["tool_calls"] = tc_list
                    converted.append(assistant_msg)
                else:
                    # Block-based: tool calls are content blocks (Anthropic, Gemini)
                    blocks = []
                    thinking = msg.get("_thinking", "")
                    if thinking:
                        blocks.append({"thought": True, "text": thinking})
                    text = msg.get("content", "")
                    if text:
                        blocks.append(self._wrap_text_block(text, content_wrap))
                    for tc in msg["tool_calls"]:
                        if "_raw_block" in tc:
                            # Replay raw block (preserves thoughtSignature for Gemini)
                            blocks.append(tc["_raw_block"])
                        else:
                            tc_data = {
                                "id": tc["id"],
                                "name": tc["name"],
                                "input": tc["input"],
                            }
                            blocks.append(self._apply_template(tc_template, tc_data))
                    converted.append({"role": assistant_role, content_key: blocks})

            else:
                flush_results()
                mapped_role = role_map.get(role, role)
                if content_key == "content":
                    # Pass through as-is (Anthropic/OpenAI accept string content)
                    out = dict(msg)
                    if mapped_role != role:
                        out["role"] = mapped_role
                    converted.append(out)
                else:
                    # Different content key (e.g., Gemini "parts")
                    parts = []
                    thinking = msg.get("_thinking", "")
                    if thinking:
                        parts.append({"thought": True, "text": thinking})
                    content = msg.get("content", "")
                    if content:
                        parts.append(self._wrap_text_block(content, content_wrap))
                    converted.append({"role": mapped_role, content_key: parts})

        flush_results()

        # Prepend system message for providers that use message-role system prompts
        sys_config = self._tool_use.get("system_message", {})
        if system_prompt and sys_config.get("mode") == "message_role":
            converted.insert(0, {"role": "system", "content": system_prompt})

        return converted

    # ── Tool Formatting ────────────────────────────────────────────────

    def _format_tools(self, tools: List[Dict]) -> List[Dict]:
        """Format tool schemas using tool_use.tool_definition from provider config.

        The YAML defines field mapping via template placeholders:
            Anthropic: {name: "{name}", description: "{description}", input_schema: "{schema}"}
            OpenAI:    {type: function, function: {name: "{name}", parameters: "{schema}"}}
            Gemini:    {name: "{name}", ...} + tool_list_wrap: "functionDeclarations"

        When tool_list_wrap is set, all formatted tools are grouped into a single
        object under that key (e.g., Gemini needs [{functionDeclarations: [...all...]}]).
        """
        if not tools:
            return tools
        tool_def_template = self._tool_use.get("tool_definition", {})
        if not tool_def_template:
            return tools
        formatted = [self._apply_template(tool_def_template, tool) for tool in tools]
        wrap_key = self._tool_use.get("tool_list_wrap")
        if wrap_key:
            return [{wrap_key: formatted}]
        return formatted

    def _apply_template(self, template: Any, tool: Dict) -> Any:
        """Recursively apply template placeholders from data dict."""
        import re
        if isinstance(template, str):
            match = re.match(r"^\{(\w+)\}$", template.strip())
            if match:
                return tool.get(match.group(1), "")
            return template
        if isinstance(template, dict):
            return {k: self._apply_template(v, tool) for k, v in template.items()}
        if isinstance(template, list):
            return [self._apply_template(item, tool) for item in template]
        return template

    # ── Response Parsing ───────────────────────────────────────────────

    def _parse_response(self, response_body: Dict) -> Dict:
        """Parse any LLM API response using response_schema from provider YAML."""
        schema = self._tool_use.get("response_schema", {})
        mode = schema.get("content_mode", "blocks")

        text_parts = []
        thinking_parts = []
        tool_calls = []

        if mode == "blocks":
            content_path = schema.get("content_path", "content")
            blocks = self._resolve_path(response_body, content_path) or []
            detect = schema.get("block_detect", {})

            for block in blocks:
                if self._detect_block(block, detect.get("thinking", {})):
                    thinking_parts.append(
                        self._resolve_path(block, schema.get("text_value", "text")) or ""
                    )
                elif self._detect_block(block, detect.get("text", {})):
                    text_parts.append(
                        self._resolve_path(block, schema.get("text_value", "text")) or ""
                    )
                elif self._detect_block(block, detect.get("tool_call", {})):
                    name = self._resolve_path(block, schema["tool_call_name"]) or ""
                    raw_input = self._resolve_path(block, schema["tool_call_input"]) or {}
                    tc_id_path = schema.get("tool_call_id")
                    tc_id = (
                        self._resolve_path(block, tc_id_path)
                        if tc_id_path
                        else str(uuid.uuid4())
                    )
                    tc = {"id": tc_id, "name": name, "input": raw_input}
                    # Preserve raw block for providers that need it (Gemini thoughtSignature)
                    if "thoughtSignature" in block:
                        tc["_raw_block"] = block
                    tool_calls.append(tc)

        elif mode == "separate":
            message = (
                self._resolve_path(response_body, schema.get("content_path", ""))
                or {}
            )
            text_parts.append(message.get(schema.get("text_field", "content")) or "")

            raw_calls = message.get(schema.get("tool_calls_field", "tool_calls")) or []
            input_format = schema.get("tool_call_input_format")

            for tc in raw_calls:
                name = self._resolve_path(tc, schema["tool_call_name"]) or ""
                raw_input = self._resolve_path(tc, schema["tool_call_input"]) or {}
                if input_format == "json_string" and isinstance(raw_input, str):
                    try:
                        raw_input = json.loads(raw_input)
                    except (json.JSONDecodeError, ValueError):
                        raw_input = {"_raw": raw_input}
                tc_id = (
                    self._resolve_path(tc, schema.get("tool_call_id", "id")) or ""
                )
                tool_calls.append({"id": tc_id, "name": name, "input": raw_input})

        # Usage — always via dot-path
        usage_obj = (
            self._resolve_path(response_body, schema.get("usage_path", "usage")) or {}
        )
        input_tokens = usage_obj.get(schema.get("input_tokens", "input_tokens"), 0)
        output_tokens = usage_obj.get(schema.get("output_tokens", "output_tokens"), 0)

        # Finish reason — via dot-path
        finish_reason = (
            self._resolve_path(
                response_body, schema.get("finish_reason_path", "stop_reason")
            )
            or "stop"
        )

        # Cost
        pricing = self.config.get("pricing", {}).get(self.model, {})
        spend = (
            input_tokens * pricing.get("input", 0.0)
            + output_tokens * pricing.get("output", 0.0)
        ) / 1_000_000

        result = {
            "text": "\n".join(text_parts)
            if len(text_parts) > 1
            else (text_parts[0] if text_parts else ""),
            "tool_calls": tool_calls,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "spend": spend,
            "finish_reason": finish_reason,
        }
        if thinking_parts:
            result["thinking"] = "\n".join(thinking_parts)
        return result

    # ── HTTP Execution ────────────────────────────────────────────────

    def _build_body(self, params: Dict) -> Dict:
        """Template the body config with params, substituting {key} placeholders."""
        body_template = self._http_config.get("body", {})
        return self._apply_template(body_template, params)

    async def _get_http_client(self):
        """Return the shared ``httpx.AsyncClient``, creating it on demand.

        A new client is created when none exists yet, or when the stored one is
        ``None`` or has been closed. This way a closed client is never handed out
        again.
        """
        client = getattr(self, "_http_client", None)
        if client is None or client.is_closed:
            client = httpx.AsyncClient(
                limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
                timeout=httpx.Timeout(30.0),
            )
            self._http_client = client
        return client

    def _inject_system_prompt(self, body: Dict, system: str) -> None:
        """Inject system prompt into the request body using profile config.

        Uses the system_message config from the profile's tool_use section:
        - mode "body_field": sets body[field] = system (Anthropic)
        - mode "body_inject": deep-merges a template structure (Gemini)
        - mode "message_role": handled in _convert_messages, not here
        """
        sys_config = self._tool_use.get("system_message", {})
        mode = sys_config.get("mode", "body_field")

        if mode == "body_field":
            field = sys_config.get("field", "system")
            body[field] = system
        elif mode == "body_inject":
            template = sys_config.get("template", {})
            if template:
                body.update(self._apply_template(template, {"system": system}))

    async def _execute_http(self, params: Dict) -> "_HttpResult":
        """Build and send one provider HTTP request from the provider ``config``.

        Pops the control keys ``mode`` (``"sync"``/``"stream"``), ``system`` and
        ``__sinks`` from *params*, so the dict is mutated. The remaining keys fill
        ``{key}`` placeholders in the URL and body templates. ``${ENV}``
        placeholders in the URL, header values and auth settings are expanded
        from the environment.

        Args:
            params: Request parameters plus the control keys above.

        Returns:
            The ``_HttpResult`` from ``_do_stream`` when ``mode == "stream"``.
            Otherwise the result of ``_do_sync``, which retries per ``config.retry``.
        """
        config = dict(self._http_config)
        mode = params.pop("mode", "sync")
        system = params.pop("system", "")
        sinks = params.pop("__sinks", [])

        url_key = "stream_url" if mode == "stream" and "stream_url" in config else "url"
        # Expand ${ENV} placeholders *before* str.format(). Otherwise format()
        # parses "${BASE_URL}" as a "{BASE_URL}" replacement field and raises KeyError.
        url = _resolve_env(config.get(url_key, config.get("url", ""))).format(**params)
        body = self._build_body(params)

        if system:
            self._inject_system_prompt(body, system)

        headers = {k: _resolve_env(str(v)) for k, v in config.get("headers", {}).items()}

        auth_config = config.get("auth", {})
        if auth_config:
            auth_type = auth_config.get("type")
            if auth_type == "bearer":
                headers["Authorization"] = f"Bearer {_resolve_env(auth_config.get('token', ''))}"
            elif auth_type == "api_key":
                headers[auth_config.get("header", "X-API-Key")] = _resolve_env(auth_config.get("key", ""))

        # The body is sent pre-serialized via ``content=``, for which httpx adds no
        # Content-Type header. Default it unless the provider config sets one.
        if not any(str(k).lower() == "content-type" for k in headers):
            headers["Content-Type"] = "application/json"

        timeout = config.get("timeout", 30)
        retry_config = config.get("retry", {})
        client = await self._get_http_client()

        if mode == "stream":
            return await self._do_stream(client, url, headers, body, timeout, sinks)
        return await self._do_sync(client, url, headers, body, timeout, retry_config)

    async def _do_sync(self, client, url, headers, body, timeout, retry_config) -> "_HttpResult":
        """POST *body* as JSON and return the response, retrying on exceptions.

        Only raised exceptions (e.g. transport errors) are retried. Any received
        HTTP status is returned immediately, and a status outside [200, 400) is
        marked unsuccessful.

        Args:
            client: HTTP client exposing ``request(...)`` (an ``httpx.AsyncClient``).
            url, headers, timeout: Request parameters.
            body: JSON-serializable request body; a falsy body is sent empty.
            retry_config: ``max_attempts`` (default 1, clamped to at least 1) and
                ``backoff``. With ``"exponential"`` the waits are 1s, 2s, 4s, ...;
                any other value waits a fixed 1s between attempts.

        Returns:
            ``_HttpResult``. ``status_code`` is 0 when no response was obtained.
        """
        import asyncio as _asyncio

        # A configured 0 (or negative) would otherwise skip the request entirely.
        max_attempts = max(1, int(retry_config.get("max_attempts", 1)))
        backoff = retry_config.get("backoff", "exponential")

        # Serialize once, outside the loop: an unserializable body is not a
        # transient failure and must not consume retries and backoff sleeps.
        try:
            payload = json.dumps(body) if body else None
        except (TypeError, ValueError) as e:
            return _HttpResult(
                success=False, status_code=0, body=None, headers={},
                error=f"Request body is not JSON-serializable: {e}",
            )

        last_error = None
        for attempt in range(max_attempts):
            try:
                resp = await client.request(
                    method="POST", url=url, headers=headers,
                    content=payload, timeout=timeout,
                )
                try:
                    resp_body = resp.json()
                except (json.JSONDecodeError, ValueError):
                    resp_body = resp.text
                success = 200 <= resp.status_code < 400
                return _HttpResult(
                    success=success, status_code=resp.status_code, body=resp_body,
                    headers=dict(resp.headers),
                    error=None if success else f"HTTP {resp.status_code}: {resp.reason_phrase}",
                )
            except Exception as e:
                last_error = str(e)
                if attempt < max_attempts - 1:
                    delay = 2**attempt if backoff == "exponential" else 1
                    await _asyncio.sleep(delay)

        return _HttpResult(
            success=False, status_code=0, body=None, headers={},
            error=f"Request failed after {max_attempts} attempts: {last_error}",
        )

    async def _do_stream(self, client, url, headers, body, timeout, sinks) -> "_HttpResult":
        """POST *body* and fan SSE ``data:`` payloads out to *sinks* as they arrive.

        Each non-empty ``data:`` line is written to every sink, in order, with the
        prefix removed and whitespace stripped. Sinks are closed on every exit path:
        success, HTTP error status, or exception.

        Returns:
            On success, ``body`` holds the events from the first sink that exposes
            ``get_events()``, or ``None`` if none does. On an HTTP error status,
            ``body`` is the decoded error JSON, or ``{"raw": text}``. On an
            exception, ``status_code`` is 0.
        """

        async def close_quietly() -> None:
            # Best-effort close on failure paths: one sink failing to close must
            # not stop the others from closing or mask the original failure.
            for sink in sinks:
                try:
                    await sink.close()
                except Exception:
                    logger.debug("Failed to close stream sink %r", sink, exc_info=True)

        try:
            async with client.stream(
                method="POST", url=url, headers=headers,
                content=json.dumps(body) if body else None, timeout=timeout,
            ) as resp:
                if not 200 <= resp.status_code < 400:
                    raw = await resp.aread()
                    try:
                        err_body = json.loads(raw)
                    except (json.JSONDecodeError, ValueError):
                        err_body = {"raw": raw.decode("utf-8", errors="replace")}
                    # Previously sinks were left open on this path.
                    await close_quietly()
                    return _HttpResult(
                        success=False, status_code=resp.status_code, body=err_body,
                        headers=dict(resp.headers),
                        error=f"HTTP {resp.status_code}: {resp.reason_phrase}",
                    )

                event_count = 0
                async for line in resp.aiter_lines():
                    if not line.startswith("data:"):
                        continue
                    event_data = line[5:].strip()
                    if not event_data:
                        continue
                    event_count += 1
                    for sink in sinks:
                        await sink.write(event_data)
                logger.debug("SSE stream finished after %d events", event_count)

                for sink in sinks:
                    await sink.close()

                buffered = next(
                    (s.get_events() for s in sinks if hasattr(s, "get_events")),
                    None,
                )

                return _HttpResult(
                    success=True, status_code=resp.status_code, body=buffered,
                    headers=dict(resp.headers),
                )
        except Exception as e:
            await close_quietly()
            return _HttpResult(
                success=False, status_code=0, body=None, headers={},
                error=f"Unexpected error: {e}",
            )

    def _raise_on_error(self, result, streaming: bool = False):
        """Raise ``ProviderCallError`` when *result* is unsuccessful; otherwise return.

        Details come from an ``{"error": ...}`` body when present:
        - message from ``error.message``;
        - type from ``error.type``, then ``error.status``;
        - code from ``error.code``, used only when it is a string.

        Otherwise the message comes from ``result.error``, plus any raw
        (non-JSON) error text. The request id is taken from the ``request-id`` or
        ``x-request-id`` response header.

        Retryability:
        - quota/billing codes: never retryable;
        - status 0, 408, 429, 500, 502, 503, 504, 529: retryable;
        - any other status: not retryable;
        - missing status: retryable.

        Args:
            result: ``_HttpResult``-like object (``success``, ``status_code``,
                ``body``, ``headers``, ``error``).
            streaming: Accepted for call-site symmetry; not used here.

        Raises:
            ProviderCallError: when ``result.success`` is false.
        """
        from ..errors import ProviderCallError

        if result.success:
            return

        if os.environ.get("RYE_DEBUG"):
            logger.error("Provider HTTP failed: status=%s body=%s error=%s", result.status_code, result.body, result.error)

        body = result.body if isinstance(result.body, dict) else {}
        http_status = result.status_code
        resp_headers = result.headers or {}
        request_id = resp_headers.get("request-id") or resp_headers.get("x-request-id") or ""

        error_code = None
        if "error" in body:
            api_error = body["error"]
            if isinstance(api_error, dict):
                error_msg = api_error.get("message", str(api_error))
                # "type" (OpenAI/Anthropic style); "status" (Google API style).
                error_type = api_error.get("type") or api_error.get("status") or "api_error"
                code = api_error.get("code")
                # Only symbolic codes are meaningful. A numeric "code" merely
                # repeats the HTTP status and must not become the error type.
                error_code = code if isinstance(code, str) else None
            else:
                error_msg = str(api_error)
                error_type = "api_error"
        else:
            error_msg = result.error or str(body or "Unknown provider error")
            error_type = "unknown"
            # Keep non-JSON error payloads visible: plain-text/HTML bodies from the
            # sync path, or the {"raw": ...} wrapper from the streaming path.
            raw_detail = result.body if isinstance(result.body, str) else body.get("raw")
            if result.error and isinstance(raw_detail, str) and raw_detail.strip():
                error_msg = f"{error_msg}: {raw_detail.strip()[:500]}"

        # Quota/billing errors are never retryable even on 429.
        non_retryable_codes = {"insufficient_quota", "quota_exceeded", "billing_hard_limit_reached"}
        retryable_statuses = {0, 408, 429, 500, 502, 503, 504, 529}
        if error_code in non_retryable_codes:
            is_retryable = False
        elif http_status is not None:
            is_retryable = http_status in retryable_statuses
        else:
            is_retryable = True

        raise ProviderCallError(
            provider_id=self._provider_item_id,
            message=error_msg,
            http_status=http_status,
            request_id=request_id,
            error_type=error_code or error_type,
            retryable=is_retryable,
        )

    # ── Completion ─────────────────────────────────────────────────────

    async def create_completion(
        self, messages: List[Dict], tools: List[Dict], system_prompt: str = ""
    ) -> Dict:
        """Run one non-streaming completion and return the parsed response dict.

        Messages are converted and tools are formatted per the provider YAML. The
        request goes through ``_execute_http``; failures are raised by
        ``_raise_on_error``. The body is parsed by ``_parse_response``, and a
        non-dict body is parsed as empty.
        """
        params: Dict[str, Any] = dict(
            model=self.model,
            messages=self._convert_messages(messages, system_prompt=system_prompt),
            max_tokens=self._max_output_tokens,
        )
        tool_defs = self._format_tools(tools) if tools else []
        if tool_defs:
            params["tools"] = tool_defs
        if system_prompt:
            params["system"] = system_prompt

        result = await self._execute_http(params)
        self._raise_on_error(result)

        payload = result.body
        return self._parse_response(payload if isinstance(payload, dict) else {})

    async def create_streaming_completion(
        self, messages: List[Dict], tools: List[Dict], sinks: Optional[List] = None,
        system_prompt: str = "",
    ) -> Dict:
        """Stream a completion, fanning raw SSE payloads out to *sinks* live.

        A private ``_ReturnSink`` is placed first in the sink list so every event is
        also buffered. When the stream ends, that buffer is assembled into the
        response dict. Caller sinks (e.g. transcript writers) receive the same
        payloads in arrival order.

        Returns the same response dict as ``create_completion()``.
        """
        params: Dict[str, Any] = {
            "model": self.model,
            "messages": self._convert_messages(messages, system_prompt=system_prompt),
            "max_tokens": self._max_output_tokens,
            "stream": True,
            "mode": "stream",
        }
        tool_defs = self._format_tools(tools) if tools else []
        if tool_defs:
            params["tools"] = tool_defs
        if system_prompt:
            params["system"] = system_prompt

        buffer_sink = _ReturnSink()
        params["__sinks"] = [buffer_sink, *(sinks or [])]

        result = await self._execute_http(params)
        self._raise_on_error(result, streaming=True)

        return self._assemble_stream_response(buffer_sink.get_events())

    # ── Stream Assembly ────────────────────────────────────────────────

    def _assemble_stream_response(self, events: List[str]) -> Dict:
        """Turn buffered SSE payloads into the standard response dict.

        Dispatches on ``stream_schema.stream_mode``: ``delta_merge``,
        ``complete_chunks``, or ``event_typed``. ``event_typed`` is the default
        and also handles any unrecognised mode.
        """
        schema = self._tool_use.get("stream_schema", {})
        stream_mode = schema.get("stream_mode", "event_typed")

        if stream_mode not in ("delta_merge", "complete_chunks"):
            return self._assemble_event_typed(events, schema)
        if stream_mode == "delta_merge":
            return self._assemble_delta_merge(events, schema)
        return self._assemble_complete_chunks(events, schema)

    def _assemble_event_typed(self, events: List[str], schema: Dict) -> Dict:
        """Assemble event-typed SSE stream (Anthropic pattern).

        Events have a type field that determines their structure:
        message_start, content_block_start, content_block_delta, message_delta.
        Field names and paths are all driven by stream_schema + response_schema.
        """
        event_type_field = schema.get("event_type_field", "type")
        block_start_path = schema.get("block_start_path", "content_block")

        resp_schema = self._tool_use.get("response_schema", {})
        block_detect = resp_schema.get("block_detect", {})

        text_parts = []
        tool_calls = []
        finish_reason = "end_turn"
        input_tokens = 0
        output_tokens = 0

        for raw in events:
            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                continue

            event_type = data.get(event_type_field, "")

            if event_type == schema.get("message_start_type", "message_start"):
                usage = (
                    self._resolve_path(
                        data, schema.get("message_start_usage", "message.usage")
                    )
                    or {}
                )
                input_tokens += usage.get(
                    resp_schema.get("input_tokens", "input_tokens"), 0
                )
                output_tokens += usage.get(
                    resp_schema.get("output_tokens", "output_tokens"), 0
                )

            elif event_type == schema.get("block_start_type", "content_block_start"):
                block = self._resolve_path(data, block_start_path) or {}
                if self._detect_block(block, block_detect.get("tool_call", {})):
                    tc_id_path = resp_schema.get("tool_call_id")
                    tc_id = (
                        self._resolve_path(block, tc_id_path)
                        if tc_id_path
                        else str(uuid.uuid4())
                    )
                    tool_calls.append({
                        "id": tc_id,
                        "name": self._resolve_path(
                            block, resp_schema.get("tool_call_name", "name")
                        ) or "",
                        "input_parts": [],
                    })

            elif event_type == schema.get("block_delta_type", "content_block_delta"):
                delta = self._resolve_path(
                    data, schema.get("delta_path", "delta")
                ) or {}
                delta_type = delta.get(
                    schema.get("delta_type_field", "type"), ""
                )
                if delta_type == schema.get("text_delta_type", "text_delta"):
                    text_parts.append(
                        delta.get(schema.get("text_delta_field", "text"), "")
                    )
                elif delta_type == schema.get(
                    "tool_input_delta_type", "input_json_delta"
                ):
                    if tool_calls:
                        tool_calls[-1]["input_parts"].append(
                            delta.get(
                                schema.get("tool_input_delta_field", "partial_json"),
                                "",
                            )
                        )

            elif event_type == schema.get("message_delta_type", "message_delta"):
                fr = self._resolve_path(
                    data, schema.get("finish_reason_path", "delta.stop_reason")
                )
                if fr:
                    finish_reason = fr
                usage = (
                    self._resolve_path(
                        data, schema.get("delta_usage_path", "usage")
                    )
                    or {}
                )
                output_tokens += usage.get(
                    resp_schema.get("output_tokens", "output_tokens"), 0
                )

        assembled_calls = []
        for tc in tool_calls:
            input_str = "".join(tc["input_parts"])
            try:
                inp = json.loads(input_str) if input_str else {}
            except (json.JSONDecodeError, ValueError):
                inp = {"_raw": input_str}
            assembled_calls.append({"id": tc["id"], "name": tc["name"], "input": inp})

        pricing = self.config.get("pricing", {}).get(self.model, {})
        spend = (
            input_tokens * pricing.get("input", 0.0)
            + output_tokens * pricing.get("output", 0.0)
        ) / 1_000_000

        return {
            "text": "".join(text_parts),
            "tool_calls": assembled_calls,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "spend": spend,
            "finish_reason": finish_reason,
        }

    def _assemble_delta_merge(self, events: List[str], schema: Dict) -> Dict:
        """Assemble delta-merge SSE stream (OpenAI pattern).

        Events are progressive deltas with choices array. Text and tool call
        fragments are merged across events. Field names driven by stream_schema.
        """
        done_signal = schema.get("done_signal", "[DONE]")
        resp_schema = self._tool_use.get("response_schema", {})

        text_parts = []
        tool_calls = {}  # index -> {id, name, arguments_parts}
        finish_reason = "stop"
        input_tokens = 0
        output_tokens = 0

        in_tok_field = schema.get(
            "input_tokens_field", resp_schema.get("input_tokens", "prompt_tokens")
        )
        out_tok_field = schema.get(
            "output_tokens_field", resp_schema.get("output_tokens", "completion_tokens")
        )

        for raw in events:
            if raw == done_signal:
                continue
            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                continue

            choices = data.get(schema.get("choices_field", "choices"), [])
            if not choices:
                usage = (
                    self._resolve_path(data, schema.get("usage_path", "usage"))
                    or {}
                )
                input_tokens += usage.get(in_tok_field, 0)
                output_tokens += usage.get(out_tok_field, 0)
                continue

            choice = choices[0]
            delta = choice.get(schema.get("delta_field", "delta"), {})

            text_field = schema.get("text_delta_field", "content")
            if text_field in delta and delta[text_field]:
                text_parts.append(delta[text_field])

            tc_field = schema.get("tool_calls_field", "tool_calls")
            for tc in delta.get(tc_field, []):
                idx = tc.get(schema.get("tool_call_index_field", "index"), 0)
                if idx not in tool_calls:
                    tool_calls[idx] = {"id": "", "name": "", "arguments_parts": []}
                tc_id_field = schema.get("tool_call_id_field", "id")
                if tc.get(tc_id_field):
                    tool_calls[idx]["id"] = tc[tc_id_field]
                func = (
                    self._resolve_path(
                        tc, schema.get("tool_call_func_path", "function")
                    )
                    or {}
                )
                name_field = schema.get("tool_call_name_field", "name")
                args_field = schema.get("tool_call_args_field", "arguments")
                if func.get(name_field):
                    tool_calls[idx]["name"] = func[name_field]
                if func.get(args_field):
                    tool_calls[idx]["arguments_parts"].append(func[args_field])

            fr_field = schema.get("finish_reason_field", "finish_reason")
            if choice.get(fr_field):
                finish_reason = choice[fr_field]

            usage = (
                self._resolve_path(data, schema.get("usage_path", "usage")) or {}
            )
            input_tokens += usage.get(in_tok_field, 0)
            output_tokens += usage.get(out_tok_field, 0)

        assembled_calls = []
        for idx in sorted(tool_calls):
            tc = tool_calls[idx]
            args_str = "".join(tc["arguments_parts"])
            try:
                args = json.loads(args_str) if args_str else {}
            except (json.JSONDecodeError, ValueError):
                args = {"_raw": args_str}
            assembled_calls.append({"id": tc["id"], "name": tc["name"], "input": args})

        pricing = self.config.get("pricing", {}).get(self.model, {})
        spend = (
            input_tokens * pricing.get("input", 0.0)
            + output_tokens * pricing.get("output", 0.0)
        ) / 1_000_000

        return {
            "text": "".join(text_parts),
            "tool_calls": assembled_calls,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "spend": spend,
            "finish_reason": finish_reason,
        }

    def _assemble_complete_chunks(self, events: List[str], schema: Dict) -> Dict:
        """Assemble complete-chunk SSE stream (Gemini pattern).

        Each event is a complete response-like object with candidates/parts.
        Reuses response_schema to extract content from each chunk, then
        accumulates text and tool calls across chunks.
        """
        resp_schema = self._tool_use.get("response_schema", {})
        done_signal = schema.get("done_signal")
        detect = resp_schema.get("block_detect", {})
        content_path = resp_schema.get("content_path", "content")

        text_parts = []
        thinking_parts = []
        tool_calls = []
        input_tokens = 0
        output_tokens = 0
        finish_reason = "stop"

        for raw in events:
            if done_signal and raw == done_signal:
                continue
            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                continue

            blocks = self._resolve_path(data, content_path) or []

            for block in blocks:
                if self._detect_block(block, detect.get("thinking", {})):
                    thinking_parts.append(
                        self._resolve_path(
                            block, resp_schema.get("text_value", "text")
                        ) or ""
                    )
                elif self._detect_block(block, detect.get("text", {})):
                    text_parts.append(
                        self._resolve_path(
                            block, resp_schema.get("text_value", "text")
                        ) or ""
                    )
                elif self._detect_block(block, detect.get("tool_call", {})):
                    name = (
                        self._resolve_path(block, resp_schema["tool_call_name"])
                        or ""
                    )
                    raw_input = (
                        self._resolve_path(block, resp_schema["tool_call_input"])
                        or {}
                    )
                    tc_id_path = resp_schema.get("tool_call_id")
                    tc_id = (
                        self._resolve_path(block, tc_id_path)
                        if tc_id_path
                        else str(uuid.uuid4())
                    )
                    tc = {"id": tc_id, "name": name, "input": raw_input}
                    if "thoughtSignature" in block:
                        tc["_raw_block"] = block
                    tool_calls.append(tc)

            # Usage — Gemini reports cumulative, take the max
            usage_obj = (
                self._resolve_path(data, resp_schema.get("usage_path", "usage"))
                or {}
            )
            chunk_in = usage_obj.get(
                resp_schema.get("input_tokens", "input_tokens"), 0
            )
            chunk_out = usage_obj.get(
                resp_schema.get("output_tokens", "output_tokens"), 0
            )
            input_tokens = max(input_tokens, chunk_in)
            output_tokens = max(output_tokens, chunk_out)

            fr = self._resolve_path(
                data, resp_schema.get("finish_reason_path", "stop_reason")
            )
            if fr:
                finish_reason = fr

        pricing = self.config.get("pricing", {}).get(self.model, {})
        spend = (
            input_tokens * pricing.get("input", 0.0)
            + output_tokens * pricing.get("output", 0.0)
        ) / 1_000_000

        result = {
            "text": "".join(text_parts),
            "tool_calls": tool_calls,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "spend": spend,
            "finish_reason": finish_reason,
        }
        if thinking_parts:
            result["thinking"] = "".join(thinking_parts)
        return result
