Coverage for agentos/llm/anthropic_provider.py: 17%

158 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Anthropic Claude Provider — 基于 httpx 直接调用 Anthropic Messages API。 

3零额外依赖,不依赖 anthropic SDK。 

4v1.3.36: 首个纯 httpx 实现,支持同步/异步/流式/Function Calling。 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from typing import Any, Iterator 

11 

12import httpx 

13 

14from agentos.llm.base import ( 

15 CompletionChoice, 

16 CompletionResult, 

17 CompletionUsage, 

18 LLMProvider, 

19 Message, 

20 MessageRole, 

21 StreamChunk, 

22 Tool, 

23 ToolCall, 

24) 

25 

26__all__ = ["AnthropicProvider"] 

27 

28ANTHROPIC_API_BASE = "https://api.anthropic.com" 

29ANTHROPIC_VERSION = "2023-06-01" 

30 

31# USD per 1M tokens (Anthropic pricing as of 2026-07) 

32_PRICING: dict[str, tuple[float, float]] = { 

33 "claude-sonnet-5-20250630": (3.0, 15.0), # Sonnet 5 — 性价比最高的 Agent 模型 

34 "claude-sonnet-5-20250701": (3.0, 15.0), # Sonnet 5 (alternate ID) 

35 "claude-sonnet-4-20250514": (3.0, 15.0), 

36 "claude-3-5-sonnet-20241022": (3.0, 15.0), 

37 "claude-3-5-haiku-20241022": (0.80, 4.0), 

38 "claude-3-opus-20240229": (15.0, 75.0), 

39 "claude-3-haiku-20240307": (0.25, 1.25), 

40 "claude-opus-4-20250514": (15.0, 75.0), # Opus 4 

41 "claude-opus-4-5-20251101": (15.0, 75.0), # Opus 4.5 

42} 

43 

44# 5-series models identified by prefix 

45_SONNET5_PREFIXES = ("claude-sonnet-5", "claude-sonnet5", "sonnet-5") 

46def _is_sonnet5(model: str) -> bool: 

47 return any(model.startswith(p) for p in _SONNET5_PREFIXES) or "sonnet-5" in model 

48 

49 

50def _messages_to_anthropic(messages: list[Message]) -> tuple[str | None, list[dict[str, Any]]]: 

51 """将 Message 列表转换为 Anthropic Messages API 格式。 

52 Returns (system_prompt, api_messages) — Anthropic 的 system 是顶层字段。 

53 """ 

54 system_parts: list[str] = [] 

55 api_messages: list[dict[str, Any]] = [] 

56 

57 for m in messages: 

58 if m.role == MessageRole.SYSTEM: 

59 system_parts.append(m.content) 

60 continue 

61 

62 entry: dict[str, Any] = {"role": _ROLE_MAP[m.role], "content": m.content} 

63 if m.tool_calls: 

64 # 将 tool_calls 转成 Anthropic tool_use content blocks 

65 content_blocks: list[dict[str, Any]] = [] 

66 if m.content: 

67 content_blocks.append({"type": "text", "text": m.content}) 

68 for tc in m.tool_calls: 

69 content_blocks.append({ 

70 "type": "tool_use", 

71 "id": tc.id, 

72 "name": tc.name, 

73 "input": json.loads(tc.arguments), 

74 }) 

75 entry["content"] = content_blocks 

76 

77 if m.role == MessageRole.TOOL and m.tool_call_id: 

78 entry["content"] = [{ 

79 "type": "tool_result", 

80 "tool_use_id": m.tool_call_id, 

81 "content": m.content, 

82 }] 

83 del entry["role"] 

84 api_messages.append(entry) 

85 

86 system_prompt = "\n".join(system_parts) if system_parts else None 

87 return system_prompt, api_messages 

88 

89 

90_ROLE_MAP: dict[MessageRole, str] = { 

91 MessageRole.USER: "user", 

92 MessageRole.ASSISTANT: "assistant", 

93 MessageRole.TOOL: "user", # Anthropic 用 user 角色承载 tool_result 

94} 

95 

96 

97def _tools_to_anthropic(tools: list[Tool] | None) -> list[dict[str, Any]] | None: 

98 if not tools: 

99 return None 

100 result: list[dict[str, Any]] = [] 

101 for t in tools: 

102 fn = t.function 

