# Source code for jeevesagent.model.anthropic

"""Adapter for Anthropic's Claude models via the official ``anthropic`` SDK.

Streams via ``messages.stream``; normalises Anthropic's event types into
our :class:`ModelChunk` shape:

* ``text_delta`` -> ``ModelChunk(kind="text", text=...)``
* ``input_json_delta`` accumulates partial tool-use JSON; on
  ``content_block_stop`` we emit ``ModelChunk(kind="tool_call", ...)``
* ``message_delta`` carries the final ``stop_reason`` and output token
  count; ``message_start`` carries the input token count
* a single trailing ``ModelChunk(kind="finish", ...)`` is emitted when
  the stream ends, regardless of whether tools were called

The SDK is imported lazily inside ``__init__`` so users can
``from jeevesagent.model import AnthropicModel`` without the
``anthropic`` extra installed; the import only fires when the
constructor needs to build a default client.
"""

from __future__ import annotations

import json
import os
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any

from ..core.types import Message, ModelChunk, Role, ToolCall, ToolDef, Usage

DEFAULT_MAX_TOKENS = 4096


@dataclass
class _PartialTool:
    """Accumulator for one streamed ``tool_use`` content block.

    Anthropic sends the block's id/name in ``content_block_start`` and the
    JSON arguments as incremental ``input_json_delta`` fragments; the
    fragments are concatenated into ``args_json`` and parsed once at
    ``content_block_stop``.
    """

    # Anthropic-assigned tool_use block id (becomes ToolCall.id).
    id: str = ""
    # Tool name, captured from the content_block_start event.
    name: str = ""
    # Concatenated partial_json fragments; parsed to a dict when the block stops.
    args_json: str = ""


