Coverage for agentos/llm/anthropic_provider.py: 17%
158 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Anthropic Claude Provider — 基于 httpx 直接调用 Anthropic Messages API。
3零额外依赖,不依赖 anthropic SDK。
4v1.3.36: 首个纯 httpx 实现,支持同步/异步/流式/Function Calling。
5"""
7from __future__ import annotations
9import json
10from typing import Any, Iterator
12import httpx
14from agentos.llm.base import (
15 CompletionChoice,
16 CompletionResult,
17 CompletionUsage,
18 LLMProvider,
19 Message,
20 MessageRole,
21 StreamChunk,
22 Tool,
23 ToolCall,
24)
26__all__ = ["AnthropicProvider"]
28ANTHROPIC_API_BASE = "https://api.anthropic.com"
29ANTHROPIC_VERSION = "2023-06-01"
31# USD per 1M tokens (Anthropic pricing as of 2026-07)
32_PRICING: dict[str, tuple[float, float]] = {
33 "claude-sonnet-5-20250630": (3.0, 15.0), # Sonnet 5 — 性价比最高的 Agent 模型
34 "claude-sonnet-5-20250701": (3.0, 15.0), # Sonnet 5 (alternate ID)
35 "claude-sonnet-4-20250514": (3.0, 15.0),
36 "claude-3-5-sonnet-20241022": (3.0, 15.0),
37 "claude-3-5-haiku-20241022": (0.80, 4.0),
38 "claude-3-opus-20240229": (15.0, 75.0),
39 "claude-3-haiku-20240307": (0.25, 1.25),
40 "claude-opus-4-20250514": (15.0, 75.0), # Opus 4
41 "claude-opus-4-5-20251101": (15.0, 75.0), # Opus 4.5
42}
44# 5-series models identified by prefix
45_SONNET5_PREFIXES = ("claude-sonnet-5", "claude-sonnet5", "sonnet-5")
46def _is_sonnet5(model: str) -> bool:
47 return any(model.startswith(p) for p in _SONNET5_PREFIXES) or "sonnet-5" in model
50def _messages_to_anthropic(messages: list[Message]) -> tuple[str | None, list[dict[str, Any]]]:
51 """将 Message 列表转换为 Anthropic Messages API 格式。
52 Returns (system_prompt, api_messages) — Anthropic 的 system 是顶层字段。
53 """
54 system_parts: list[str] = []
55 api_messages: list[dict[str, Any]] = []
57 for m in messages:
58 if m.role == MessageRole.SYSTEM:
59 system_parts.append(m.content)
60 continue
62 entry: dict[str, Any] = {"role": _ROLE_MAP[m.role], "content": m.content}
63 if m.tool_calls:
64 # 将 tool_calls 转成 Anthropic tool_use content blocks
65 content_blocks: list[dict[str, Any]] = []
66 if m.content:
67 content_blocks.append({"type": "text", "text": m.content})
68 for tc in m.tool_calls:
69 content_blocks.append({
70 "type": "tool_use",
71 "id": tc.id,
72 "name": tc.name,
73 "input": json.loads(tc.arguments),
74 })
75 entry["content"] = content_blocks
77 if m.role == MessageRole.TOOL and m.tool_call_id:
78 entry["content"] = [{
79 "type": "tool_result",
80 "tool_use_id": m.tool_call_id,
81 "content": m.content,
82 }]
83 del entry["role"]
84 api_messages.append(entry)
86 system_prompt = "\n".join(system_parts) if system_parts else None
87 return system_prompt, api_messages
90_ROLE_MAP: dict[MessageRole, str] = {
91 MessageRole.USER: "user",
92 MessageRole.ASSISTANT: "assistant",
93 MessageRole.TOOL: "user", # Anthropic 用 user 角色承载 tool_result
94}
97def _tools_to_anthropic(tools: list[Tool] | None) -> list[dict[str, Any]] | None:
98 if not tools:
99 return None
100 result: list[dict[str, Any]] = []
101 for t in tools:
102 fn = t.function
103 props = fn.parameters
104 result.append({
105 "name": fn.name,
106 "description": fn.description,
107 "input_schema": {
108 "type": "object",
109 "properties": {k: v.as_schema() for k, v in props.items()},
110 "required": fn.required or [k for k, v in props.items() if v.required],
111 },
112 })
113 return result
116def _parse_anthropic_tool_calls(content_blocks: list[dict[str, Any]]) -> list[ToolCall]:
117 """从 Anthropic content 中提取 tool_use blocks。"""
118 result: list[ToolCall] = []
119 for block in content_blocks:
120 if block.get("type") == "tool_use":
121 result.append(ToolCall(
122 id=block["id"],
123 name=block["name"],
124 arguments=json.dumps(block.get("input", {})),
125 ))
126 return result
129def _parse_text_content(content_blocks: list[dict[str, Any]]) -> str:
130 """提取 text content blocks 中的文本。"""
131 texts: list[str] = []
132 for block in content_blocks:
133 if block.get("type") == "text":
134 texts.append(block.get("text", ""))
135 return "".join(texts)
138def _build_result(data: dict[str, Any], model: str) -> CompletionResult:
139 content = data.get("content", [])
140 if isinstance(content, str):
141 text = content
142 tool_calls = None
143 else:
144 text = _parse_text_content(content)
145 tool_calls = _parse_anthropic_tool_calls(content)
146 if not tool_calls:
147 tool_calls = None
149 choice = CompletionChoice(
150 index=0,
151 message=Message(role=MessageRole.ASSISTANT, content=text, tool_calls=tool_calls),
152 finish_reason=data.get("stop_reason", "end_turn"),
153 )
154 usage_data = data.get("usage", {})
155 tokens = CompletionUsage(
156 prompt_tokens=usage_data.get("input_tokens", 0),
157 completion_tokens=usage_data.get("output_tokens", 0),
158 total_tokens=usage_data.get("input_tokens", 0) + usage_data.get("output_tokens", 0),
159 )
160 if model in _PRICING:
161 in_price, out_price = _PRICING[model]
162 tokens.cost_usd = round(
163 tokens.prompt_tokens / 1_000_000 * in_price
164 + tokens.completion_tokens / 1_000_000 * out_price,
165 6,
166 )
167 return CompletionResult(
168 id=data.get("id", ""), model=model, choices=[choice], usage=tokens,
169 )
172class AnthropicProvider(LLMProvider):
173 """Anthropic Claude Provider — 纯 httpx 实现,零 SDK 依赖。"""
175 def __init__(
176 self,
177 model: str = "claude-sonnet-4-20250514",
178 api_key: str = "",
179 base_url: str = "",
180 timeout: float = 120.0,
181 ):
182 super().__init__(
183 model=model, api_key=api_key,
184 base_url=base_url or ANTHROPIC_API_BASE,
185 )
186 self._timeout = timeout
188 @property
189 def provider_name(self) -> str:
190 return "anthropic"
192 def _headers(self) -> dict[str, str]:
193 return {
194 "x-api-key": self.api_key,
195 "anthropic-version": ANTHROPIC_VERSION,
196 "content-type": "application/json",
197 }
199 def _build_body(
200 self,
201 messages: list[Message],
202 temperature: float,
203 max_tokens: int,
204 top_p: float,
205 stop: list[str] | None,
206 tools: list[Tool] | None,
207 tool_choice: str,
208 ) -> dict[str, Any]:
209 system, api_messages = _messages_to_anthropic(messages)
210 body: dict[str, Any] = {
211 "model": self.model,
212 "messages": api_messages,
213 "max_tokens": max_tokens,
214 "temperature": temperature,
215 }
216 if system:
217 body["system"] = system
218 if top_p < 1.0:
219 body["top_p"] = top_p
220 if stop:
221 body["stop_sequences"] = stop
222 if tools:
223 body["tools"] = _tools_to_anthropic(tools)
224 if tool_choice == "any":
225 body["tool_choice"] = {"type": "any"}
226 elif tool_choice == "auto":
227 body["tool_choice"] = {"type": "auto"}
228 return body
230 def chat(
231 self,
232 messages: list[Message],
233 *,
234 temperature: float = 0.7,
235 max_tokens: int = 4096,
236 top_p: float = 1.0,
237 stop: list[str] | None = None,
238 tools: list[Tool] | None = None,
239 tool_choice: str = "auto",
240 **kwargs: Any,
241 ) -> CompletionResult:
242 url = f"{self.base_url}/v1/messages"
243 body = self._build_body(messages, temperature, max_tokens, top_p, stop, tools, tool_choice)
244 with httpx.Client(timeout=self._timeout) as client:
245 resp = client.post(url, headers=self._headers(), json=body)
246 resp.raise_for_status()
247 return _build_result(resp.json(), self.model)
249 async def achat(
250 self,
251 messages: list[Message],
252 *,
253 temperature: float = 0.7,
254 max_tokens: int = 4096,
255 top_p: float = 1.0,
256 stop: list[str] | None = None,
257 tools: list[Tool] | None = None,
258 tool_choice: str = "auto",
259 **kwargs: Any,
260 ) -> CompletionResult:
261 url = f"{self.base_url}/v1/messages"
262 body = self._build_body(messages, temperature, max_tokens, top_p, stop, tools, tool_choice)
263 async with httpx.AsyncClient(timeout=self._timeout) as client:
264 resp = await client.post(url, headers=self._headers(), json=body)
265 resp.raise_for_status()
266 return _build_result(resp.json(), self.model)
268 def stream(
269 self,
270 messages: list[Message],
271 *,
272 temperature: float = 0.7,
273 max_tokens: int = 4096,
274 tools: list[Tool] | None = None,
275 **kwargs: Any,
276 ) -> Iterator[StreamChunk]:
277 url = f"{self.base_url}/v1/messages"
278 body = self._build_body(messages, temperature, max_tokens, 1.0, None, tools, "auto")
279 body["stream"] = True
280 with httpx.Client(timeout=self._timeout) as client:
281 with client.stream("POST", url, headers=self._headers(), json=body) as resp:
282 resp.raise_for_status()
283 for line in resp.iter_lines():
284 if not line.startswith("data: "):
285 continue
286 data_str = line[6:]
287 if data_str == "[DONE]":
288 break
289 try:
290 event = json.loads(data_str)
291 except json.JSONDecodeError:
292 continue
293 if event.get("type") == "content_block_delta":
294 delta = event.get("delta", {})
295 text = delta.get("text", "")
296 if text:
297 yield StreamChunk(content=text)
298 elif event.get("type") == "message_stop":
299 yield StreamChunk(finish_reason="end_turn")
301 async def astream( # pyright: ignore[reportIncompatibleMethodOverride]
302 self,
303 messages: list[Message],
304 *,
305 temperature: float = 0.7,
306 max_tokens: int = 4096,
307 tools: list[Tool] | None = None,
308 **kwargs: Any,
309 ):
310 url = f"{self.base_url}/v1/messages"
311 body = self._build_body(messages, temperature, max_tokens, 1.0, None, tools, "auto")
312 body["stream"] = True
313 async with httpx.AsyncClient(timeout=self._timeout) as client:
314 async with client.stream("POST", url, headers=self._headers(), json=body) as resp:
315 resp.raise_for_status()
316 async for line in resp.aiter_lines():
317 if not line.startswith("data: "):
318 continue
319 data_str = line[6:]
320 if data_str == "[DONE]":
321 break
322 try:
323 event = json.loads(data_str)
324 except json.JSONDecodeError:
325 continue
326 if event.get("type") == "content_block_delta":
327 delta = event.get("delta", {})
328 text = delta.get("text", "")
329 if text:
330 yield StreamChunk(content=text)
331 elif event.get("type") == "message_stop":
332 yield StreamChunk(finish_reason="end_turn")