Coverage for agentos/models/backends/anthropic.py: 26%

100 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""Anthropic Claude backend for AgentOS. 

2 

3Supports Claude 3.5 Sonnet, Claude 3 Opus, Claude 3 Haiku. 

4Uses Anthropic Messages API. 

5""" 

6 

7from dataclasses import dataclass, field 

8from typing import Any, AsyncIterator, Dict, List, Optional 

9 

10import os 

11 

12 

@dataclass
class ClaudeConfig:
    """Configuration for Anthropic Claude backend.

    All fields have defaults, so ``ClaudeConfig()`` is usable as long as the
    ``ANTHROPIC_API_KEY`` environment variable is set.
    """

    # Read from the ANTHROPIC_API_KEY environment variable at instantiation
    # time (empty string when unset). Excluded from the generated repr so the
    # secret does not end up in logs or tracebacks.
    api_key: str = field(
        default_factory=lambda: os.environ.get("ANTHROPIC_API_KEY", ""),
        repr=False,
    )
    # API root; the client appends "/messages" to it.
    base_url: str = "https://api.anthropic.com/v1"
    # Model identifier sent in every request payload.
    model: str = "claude-sonnet-4-20250514"
    # Default sampling temperature (overridable per call).
    temperature: float = 0.7
    # Default cap on generated tokens (overridable per call).
    max_tokens: int = 4096
    # Nucleus sampling threshold; the client only sends it when below 1.0.
    top_p: float = 1.0
    # Top-k sampling; non-positive values mean "do not send".
    top_k: int = -1
    # Request timeout handed to httpx.Timeout (seconds); doubled for streaming.
    timeout: int = 90
    # Number of attempts made for a request before a retryable HTTP error
    # (429 or 5xx) is re-raised.
    max_retries: int = 3
    # Value of the "anthropic-version" request header.
    anthropic_version: str = "2023-06-01"

26 

27 

class ClaudeClient:
    """Anthropic Claude LLM client built on the Messages API.

    Supports:
    - Claude Opus 4 / Claude Sonnet 4 / Claude Haiku 3.5
    - Streaming and non-streaming
    - Tool use (function calling)
    - System prompts

    ``httpx`` is imported lazily inside the request methods, so constructing
    a client and building payloads does not require it to be installed.
    """

    def __init__(self, config: Optional["ClaudeConfig"] = None):
        """Store *config*, or build a default ``ClaudeConfig`` when omitted."""
        self.config = config or ClaudeConfig()

    @property
    def _headers(self) -> Dict[str, str]:
        """HTTP headers sent with every Messages API request."""
        return {
            "x-api-key": self.config.api_key,
            "anthropic-version": self.config.anthropic_version,
            "content-type": "application/json",
        }

    @property
    def _messages_url(self) -> str:
        """Full URL of the Messages endpoint."""
        # Tolerate a trailing slash on base_url so we never emit "//messages".
        return f"{self.config.base_url.rstrip('/')}/messages"

    def _build_payload(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Assemble the JSON body for a Messages API request.

        Args:
            messages: Conversation turns, e.g.
                ``[{"role": "user", "content": "..."}]``.
            system: Optional system prompt; omitted from the payload when
                empty or ``None``.
            **kwargs: Per-call overrides. Recognised keys are ``max_tokens``,
                ``temperature``, ``top_p``, ``top_k`` and ``tools``; each
                falls back to the matching config value when absent
                (``tools`` has no config default).

        Returns:
            The request payload dictionary.
        """
        cfg = self.config
        payload: Dict[str, Any] = {
            "model": cfg.model,
            "max_tokens": kwargs.get("max_tokens", cfg.max_tokens),
            "messages": messages,
        }

        if system:
            payload["system"] = system

        # temperature=0 is a legitimate request for near-deterministic
        # output; it must be sent explicitly, otherwise the API applies its
        # own (non-zero) default. None / negative values mean "do not send".
        temperature = kwargs.get("temperature", cfg.temperature)
        if temperature is not None and temperature >= 0:
            payload["temperature"] = temperature

        # top_p is only sent when it actually narrows sampling.
        top_p = kwargs.get("top_p", cfg.top_p)
        if top_p is not None and top_p < 1.0:
            payload["top_p"] = top_p

        # top_k follows the same override rule as the other sampling
        # parameters; non-positive values mean "do not send".
        top_k = kwargs.get("top_k", cfg.top_k)
        if top_k is not None and top_k > 0:
            payload["top_k"] = top_k

        tools = kwargs.get("tools")
        if tools:
            payload["tools"] = tools

        return payload

    async def _async_request(self, payload: Dict) -> Dict:
        """POST *payload* to the Messages endpoint and return the decoded JSON.

        Responses with status 429 or >= 500 are retried with exponential
        backoff (1s, 2s, 4s, ...). Any other HTTP error status, or the last
        failed attempt, re-raises ``httpx.HTTPStatusError``.

        Raises:
            httpx.HTTPStatusError: On a non-retryable error status or when
                all attempts are exhausted.
        """
        import asyncio

        import httpx

        timeout = httpx.Timeout(self.config.timeout)
        # Always make at least one attempt: with max_retries <= 0 the loop
        # would otherwise be skipped and the method would return None.
        attempts = max(1, self.config.max_retries)

        for attempt in range(attempts):
            try:
                async with httpx.AsyncClient() as client:
                    resp = await client.post(
                        self._messages_url,
                        json=payload,
                        headers=self._headers,
                        timeout=timeout,
                    )
                    resp.raise_for_status()
                    return resp.json()
            except httpx.HTTPStatusError as exc:
                status = exc.response.status_code
                retryable = status >= 500 or status == 429
                if not retryable or attempt == attempts - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
        # Not reached: every iteration either returns, raises, or continues,
        # and the final iteration cannot continue.

    async def chat(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Send a chat completion request.

        Messages format: [{"role": "user", "content": "..."}, ...]

        Returns:
            A dict with the keys:
            ``content`` (concatenated text of all text blocks),
            ``role`` (always ``"assistant"``),
            ``usage`` (``input_tokens`` / ``output_tokens``, 0 when missing),
            ``model``, ``stop_reason`` and
            ``tool_calls`` (list of ``{"id", "name", "arguments"}`` dicts
            built from ``tool_use`` blocks).
        """
        payload = self._build_payload(messages, system, **kwargs)
        result = await self._async_request(payload)

        text_parts: List[str] = []
        tool_calls: List[Dict[str, Any]] = []

        # "or []" / "or {}" also cover an explicit null in the response.
        for block in result.get("content") or []:
            block_type = block.get("type")
            if block_type == "text":
                text_parts.append(block.get("text", ""))
            elif block_type == "tool_use":
                tool_calls.append({
                    "id": block.get("id", ""),
                    "name": block.get("name", ""),
                    "arguments": block.get("input", {}),
                })

        usage = result.get("usage") or {}
        return {
            "content": "".join(text_parts),
            "role": "assistant",
            "usage": {
                "input_tokens": usage.get("input_tokens", 0),
                "output_tokens": usage.get("output_tokens", 0),
            },
            "model": result.get("model", self.config.model),
            "stop_reason": result.get("stop_reason", ""),
            "tool_calls": tool_calls,
        }

    async def chat_stream(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Stream chat completion events.

        Yields one dict per relevant server-sent event:

        - ``{"delta": str, "type": str}`` for each ``content_block_delta``;
          when the delta carries ``partial_json`` (tool-use input), it is
          passed through under a ``"partial_json"`` key as well.
        - ``{"delta": "", "finish_reason": "stop"}`` on ``message_stop``,
          after which the stream ends.
        - ``{"error": str}`` on an ``error`` event, after which the stream
          ends.

        Lines that are not ``data: `` lines, or whose payload is not valid
        JSON, are skipped. The timeout is twice the configured one.

        Raises:
            httpx.HTTPStatusError: If the initial response has an error status.
        """
        import json

        import httpx

        payload = self._build_payload(messages, system, **kwargs)
        payload["stream"] = True

        timeout = httpx.Timeout(self.config.timeout * 2)
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                self._messages_url,
                json=payload,
                headers=self._headers,
                timeout=timeout,
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    try:
                        event = json.loads(line[6:].strip())
                    except json.JSONDecodeError:
                        continue

                    event_type = event.get("type", "")
                    if event_type == "content_block_delta":
                        delta = event.get("delta", {})
                        chunk: Dict[str, Any] = {
                            "delta": delta.get("text", ""),
                            "type": delta.get("type", "text"),
                        }
                        # Tool-use input arrives as JSON fragments rather
                        # than text; forward them instead of dropping them.
                        if "partial_json" in delta:
                            chunk["partial_json"] = delta["partial_json"]
                        yield chunk
                    elif event_type == "message_stop":
                        yield {"delta": "", "finish_reason": "stop"}
                        break
                    elif event_type == "error":
                        yield {"error": event.get("error", {}).get("message", "")}
                        break

    def sync_chat(
        self,
        messages: List[Dict[str, Any]],
        system: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Synchronous chat completion.

        Runs :meth:`chat` to completion on a private, freshly created event
        loop and closes that loop afterwards. Takes the same arguments and
        returns the same dict as :meth:`chat`.
        """
        import asyncio

        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(self.chat(messages, system, **kwargs))
        finally:
            loop.close()