Coverage for agentos/channels/adapters/whatsapp.py: 0%

122 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2WhatsApp Channel Adapter — WhatsApp Business Cloud API. 

3 

4Meta Developer App → Phone Number ID + Access Token → webhook → ChannelMessage. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import hmac 

11import hashlib 

12from typing import Optional 

13 

14from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

15from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

16 

17 

18class WhatsAppAdapter(BaseChannelAdapter): 

19 """WhatsApp Business Cloud API adapter. 

20 

21 Config fields: 

22 access_token: Meta permanent page access token 

23 phone_number_id: WhatsApp Business phone number ID 

24 verify_token: Webhook verify token (for Meta handshake) 

25 app_secret: Meta App secret (for payload signing, optional) 

26 business_id: WhatsApp Business Account ID 

27 """ 

28 

29 CHANNEL_TYPE = ChannelType.WHATSAPP 

30 API_BASE = "https://graph.facebook.com/v19.0" 

31 

32 def __init__(self, config: ChannelConfig): 

33 super().__init__(config) 

34 self._access_token = config.extra.get("access_token", "") 

35 self._phone_number_id = config.extra.get("phone_number_id", "") 

36 self._verify_token = config.extra.get("verify_token", "") 

37 self._app_secret = config.extra.get("app_secret", "") 

38 self._business_id = config.extra.get("business_id", "") 

39 

40 # ── Webhook verification ── 

41 

42 def verify_webhook(self, query_params: dict) -> tuple[bool, str]: 

43 """Handle Meta webhook verification challenge. 

44 

45 Returns (verified, challenge_token). 

46 """ 

47 mode = query_params.get("hub.mode", "") 

48 token = query_params.get("hub.verify_token", "") 

49 challenge = query_params.get("hub.challenge", "") 

50 

51 if mode == "subscribe" and token == self._verify_token: 

52 return True, challenge 

53 return False, "" 

54 

55 # ── Message parsing ── 

56 

57 async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]: 

58 """Parse WhatsApp webhook payload into ChannelMessage.""" 

59 entries = payload.get("entry", []) 

60 

61 for entry in entries: 

62 changes = entry.get("changes", []) 

63 for change in changes: 

64 value = change.get("value", {}) 

65 

66 # Messages 

67 messages = value.get("messages", []) 

68 for msg in messages: 

69 return self._parse_message(msg, value) 

70 

71 # Status updates 

72 statuses = value.get("statuses", []) 

73 for status in statuses: 

74 return ChannelMessage( 

75 channel_type=ChannelType.WHATSAPP, 

76 channel_id=status.get("recipient_id", ""), 

77 user_id=status.get("recipient_id", ""), 

78 content=f"Message status: {status.get('status', 'unknown')}", 

79 message_type=MessageType.SYSTEM, 

80 raw=status, 

81 metadata={"status": status.get("status")}, 

82 ) 

83 

84 return None 

85 

86 def _parse_message(self, msg: dict, value: dict) -> Optional[ChannelMessage]: 

87 """Parse a WhatsApp message object.""" 

88 msg_type = msg.get("type", "text") 

89 user_phone = msg.get("from", "") 

90 msg_id = msg.get("id", "") 

91 

92 content = "" 

93 mtype = MessageType.TEXT 

94 metadata = { 

95 "phone": user_phone, 

96 "display_name": value.get("contacts", [{}])[0].get("profile", {}).get("name", ""), 

97 } 

98 

99 if msg_type == "text": 

100 content = msg.get("text", {}).get("body", "") 

101 elif msg_type == "image": 

102 content = msg.get("image", {}).get("caption", "[Image]") 

103 mtype = MessageType.IMAGE 

104 metadata["media_id"] = msg.get("image", {}).get("id", "") 

105 elif msg_type == "audio": 

106 content = "[Voice message]" 

107 mtype = MessageType.VOICE 

108 metadata["media_id"] = msg.get("audio", {}).get("id", "") 

109 elif msg_type == "video": 

110 content = msg.get("video", {}).get("caption", "[Video]") 

111 mtype = MessageType.VIDEO 

112 elif msg_type == "document": 

113 content = msg.get("document", {}).get("caption", "[Document]") 

114 mtype = MessageType.FILE 

115 metadata["file_name"] = msg.get("document", {}).get("filename", "") 

116 elif msg_type == "location": 

117 loc = msg.get("location", {}) 

118 content = f"[Location: {loc.get('latitude')}, {loc.get('longitude')}]" 

119 mtype = MessageType.LOCATION 

120 elif msg_type == "button": 

121 content = msg.get("button", {}).get("text", "") 

122 mtype = MessageType.INTERACTIVE 

123 elif msg_type == "interactive": 

124 interactive = msg.get("interactive", {}) 

125 if interactive.get("type") == "button_reply": 