class AnthropicModel:
    """Talks to Claude via :class:`anthropic.AsyncAnthropic`."""

    def __init__(
        self,
        model: str = "claude-opus-4-7",
        *,
        client: Any = None,
        api_key: str | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> None:
        """Create an adapter for *model*.

        Args:
            model: Anthropic model identifier; exposed as ``self.name``.
            client: Pre-built (possibly fake) async client. When given, the
                ``anthropic`` SDK is never imported.
            api_key: Overrides the ``ANTHROPIC_API_KEY`` environment variable
                when a default client must be constructed.
            max_tokens: Default ``max_tokens`` for requests that do not pass
                one explicitly.

        Raises:
            ImportError: no *client* was supplied and the ``anthropic``
                package is not installed.
        """
        self.name = model
        self._max_tokens = max_tokens
        if client is not None:
            self._client = client
        else:
            # Lazy import keeps the package importable without the
            # ``anthropic`` extra; the dependency only bites here.
            try:
                from anthropic import AsyncAnthropic
            except ImportError as exc:  # pragma: no cover — depends on user env
                raise ImportError(
                    "Anthropic SDK not installed. "
                    "Install with: pip install 'jeevesagent[anthropic]'"
                ) from exc
            key = api_key or os.environ.get("ANTHROPIC_API_KEY")
            self._client = AsyncAnthropic(api_key=key)

    def _build_request(
        self,
        messages: list[Message],
        tools: list[ToolDef] | None,
        temperature: float,
        max_tokens: int | None,
    ) -> dict[str, Any]:
        """Assemble the request kwargs shared by :meth:`complete` and
        :meth:`stream` (``system`` and ``tools`` only when non-empty, per
        the Anthropic Messages API)."""
        system, anth_messages = _to_anthropic_messages(messages)
        kwargs: dict[str, Any] = {
            "model": self.name,
            "messages": anth_messages,
            "max_tokens": max_tokens or self._max_tokens,
            "temperature": temperature,
        }
        if system:
            kwargs["system"] = system
        anth_tools = [_to_anthropic_tool(t) for t in (tools or [])]
        if anth_tools:
            kwargs["tools"] = anth_tools
        return kwargs

    async def complete(
        self,
        messages: list[Message],
        *,
        tools: list[ToolDef] | None = None,
        temperature: float = 1.0,
        max_tokens: int | None = None,
    ) -> tuple[str, list[ToolCall], Usage, str]:
        """Single-shot non-streaming completion.

        Calls ``client.messages.create(...)`` (no ``stream=True``, no
        ``stream`` context manager) — Anthropic returns the full
        ``Message`` in one HTTP response. We walk its ``content`` blocks
        once to assemble ``(text, tool_calls, usage, stop_reason)``.

        Used by the non-streaming hot path (``agent.run()``);
        ``agent.stream()`` keeps using :meth:`stream`.

        Falls back to consuming :meth:`stream` if the underlying client
        raises (test fakes that only support streaming, or transports
        that don't honour single-shot creation).
        """

        async def _stream_fallback() -> tuple[str, list[ToolCall], Usage, str]:
            # Drain the streaming path into the same return tuple.
            return await _consume_anthropic_stream(
                self.stream(
                    messages,
                    tools=tools,
                    temperature=temperature,
                    max_tokens=max_tokens,
                )
            )

        kwargs = self._build_request(messages, tools, temperature, max_tokens)
        try:
            response = await self._client.messages.create(**kwargs)
        except Exception:  # noqa: BLE001 — fallback for fake / non-conforming clients
            return await _stream_fallback()

        # If the SDK actually returned a stream object instead of a
        # Message (some test fakes), drain the stream path.
        if hasattr(response, "__aiter__") and not hasattr(response, "content"):
            return await _stream_fallback()

        text_parts: list[str] = []
        tool_calls: list[ToolCall] = []
        for block in getattr(response, "content", None) or []:
            btype = getattr(block, "type", None)
            if btype == "text":
                text_parts.append(getattr(block, "text", "") or "")
            elif btype == "tool_use":
                args_raw = getattr(block, "input", None)
                # Non-dict inputs (malformed fakes) degrade to empty args.
                args: dict[str, Any] = (
                    dict(args_raw) if isinstance(args_raw, dict) else {}
                )
                tool_calls.append(
                    ToolCall(
                        id=getattr(block, "id", "") or "",
                        tool=getattr(block, "name", "") or "",
                        args=args,
                    )
                )
        u = getattr(response, "usage", None)
        usage = Usage(
            input_tokens=getattr(u, "input_tokens", 0) or 0,
            output_tokens=getattr(u, "output_tokens", 0) or 0,
        )
        stop_reason = getattr(response, "stop_reason", None) or "end_turn"
        return "".join(text_parts), tool_calls, usage, str(stop_reason)

    async def stream(
        self,
        messages: list[Message],
        *,
        tools: list[ToolDef] | None = None,
        temperature: float = 1.0,
        max_tokens: int | None = None,
    ) -> AsyncIterator[ModelChunk]:
        """Stream a completion as :class:`ModelChunk` values.

        Normalises Anthropic's event stream: ``text_delta`` events become
        ``kind="text"`` chunks; ``tool_use`` blocks are accumulated via
        :class:`_PartialTool` and emitted as ``kind="tool_call"`` on
        ``content_block_stop``; a single trailing ``kind="finish"`` chunk
        carries the stop reason and aggregated token usage.
        """
        kwargs = self._build_request(messages, tools, temperature, max_tokens)

        partials: dict[int, _PartialTool] = {}
        agg_input = 0
        agg_output = 0
        finish_reason: str | None = None

        async with self._client.messages.stream(**kwargs) as stream:
            async for event in stream:
                etype = getattr(event, "type", None)
                if etype == "message_start":
                    msg = getattr(event, "message", None)
                    usage = getattr(msg, "usage", None) if msg is not None else None
                    if usage is not None:
                        # Only the input count comes from message_start.
                        # Its output_tokens is a partial figure that is
                        # superseded by message_delta's cumulative count —
                        # adding it here double-counted output tokens.
                        agg_input += getattr(usage, "input_tokens", 0) or 0
                elif etype == "content_block_start":
                    block = event.content_block
                    if getattr(block, "type", None) == "tool_use":
                        partials[event.index] = _PartialTool(
                            id=getattr(block, "id", "") or "",
                            name=getattr(block, "name", "") or "",
                        )
                elif etype == "content_block_delta":
                    delta = event.delta
                    dtype = getattr(delta, "type", None)
                    if dtype == "text_delta":
                        text = getattr(delta, "text", "") or ""
                        if text:
                            yield ModelChunk(kind="text", text=text)
                    elif dtype == "input_json_delta":
                        partial = partials.get(event.index)
                        if partial is not None:
                            partial.args_json += (
                                getattr(delta, "partial_json", "") or ""
                            )
                elif etype == "content_block_stop":
                    partial = partials.pop(event.index, None)
                    if partial is not None:
                        try:
                            args = (
                                json.loads(partial.args_json)
                                if partial.args_json
                                else {}
                            )
                        except json.JSONDecodeError:
                            # Truncated / malformed tool JSON: surface the
                            # call with empty args rather than crash.
                            args = {}
                        yield ModelChunk(
                            kind="tool_call",
                            tool_call=ToolCall(
                                id=partial.id,
                                tool=partial.name,
                                args=args,
                            ),
                        )
                elif etype == "message_delta":
                    usage = getattr(event, "usage", None)
                    if usage is not None:
                        agg_output += getattr(usage, "output_tokens", 0) or 0
                    delta = getattr(event, "delta", None)
                    if delta is not None:
                        sr = getattr(delta, "stop_reason", None)
                        if sr:
                            finish_reason = sr

        # Always close with exactly one finish chunk, tools or not.
        yield ModelChunk(
            kind="finish",
            finish_reason=finish_reason or "end_turn",
            usage=Usage(input_tokens=agg_input, output_tokens=agg_output),
        )
async def _consume_anthropic_stream(
    chunks: AsyncIterator[ModelChunk],
) -> tuple[str, list[ToolCall], Usage, str]:
    """Collapse a ``ModelChunk`` stream into the tuple returned by
    :meth:`AnthropicModel.complete`: ``(text, tool_calls, usage,
    stop_reason)``. Used when the non-streaming transport path is
    unavailable (test fakes / niche SDKs)."""
    pieces: list[str] = []
    calls: list[ToolCall] = []
    totals = Usage()
    reason = "end_turn"
    async for item in chunks:
        if item.kind == "text":
            if item.text is not None:
                pieces.append(item.text)
        elif item.kind == "tool_call":
            if item.tool_call is not None:
                calls.append(item.tool_call)
        elif item.kind == "finish":
            # The trailing finish chunk carries usage and stop reason.
            if item.usage is not None:
                totals = item.usage
            reason = item.finish_reason or reason
    return "".join(pieces), calls, totals, reason


# ---------------------------------------------------------------------------
# Message conversion
# ---------------------------------------------------------------------------


def _to_anthropic_messages(
    messages: list[Message],
) -> tuple[str, list[dict[str, Any]]]:
    """Convert our messages to ``(system_text, [anthropic_message, ...])``.

    Anthropic requires ``system`` as a top-level field and structures tool
    calls as ``tool_use`` content blocks on the assistant turn, with
    ``tool_result`` blocks returned in the next user turn.
    """
    system_chunks: list[str] = []
    converted: list[dict[str, Any]] = []
    queued_results: list[dict[str, Any]] = []

    def flush_results() -> None:
        # Queued tool_results must land as a single user turn before any
        # following non-tool message.
        if queued_results:
            converted.append({"role": "user", "content": list(queued_results)})
            queued_results.clear()

    for msg in messages:
        if msg.role == Role.SYSTEM:
            system_chunks.append(msg.content)
            continue
        if msg.role == Role.TOOL:
            queued_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": msg.tool_call_id or "",
                    "content": msg.content,
                }
            )
            continue
        flush_results()
        if msg.role == Role.USER:
            converted.append({"role": "user", "content": msg.content})
        elif msg.role == Role.ASSISTANT:
            parts: list[dict[str, Any]] = []
            if msg.content:
                parts.append({"type": "text", "text": msg.content})
            parts.extend(
                {
                    "type": "tool_use",
                    "id": call.id,
                    "name": call.tool,
                    "input": call.args,
                }
                for call in msg.tool_calls
            )
            converted.append(
                {"role": "assistant", "content": parts if parts else msg.content}
            )
    # Tool results at the tail still need their user turn.
    flush_results()
    return "\n\n".join(system_chunks), converted


def _to_anthropic_tool(t: ToolDef) -> dict[str, Any]:
    """Shape a :class:`ToolDef` as an Anthropic tool-spec dict, defaulting
    a missing schema to an empty object schema."""
    schema = t.input_schema or {"type": "object", "properties": {}}
    return {
        "name": t.name,
        "description": t.description,
        "input_schema": schema,
    }