103 props = fn.parameters 

104 result.append({ 

105 "name": fn.name, 

106 "description": fn.description, 

107 "input_schema": { 

108 "type": "object", 

109 "properties": {k: v.as_schema() for k, v in props.items()}, 

110 "required": fn.required or [k for k, v in props.items() if v.required], 

111 }, 

112 }) 

113 return result 

114 

115 

116def _parse_anthropic_tool_calls(content_blocks: list[dict[str, Any]]) -> list[ToolCall]: 

117 """从 Anthropic content 中提取 tool_use blocks。""" 

118 result: list[ToolCall] = [] 

119 for block in content_blocks: 

120 if block.get("type") == "tool_use": 

121 result.append(ToolCall( 

122 id=block["id"], 

123 name=block["name"], 

124 arguments=json.dumps(block.get("input", {})), 

125 )) 

126 return result 

127 

128 

129def _parse_text_content(content_blocks: list[dict[str, Any]]) -> str: 

130 """提取 text content blocks 中的文本。""" 

131 texts: list[str] = [] 

132 for block in content_blocks: 

133 if block.get("type") == "text": 

134 texts.append(block.get("text", "")) 

135 return "".join(texts) 

136 

137 

138def _build_result(data: dict[str, Any], model: str) -> CompletionResult: 

139 content = data.get("content", []) 

140 if isinstance(content, str): 

141 text = content 

142 tool_calls = None 

143 else: 

144 text = _parse_text_content(content) 

145 tool_calls = _parse_anthropic_tool_calls(content) 

146 if not tool_calls: 

147 tool_calls = None 

148 

149 choice = CompletionChoice( 

150 index=0, 

151 message=Message(role=MessageRole.ASSISTANT, content=text, tool_calls=tool_calls), 

152 finish_reason=data.get("stop_reason", "end_turn"), 

153 ) 

154 usage_data = data.get("usage", {}) 

155 tokens = CompletionUsage( 

156 prompt_tokens=usage_data.get("input_tokens", 0), 

157 completion_tokens=usage_data.get("output_tokens", 0), 

158 total_tokens=usage_data.get("input_tokens", 0) + usage_data.get("output_tokens", 0), 

159 ) 

160 if model in _PRICING: 

161 in_price, out_price = _PRICING[model] 

162 tokens.cost_usd = round( 

163 tokens.prompt_tokens / 1_000_000 * in_price 

164 + tokens.completion_tokens / 1_000_000 * out_price, 

165 6, 

166 ) 

167 return CompletionResult( 

168 id=data.get("id", ""), model=model, choices=[choice], usage=tokens, 

169 ) 

170 

171 

172class AnthropicProvider(LLMProvider): 

173 """Anthropic Claude Provider — 纯 httpx 实现,零 SDK 依赖。""" 

174 

175 def __init__( 

176 self, 

177 model: str = "claude-sonnet-4-20250514", 

178 api_key: str = "", 

179 base_url: str = "", 

180 timeout: float = 120.0, 

181 ): 

182 super().__init__( 

183 model=model, api_key=api_key, 

184 base_url=base_url or ANTHROPIC_API_BASE, 

185 ) 

186 self._timeout = timeout 

187 

188 @property 

189 def provider_name(self) -> str: 

190 return "anthropic" 

191 

192 def _headers(self) -> dict[str, str]: 

193 return { 

194 "x-api-key": self.api_key, 

195 "anthropic-version": ANTHROPIC_VERSION, 

196 "content-type": "application/json", 

197 } 

198 

199 def _build_body( 

200 self, 

201 messages: list[Message], 

202 temperature: float, 

203 max_tokens: int, 

204 top_p: float, 

205 stop: list[str] | None, 

206 tools: list[Tool] | None, 

207 tool_choice: str, 

208 ) -> dict[str, Any]: 

209 system, api_messages = _messages_to_anthropic(messages) 

210 body: dict[str, Any] = { 

211 "model": self.model, 

212 "messages": api_messages, 

213 "max_tokens": max_tokens, 

214 "temperature": temperature, 

215 } 

216 if system: 

217 body["system"] = system 

218 if top_p < 1.0: 

219 body["top_p"] = top_p 

220 if stop: 

221 body["stop_sequences"] = stop 

222 if tools: 

223 body["tools"] = _tools_to_anthropic(tools) 

