Coverage for agentos/models/backends/anthropic.py: 26%
100 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""Anthropic Claude backend for AgentOS.
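3Supports any Claude model served by the Anthropic Messages API (e.g. Claude Opus 4, Claude Sonnet 4, Claude Haiku 3.5); the default model is set by ClaudeConfig.model.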
4Uses Anthropic Messages API.
5"""
7from dataclasses import dataclass, field
8from typing import Any, AsyncIterator, Dict, List, Optional
10import os
def _api_key_from_env() -> str:
    """Return the ``ANTHROPIC_API_KEY`` environment variable, or "" if unset."""
    return os.environ.get("ANTHROPIC_API_KEY", "")


@dataclass
class ClaudeConfig:
    """Settings consumed by the Anthropic Claude client.

    Every field has a default. ``api_key`` is looked up in the environment
    each time an instance is created without an explicit key.
    """

    # Credentials and endpoint.
    api_key: str = field(default_factory=_api_key_from_env)
    base_url: str = "https://api.anthropic.com/v1"
    # Model and sampling defaults (per-call kwargs may override some of these).
    model: str = "claude-sonnet-4-20250514"
    temperature: float = 0.7
    max_tokens: int = 4096
    top_p: float = 1.0
    top_k: int = -1  # a non-positive value means "do not send top_k"
    # Transport settings.
    timeout: int = 90  # passed to httpx.Timeout, i.e. seconds
    max_retries: int = 3
    anthropic_version: str = "2023-06-01"  # value of the anthropic-version header
class ClaudeClient:
    """Anthropic Claude LLM client built on the Messages API.

    Supports:
    - Any model ID accepted by the Messages API (taken from ``config.model``)
    - Streaming and non-streaming requests
    - Tool use (function calling)
    - System prompts

    ``httpx`` is imported lazily inside the request methods, so constructing a
    client and building payloads does not require it to be importable.
    """

    def __init__(self, config: Optional["ClaudeConfig"] = None):
        # Without an explicit config, fall back to ClaudeConfig defaults.
        self.config = config or ClaudeConfig()

    @property
    def _headers(self) -> Dict[str, str]:
        """Headers sent with every Messages API request."""
        return {
            "x-api-key": self.config.api_key,
            "anthropic-version": self.config.anthropic_version,
            "content-type": "application/json",
        }

    @property
    def _messages_url(self) -> str:
        """URL of the ``/messages`` endpoint (tolerates a trailing slash in base_url)."""
        return f"{self.config.base_url.rstrip('/')}/messages"

    def _build_payload(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Assemble the JSON body for a Messages API request.

        Per-call ``kwargs`` override the matching config values. Recognised keys:
        ``max_tokens``, ``temperature``, ``top_p``, ``top_k``, ``tools``,
        ``tool_choice`` and ``stop_sequences``; other keys are ignored.

        Sampling parameters are omitted (so the API default applies) when they
        hold their "unset" value: ``temperature`` None or negative, ``top_p``
        None or >= 1.0, ``top_k`` None or <= 0.

        Returns:
            The request body as a dict ready to be JSON-encoded.
        """
        cfg = self.config
        payload: Dict[str, Any] = {
            "model": cfg.model,
            "max_tokens": kwargs.get("max_tokens", cfg.max_tokens),
            "messages": messages,
        }
        if system:
            payload["system"] = system

        # temperature=0 is a valid (greedy) setting and must be sent explicitly;
        # omitting it would make the API fall back to its own default.
        temperature = kwargs.get("temperature", cfg.temperature)
        if temperature is not None and temperature >= 0:
            payload["temperature"] = temperature

        top_p = kwargs.get("top_p", cfg.top_p)
        if top_p is not None and top_p < 1.0:
            payload["top_p"] = top_p

        top_k = kwargs.get("top_k", cfg.top_k)
        if top_k is not None and top_k > 0:
            payload["top_k"] = top_k

        # Optional pass-through fields; empty/None values are not sent.
        for key in ("tools", "tool_choice", "stop_sequences"):
            if kwargs.get(key):
                payload[key] = kwargs[key]

        return payload

    async def _async_request(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the Messages endpoint and return the decoded JSON.

        Makes up to ``config.max_retries`` attempts (always at least one).
        HTTP 429, HTTP 5xx and transport failures (connection errors, timeouts)
        are retried after sleeping ``2 ** attempt`` seconds; any other HTTP
        error is raised immediately.

        Raises:
            httpx.HTTPStatusError: non-retryable status, or retries exhausted.
            httpx.TransportError: network failure on the final attempt.
        """
        import asyncio

        import httpx

        timeout = httpx.Timeout(self.config.timeout)
        # Guard against max_retries <= 0, which previously made zero attempts
        # and silently returned None.
        attempts = max(1, self.config.max_retries)

        # One client (and connection pool) is shared by all attempts.
        async with httpx.AsyncClient() as client:
            for attempt in range(attempts):
                last_attempt = attempt == attempts - 1
                try:
                    resp = await client.post(
                        self._messages_url,
                        json=payload,
                        headers=self._headers,
                        timeout=timeout,
                    )
                    resp.raise_for_status()
                    return resp.json()
                except httpx.HTTPStatusError as exc:
                    status = exc.response.status_code
                    if last_attempt or not (status >= 500 or status == 429):
                        raise
                except httpx.TransportError:
                    if last_attempt:
                        raise
                await asyncio.sleep(2 ** attempt)

        # Unreachable: every iteration either returns or raises on the last attempt.
        raise RuntimeError("retry loop exited without a result")

    async def chat(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Send a (non-streaming) chat completion request.

        Args:
            messages: ``[{"role": "user", "content": "..."}, ...]``.
            system: Optional system prompt.
            **kwargs: Per-call overrides; see ``_build_payload``.

        Returns:
            ``{"content": str, "role": "assistant", "usage": {"input_tokens": int,
            "output_tokens": int}, "model": str, "stop_reason": str,
            "tool_calls": [{"id": str, "name": str, "arguments": dict}, ...]}``.
            ``content`` concatenates all ``text`` blocks of the response.
        """
        payload = self._build_payload(messages, system, **kwargs)
        result = await self._async_request(payload)

        blocks = result.get("content") or []
        text_content = "".join(
            block.get("text", "") for block in blocks if block.get("type") == "text"
        )
        tool_calls = [
            {
                "id": block.get("id", ""),
                "name": block.get("name", ""),
                "arguments": block.get("input", {}),
            }
            for block in blocks
            if block.get("type") == "tool_use"
        ]

        usage = result.get("usage") or {}
        return {
            "content": text_content,
            "role": "assistant",
            "usage": {
                "input_tokens": usage.get("input_tokens", 0),
                "output_tokens": usage.get("output_tokens", 0),
            },
            "model": result.get("model", self.config.model),
            "stop_reason": result.get("stop_reason", ""),
            "tool_calls": tool_calls,
        }

    async def chat_stream(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Stream chat completion events parsed from the server-sent event stream.

        Yields:
            ``{"delta": str, "type": str}`` for each ``content_block_delta`` event.
                ``delta`` is the text fragment ("" for non-text deltas) and
                ``type`` is the delta's type. Tool-input deltas also carry
                ``"partial_json"`` with the raw JSON fragment.
            ``{"delta": "", "finish_reason": "stop", "stop_reason": str}`` when
                the message ends. ``stop_reason`` is the value reported by the
                last ``message_delta`` event ("" if none was received).
            ``{"error": str}`` if the server sends an ``error`` event.

        Lines that are not ``data:`` lines, or whose payload is not a JSON
        object, are skipped.

        Raises:
            httpx.HTTPStatusError: if the initial response status is an error.
        """
        import json

        import httpx

        payload = self._build_payload(messages, system, **kwargs)
        payload["stream"] = True

        # Streaming requests use twice the configured timeout.
        timeout = httpx.Timeout(self.config.timeout * 2)
        stop_reason = ""
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                self._messages_url,
                json=payload,
                headers=self._headers,
                timeout=timeout,
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    try:
                        event = json.loads(line[6:].strip())
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(event, dict):
                        continue

                    event_type = event.get("type", "")
                    if event_type == "content_block_delta":
                        delta = event.get("delta") or {}
                        chunk: Dict[str, Any] = {
                            "delta": delta.get("text", ""),
                            "type": delta.get("type", "text"),
                        }
                        if "partial_json" in delta:
                            chunk["partial_json"] = delta["partial_json"]
                        yield chunk
                    elif event_type == "message_delta":
                        # The final stop_reason arrives here, before message_stop.
                        reported = (event.get("delta") or {}).get("stop_reason")
                        if reported:
                            stop_reason = reported
                    elif event_type == "message_stop":
                        yield {
                            "delta": "",
                            "finish_reason": "stop",
                            "stop_reason": stop_reason,
                        }
                        break
                    elif event_type == "error":
                        error = event.get("error") or {}
                        yield {"error": error.get("message", "")}
                        break

    def sync_chat(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Blocking wrapper around ``chat``.

        Runs ``chat`` on a fresh event loop via ``asyncio.run``, which also
        finalises async generators before closing the loop.

        Raises:
            RuntimeError: if called from a thread that already has a running
                event loop (use ``await chat(...)`` there instead).
        """
        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: safe to start one.
            return asyncio.run(self.chat(messages, system, **kwargs))
        # Checked before creating the coroutine so no "never awaited" warning is emitted.
        raise RuntimeError(
            "sync_chat() cannot be called from a running event loop; await chat() instead"
        )