126 content = interactive.get("button_reply", {}).get("id", "") 

127 else: 

128 content = interactive.get("list_reply", {}).get("id", "") 

129 mtype = MessageType.INTERACTIVE 

130 else: 

131 content = f"[{msg_type}]" 

132 

133 return ChannelMessage( 

134 channel_type=ChannelType.WHATSAPP, 

135 channel_id=user_phone, 

136 user_id=user_phone, 

137 content=content, 

138 message_type=mtype, 

139 raw=msg, 

140 reply_token=msg_id, 

141 metadata=metadata, 

142 ) 

143 

144 # ── Reply ── 

145 

146 async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult: 

147 """Send a text message via WhatsApp Cloud API.""" 

148 return await self._send_msg(channel_id, { 

149 "type": "text", 

150 "text": {"body": content, "preview_url": False}, 

151 }) 

152 

153 async def reply_template( 

154 self, channel_id: str, template_name: str, 

155 language_code: str = "en", components: list = None, **kwargs, 

156 ) -> ReplyResult: 

157 """Send a WhatsApp message template.""" 

158 body = { 

159 "type": "template", 

160 "template": { 

161 "name": template_name, 

162 "language": {"code": language_code}, 

163 }, 

164 } 

165 if components: 

166 body["template"]["components"] = components 

167 return await self._send_msg(channel_id, body) 

168 

169 async def reply_interactive( 

170 self, channel_id: str, body_text: str, 

171 buttons: list[dict], **kwargs, 

172 ) -> ReplyResult: 

173 """Send an interactive message with reply buttons. 

174 

175 buttons = [{"id": "yes", "title": "Yes"}, ...] 

176 """ 

177 button_list = [ 

178 {"type": "reply", "reply": {"id": b["id"], "title": b["title"]}} 

179 for b in buttons[:3] 

180 ] 

181 return await self._send_msg(channel_id, { 

182 "type": "interactive", 

183 "interactive": { 

184 "type": "button", 

185 "body": {"text": body_text}, 

186 "action": {"buttons": button_list}, 

187 }, 

188 }) 

189 

190 async def reply_image( 

191 self, channel_id: str, image_url: str, caption: str = "", **kwargs 

192 ) -> ReplyResult: 

193 """Send an image.""" 

194 return await self._send_msg(channel_id, { 

195 "type": "image", 

196 "image": {"link": image_url, "caption": caption}, 

197 }) 

198 

199 # ── API Helper ── 

200 

201 async def _send_msg(self, to: str, msg_data: dict) -> ReplyResult: 

202 """Send message via WhatsApp Cloud API.""" 

203 url = f"{self.API_BASE}/{self._phone_number_id}/messages" 

204 headers = { 

205 "Authorization": f"Bearer {self._access_token}", 

206 "Content-Type": "application/json", 

207 } 

208 body = { 

209 "messaging_product": "whatsapp", 

210 "recipient_type": "individual", 

211 "to": to, 

212 **msg_data, 

213 } 

214 

215 try: 

216 import aiohttp 

217 async with aiohttp.ClientSession() as session: 

218 async with session.post(url, headers=headers, json=body) as resp: 

219 data = await resp.json() 

220 wa_id = data.get("messages", [{}])[0].get("id", "") 

221 if wa_id: 

222 return ReplyResult(success=True, message_id=wa_id) 

223 return ReplyResult( 

224 success=False, 

225 error=data.get("error", {}).get("message", "unknown"), 

226 ) 

227 except ImportError: 

228 import urllib.request 

229 req = urllib.request.Request( 

230 url, data=json.dumps(body).encode(), headers=headers 

231 ) 

232 with urllib.request.urlopen(req) as resp: 

233 data = json.loads(resp.read()) 

234 wa_id = data.get("messages", [{}])[0].get("id", "") 

235 return ReplyResult(success=True, message_id=wa_id) 

236 

237 async def mark_as_read(self, message_id: str) -> bool: 

238 """Mark a message as read.""" 

239 url = f"{self.API_BASE}/{self._phone_number_id}/messages" 

240 headers = { 

241 "Authorization": f"Bearer {self._access_token}", 

242 "Content-Type": "application/json", 

243 } 

244 body = { 

245 "messaging_product": "whatsapp", 

246 "status": "read", 

247 "message_id": message_id, 

248 } 

249 try: 

250 import aiohttp 

251 async with aiohttp.ClientSession() as session: 

252 async with session.post(url, headers=headers, json=body) as resp: 

253 return resp.status == 200 

254 except ImportError: 

255 import urllib.request 

256 req = urllib.request.Request( 

257 url, data=json.dumps(body).encode(), headers=headers 

258 ) 

259 with urllib.request.urlopen(req) as resp: 

260 return resp.status == 200