224 if tool_choice == "any": 

225 body["tool_choice"] = {"type": "any"} 

226 elif tool_choice == "auto": 

227 body["tool_choice"] = {"type": "auto"} 

228 return body 

229 

230 def chat( 

231 self, 

232 messages: list[Message], 

233 *, 

234 temperature: float = 0.7, 

235 max_tokens: int = 4096, 

236 top_p: float = 1.0, 

237 stop: list[str] | None = None, 

238 tools: list[Tool] | None = None, 

239 tool_choice: str = "auto", 

240 **kwargs: Any, 

241 ) -> CompletionResult: 

242 url = f"{self.base_url}/v1/messages" 

243 body = self._build_body(messages, temperature, max_tokens, top_p, stop, tools, tool_choice) 

244 with httpx.Client(timeout=self._timeout) as client: 

245 resp = client.post(url, headers=self._headers(), json=body) 

246 resp.raise_for_status() 

247 return _build_result(resp.json(), self.model) 

248 

249 async def achat( 

250 self, 

251 messages: list[Message], 

252 *, 

253 temperature: float = 0.7, 

254 max_tokens: int = 4096, 

255 top_p: float = 1.0, 

256 stop: list[str] | None = None, 

257 tools: list[Tool] | None = None, 

258 tool_choice: str = "auto", 

259 **kwargs: Any, 

260 ) -> CompletionResult: 

261 url = f"{self.base_url}/v1/messages" 

262 body = self._build_body(messages, temperature, max_tokens, top_p, stop, tools, tool_choice) 

263 async with httpx.AsyncClient(timeout=self._timeout) as client: 

264 resp = await client.post(url, headers=self._headers(), json=body) 

265 resp.raise_for_status() 

266 return _build_result(resp.json(), self.model) 

267 

268 def stream( 

269 self, 

270 messages: list[Message], 

271 *, 

272 temperature: float = 0.7, 

273 max_tokens: int = 4096, 

274 tools: list[Tool] | None = None, 

275 **kwargs: Any, 

276 ) -> Iterator[StreamChunk]: 

277 url = f"{self.base_url}/v1/messages" 

278 body = self._build_body(messages, temperature, max_tokens, 1.0, None, tools, "auto") 

279 body["stream"] = True 

280 with httpx.Client(timeout=self._timeout) as client: 

281 with client.stream("POST", url, headers=self._headers(), json=body) as resp: 

282 resp.raise_for_status() 

283 for line in resp.iter_lines(): 

284 if not line.startswith("data: "): 

285 continue 

286 data_str = line[6:] 

287 if data_str == "[DONE]": 

288 break 

289 try: 

290 event = json.loads(data_str) 

291 except json.JSONDecodeError: 

292 continue 

293 if event.get("type") == "content_block_delta": 

294 delta = event.get("delta", {}) 

295 text = delta.get("text", "") 

296 if text: 

297 yield StreamChunk(content=text) 

298 elif event.get("type") == "message_stop": 

299 yield StreamChunk(finish_reason="end_turn") 

300 

301 async def astream( # pyright: ignore[reportIncompatibleMethodOverride] 

302 self, 

303 messages: list[Message], 

304 *, 

305 temperature: float = 0.7, 

306 max_tokens: int = 4096, 

307 tools: list[Tool] | None = None, 

308 **kwargs: Any, 

309 ): 

310 url = f"{self.base_url}/v1/messages" 

311 body = self._build_body(messages, temperature, max_tokens, 1.0, None, tools, "auto") 

312 body["stream"] = True 

313 async with httpx.AsyncClient(timeout=self._timeout) as client: 

314 async with client.stream("POST", url, headers=self._headers(), json=body) as resp: 

315 resp.raise_for_status() 

316 async for line in resp.aiter_lines(): 

317 if not line.startswith("data: "): 

318 continue 

319 data_str = line[6:] 

320 if data_str == "[DONE]": 

321 break 

322 try: 

323 event = json.loads(data_str) 

324 except json.JSONDecodeError: 

325 continue 

326 if event.get("type") == "content_block_delta": 

327 delta = event.get("delta", {}) 

328 text = delta.get("text", "") 

329 if text: 

330 yield StreamChunk(content=text) 

331 elif event.get("type") == "message_stop": 

332 yield StreamChunk(finish_reason="end_turn")