Coverage for agentos/channels/adapters/whatsapp.py: 0%
122 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2WhatsApp Channel Adapter — WhatsApp Business Cloud API.
4Meta Developer App → Phone Number ID + Access Token → webhook → ChannelMessage.
5"""
7from __future__ import annotations
9import json
10import hmac
11import hashlib
12from typing import Optional
14from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
15from agentos.channels.message import ChannelMessage, ChannelType, MessageType
class WhatsAppAdapter(BaseChannelAdapter):
    """WhatsApp Business Cloud API adapter.

    Converts Meta webhook payloads into ``ChannelMessage`` objects and sends
    replies by POSTing JSON to ``{API_BASE}/{phone_number_id}/messages``.

    Config fields (all read from ``config.extra``; each defaults to ``""``):
        access_token: Meta access token, sent as the ``Authorization: Bearer``
            header on every outgoing request.
        phone_number_id: WhatsApp Business phone number ID (part of the URL).
        verify_token: Webhook verify token checked by :meth:`verify_webhook`.
        app_secret: Meta App secret. Stored only -- no method of this class
            reads it, so webhook payload signatures are NOT verified here.
        business_id: WhatsApp Business Account ID. Stored only.
    """

    CHANNEL_TYPE = ChannelType.WHATSAPP
    API_BASE = "https://graph.facebook.com/v19.0"

    # Socket timeout (seconds) for the urllib fallback transport; without it
    # ``urlopen`` can block forever on an unresponsive peer.
    _URLLIB_TIMEOUT = 30

    def __init__(self, config: ChannelConfig):
        """Initialise the base adapter and cache credentials from ``config.extra``."""
        super().__init__(config)
        extra = config.extra
        self._access_token = extra.get("access_token", "")
        self._phone_number_id = extra.get("phone_number_id", "")
        self._verify_token = extra.get("verify_token", "")
        self._app_secret = extra.get("app_secret", "")
        self._business_id = extra.get("business_id", "")

    # ── Webhook verification ──

    def verify_webhook(self, query_params: dict) -> tuple[bool, str]:
        """Handle the Meta webhook verification challenge.

        Args:
            query_params: Query-string parameters of the verification request;
                reads ``hub.mode``, ``hub.verify_token`` and ``hub.challenge``.

        Returns:
            ``(True, challenge)`` when the mode is ``"subscribe"`` and the
            supplied token matches the configured one, else ``(False, "")``.
        """
        mode = query_params.get("hub.mode", "")
        token = query_params.get("hub.verify_token", "")
        challenge = query_params.get("hub.challenge", "")
        expected = self._verify_token

        # An unconfigured (empty) verify token must never authenticate anyone:
        # with a plain ``==`` a request that omits the token would match "".
        if mode != "subscribe" or not expected:
            return False, ""
        if not isinstance(token, str) or not isinstance(expected, str):
            return False, ""

        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII ``str`` arguments.
        if hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
            return True, challenge
        return False, ""

    # ── Message parsing ──

    async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]:
        """Parse a WhatsApp webhook payload into a single ``ChannelMessage``.

        Walks ``entry[].changes[].value`` in order and returns as soon as
        something is found: the first element of ``messages`` (parsed by
        :meth:`_parse_message`) or, failing that within the same change, the
        first element of ``statuses`` as a ``MessageType.SYSTEM`` message.
        Any further messages/statuses in the payload are not returned.

        Returns:
            The parsed message, or ``None`` when the payload carries neither
            messages nor statuses.
        """
        # ``or []`` / ``or {}`` also covers keys that are present but null.
        for entry in payload.get("entry") or []:
            for change in entry.get("changes") or []:
                value = change.get("value") or {}

                messages = value.get("messages") or []
                if messages:
                    return self._parse_message(messages[0], value)

                statuses = value.get("statuses") or []
                if statuses:
                    status = statuses[0]
                    recipient = status.get("recipient_id", "")
                    return ChannelMessage(
                        channel_type=ChannelType.WHATSAPP,
                        channel_id=recipient,
                        user_id=recipient,
                        content=f"Message status: {status.get('status', 'unknown')}",
                        message_type=MessageType.SYSTEM,
                        raw=status,
                        metadata={"status": status.get("status")},
                    )

        return None

    @staticmethod
    def _section(msg: dict, key: str) -> dict:
        """Return ``msg[key]`` when it is a dict, otherwise an empty dict."""
        part = msg.get(key)
        return part if isinstance(part, dict) else {}

    def _parse_message(self, msg: dict, value: dict) -> Optional[ChannelMessage]:
        """Parse one WhatsApp message object.

        Args:
            msg: A single element of ``value["messages"]``.
            value: The enclosing ``value`` object; only ``contacts`` is read,
                to obtain the sender's profile name.

        Returns:
            A ``ChannelMessage`` whose ``channel_id`` and ``user_id`` are both
            the sender's phone (``msg["from"]``) and whose ``reply_token`` is
            the message id. Unrecognised types yield ``"[<type>]"`` as text.
        """
        msg_type = msg.get("type", "text")
        user_phone = msg.get("from", "")
        msg_id = msg.get("id", "")

        # ``contacts`` may be missing, null or an empty list; indexing [0]
        # directly would raise IndexError for the empty-list case.
        contacts = value.get("contacts") or [{}]
        first_contact = contacts[0] if isinstance(contacts[0], dict) else {}
        profile = first_contact.get("profile") or {}

        content = ""
        mtype = MessageType.TEXT
        metadata = {
            "phone": user_phone,
            "display_name": profile.get("name", ""),
        }

        if msg_type == "text":
            content = self._section(msg, "text").get("body", "")
        elif msg_type == "image":
            image = self._section(msg, "image")
            content = image.get("caption", "[Image]")
            mtype = MessageType.IMAGE
            metadata["media_id"] = image.get("id", "")
        elif msg_type == "audio":
            content = "[Voice message]"
            mtype = MessageType.VOICE
            metadata["media_id"] = self._section(msg, "audio").get("id", "")
        elif msg_type == "video":
            video = self._section(msg, "video")
            content = video.get("caption", "[Video]")
            mtype = MessageType.VIDEO
            # Recorded like image/audio so the attachment can be fetched later.
            metadata["media_id"] = video.get("id", "")
        elif msg_type == "document":
            document = self._section(msg, "document")
            content = document.get("caption", "[Document]")
            mtype = MessageType.FILE
            metadata["file_name"] = document.get("filename", "")
            metadata["media_id"] = document.get("id", "")
        elif msg_type == "location":
            loc = self._section(msg, "location")
            content = f"[Location: {loc.get('latitude')}, {loc.get('longitude')}]"
            mtype = MessageType.LOCATION
        elif msg_type == "button":
            content = self._section(msg, "button").get("text", "")
            mtype = MessageType.INTERACTIVE
        elif msg_type == "interactive":
            interactive = self._section(msg, "interactive")
            # The selected option's id (not its title) becomes the content.
            reply_key = (
                "button_reply"
                if interactive.get("type") == "button_reply"
                else "list_reply"
            )
            content = self._section(interactive, reply_key).get("id", "")
            mtype = MessageType.INTERACTIVE
        else:
            content = f"[{msg_type}]"

        return ChannelMessage(
            channel_type=ChannelType.WHATSAPP,
            channel_id=user_phone,
            user_id=user_phone,
            content=content,
            message_type=mtype,
            raw=msg,
            reply_token=msg_id,
            metadata=metadata,
        )

    # ── Reply ──

    async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult:
        """Send a plain text message (link previews disabled).

        Args:
            channel_id: Recipient, sent as the ``to`` field.
            content: Message body.
            **kwargs: Accepted for interface compatibility; ignored.
        """
        return await self._send_msg(channel_id, {
            "type": "text",
            "text": {"body": content, "preview_url": False},
        })

    async def reply_template(
        self, channel_id: str, template_name: str,
        language_code: str = "en", components: Optional[list] = None, **kwargs,
    ) -> ReplyResult:
        """Send a WhatsApp message template.

        Args:
            channel_id: Recipient, sent as the ``to`` field.
            template_name: Name of the template to send.
            language_code: Template language code.
            components: Optional template components; omitted from the
                request when empty or ``None``.
            **kwargs: Accepted for interface compatibility; ignored.
        """
        template = {
            "name": template_name,
            "language": {"code": language_code},
        }
        if components:
            template["components"] = components
        return await self._send_msg(
            channel_id, {"type": "template", "template": template}
        )

    async def reply_interactive(
        self, channel_id: str, body_text: str,
        buttons: list[dict], **kwargs,
    ) -> ReplyResult:
        """Send an interactive message with reply buttons.

        Args:
            channel_id: Recipient, sent as the ``to`` field.
            body_text: Text shown above the buttons.
            buttons: ``[{"id": "yes", "title": "Yes"}, ...]``. Only the first
                three are sent; each dict must have ``id`` and ``title``.
            **kwargs: Accepted for interface compatibility; ignored.
        """
        button_list = [
            {"type": "reply", "reply": {"id": b["id"], "title": b["title"]}}
            for b in buttons[:3]
        ]
        return await self._send_msg(channel_id, {
            "type": "interactive",
            "interactive": {
                "type": "button",
                "body": {"text": body_text},
                "action": {"buttons": button_list},
            },
        })

    async def reply_image(
        self, channel_id: str, image_url: str, caption: str = "", **kwargs
    ) -> ReplyResult:
        """Send an image referenced by URL, with an optional caption.

        ``**kwargs`` is accepted for interface compatibility and ignored.
        """
        return await self._send_msg(channel_id, {
            "type": "image",
            "image": {"link": image_url, "caption": caption},
        })

    # ── API Helper ──

    async def _post_json(self, body: dict) -> tuple[int, dict]:
        """POST ``body`` as JSON to this phone number's ``/messages`` endpoint.

        Uses ``aiohttp`` when it is importable; otherwise falls back to
        ``urllib`` executed in the default executor so the event loop is not
        blocked. In the fallback, HTTP error statuses are returned rather
        than raised so both transports behave alike.

        Returns:
            ``(http_status, decoded_body)``. ``decoded_body`` is ``{}`` when
            the response is empty, is not valid JSON, or is not a JSON object.

        Raises:
            Transport-level errors (connection failures, timeouts) propagate
            from the underlying HTTP library.
        """
        url = f"{self.API_BASE}/{self._phone_number_id}/messages"
        headers = {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
        }

        try:
            import aiohttp
        except ImportError:
            aiohttp = None

        if aiohttp is not None:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=body) as resp:
                    status = resp.status
                    raw = await resp.read()
        else:
            import asyncio
            import urllib.error
            import urllib.request

            timeout = self._URLLIB_TIMEOUT

            def _blocking_post() -> tuple[int, bytes]:
                req = urllib.request.Request(
                    url, data=json.dumps(body).encode("utf-8"), headers=headers
                )
                try:
                    with urllib.request.urlopen(req, timeout=timeout) as resp:
                        return resp.status, resp.read()
                except urllib.error.HTTPError as exc:
                    # urllib raises on 4xx/5xx; keep the status and body so
                    # the caller can report the API's own error message.
                    try:
                        return exc.code, exc.read()
                    finally:
                        exc.close()

            loop = asyncio.get_running_loop()
            status, raw = await loop.run_in_executor(None, _blocking_post)

        try:
            data = json.loads(raw) if raw else {}
        except ValueError:
            data = {}
        return status, data if isinstance(data, dict) else {}

    async def _send_msg(self, to: str, msg_data: dict) -> ReplyResult:
        """Send one message via the WhatsApp Cloud API.

        Args:
            to: Recipient, sent as the ``to`` field.
            msg_data: Type-specific fields (``type`` plus its payload) merged
                into the request body.

        Returns:
            ``ReplyResult(success=True, message_id=...)`` when the response
            contains a message id; otherwise ``ReplyResult(success=False)``
            with the API's error message, or ``"unknown"`` if none was given.
        """
        body = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": to,
            **msg_data,
        }
        _, data = await self._post_json(body)

        # ``messages`` may be absent or an empty list on failure.
        messages = data.get("messages") or [{}]
        first = messages[0] if isinstance(messages[0], dict) else {}
        wa_id = first.get("id", "")
        if wa_id:
            return ReplyResult(success=True, message_id=wa_id)

        error = data.get("error")
        error_message = (
            error.get("message", "unknown") if isinstance(error, dict) else "unknown"
        )
        return ReplyResult(success=False, error=error_message)

    async def mark_as_read(self, message_id: str) -> bool:
        """Mark an incoming message as read.

        Returns:
            ``True`` when the API answers with HTTP 200, ``False`` for any
            other status.
        """
        status, _ = await self._post_json({
            "messaging_product": "whatsapp",
            "status": "read",
            "message_id": message_id,
        })
        return status == 200