Coverage for agentos/channels/adapters/dingtalk.py: 0%
58 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Channels — 钉钉适配器。
4Webhook 规范: https://open.dingtalk.com/document/orgapp/receive-message
6特性:
7 - JSON 报文解析
8 - 签名验证(timestamp + sign)
9 - access_token 管理
10 - 主动推送(工作通知 + 群机器人)
11"""
13from __future__ import annotations
15import json
16import time
17from typing import Optional
18from urllib.parse import unquote
20import httpx
22from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
23from agentos.channels.message import ChannelMessage, ChannelType, MessageType
class DingTalkAdapter(BaseChannelAdapter):
    """DingTalk channel adapter.

    Responsibilities visible in this class:

    * verify the signature attached to an inbound webhook request,
    * parse the webhook JSON body into a ``ChannelMessage``,
    * build the JSON body of a passive text reply,
    * push messages to a user through the DingTalk robot OpenAPI,
    * fetch and cache the OpenAPI access token.
    """

    channel_type = ChannelType.DINGTALK

    def __init__(self, config: ChannelConfig):
        """Initialise the adapter with an empty access-token cache."""
        super().__init__(config)
        self._token: str = ""
        # Absolute expiry time of ``_token`` in ``time.time()`` seconds.
        self._token_expires: float = 0.0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Check the ``sign`` value supplied with a webhook request.

        Args:
            raw_body: Raw request body. Not used by this check.
            headers: Request headers. The ``x-ding-params`` entry is read and
                must support ``.get`` (presumably a dict prepared by the
                caller — TODO confirm against the webhook entry point).

        Returns:
            ``True`` when the signature matches, or when no ``sign`` value
            was supplied; ``False`` on mismatch.
        """
        # Function-scope import keeps this block self-contained.
        import hmac

        params = headers.get("x-ding-params", {})
        timestamp = params.get("timestamp", "")
        sign = params.get("sign", "")
        if not sign:
            # NOTE(review): requests without a signature are accepted, so a
            # sender can skip verification by omitting ``sign``. Kept as-is
            # for backward compatibility; consider rejecting when
            # ``app_secret`` is configured.
            return True
        computed = self.hmac_sha256(timestamp, self.config.app_secret)
        if not isinstance(computed, str) or not isinstance(sign, str):
            # A plain ``==`` between differing types would also be False.
            return False
        # Constant-time comparison avoids leaking how many leading characters
        # of the signature matched. Encoding to bytes lets compare_digest
        # accept non-ASCII input without raising.
        return hmac.compare_digest(computed.encode("utf-8"), sign.encode("utf-8"))

    @staticmethod
    def _dict_field(data: dict, key: str) -> dict:
        """Return ``data[key]`` when it is a dict, otherwise an empty dict."""
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Convert a webhook JSON body into a ``ChannelMessage``.

        Args:
            raw_body: UTF-8 encoded JSON object sent by DingTalk.
            headers: Request headers. Not used while parsing.

        Returns:
            A single ``ChannelMessage``. Only ``text`` and ``image`` payloads
            produce non-empty ``content``; unknown ``msgtype`` values are
            mapped to ``MessageType.TEXT``.

        Raises:
            UnicodeDecodeError: If ``raw_body`` is not valid UTF-8.
            json.JSONDecodeError: If the body is not valid JSON.
        """
        data = json.loads(raw_body.decode("utf-8"))
        msg_type_str = data.get("msgtype", "text")
        msg_type_map = {
            "text": MessageType.TEXT,
            "image": MessageType.IMAGE,
            "voice": MessageType.VOICE,
            "video": MessageType.VIDEO,
            "file": MessageType.FILE,
            "link": MessageType.LINK,
        }
        msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)

        # A key that is present but null (or not an object) must not crash
        # the parser, so nested sections are normalised to dicts first.
        text_part = self._dict_field(data, "text")
        image_part = self._dict_field(data, "image")

        content = ""
        if msg_type_str == "text":
            content = text_part.get("content", "")
        elif msg_type_str == "image":
            content = "[图片]"

        # ``createAt`` is divided by 1000 to get seconds; fall back to the
        # current time when it is missing or null.
        created_ms = data.get("createAt")
        if created_ms is None:
            created_ms = time.time() * 1000

        return ChannelMessage(
            msg_id=data.get("msgId", data.get("msgid", "")),
            channel=ChannelType.DINGTALK,
            msg_type=msg_type,
            content=content,
            sender_id=data.get("senderStaffId", data.get("senderId", "")),
            sender_name=data.get("senderNick", ""),
            timestamp=float(created_ms) / 1000,
            conversation_id=data.get("sessionWebhook", ""),
            reply_token="",
            media_url=image_part.get("picUrl", ""),
            media_id=image_part.get("mediaId", ""),
            extra={
                "robot_code": data.get("robotCode"),
                "chatbot_user_id": data.get("chatbotUserId"),
                "chat_id": data.get("chatId"),
                "is_admin": data.get("isAdmin", False),
                "conversation_type": data.get("conversationType"),
                "at_users": data.get("atUsers", []),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Return the JSON body of a text reply carrying ``reply_text``.

        ``msg`` is accepted for interface compatibility and is not read.
        """
        return json.dumps({"msgtype": "text", "text": {"content": reply_text}})

    # ── Proactive push ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Send a text message to one user through the robot batch-send API.

        Args:
            user_id: Recipient, sent as the only entry of ``userIds``.
            content: Message text.
            msg_type: Currently ignored; the request always uses the
                ``sampleText`` message key.

        Returns:
            A ``ReplyResult`` whose ``success`` is ``True`` only for HTTP 200.
            ``msg_id`` is the local send time as a string, not an identifier
            taken from the response.
        """
        token = await self.get_access_token()
        url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
        payload = {
            "robotCode": self.config.app_id,
            "userIds": [user_id],
            "msgKey": "sampleText",
            # ``msgParam`` is sent as a JSON-encoded string, not an object.
            "msgParam": json.dumps({"content": content}),
        }
        headers = {"x-acs-dingtalk-access-token": token, "Content-Type": "application/json"}
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload, headers=headers, timeout=10)
            return ReplyResult(success=resp.status_code == 200, msg_id=str(time.time()))

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image as a text message containing its URL."""
        return await self.send_message(user_id, f"[图片] {image_url}")

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file as a text message containing its name and URL."""
        return await self.send_message(user_id, f"文件: {filename}\n{file_url}")

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a valid access token, requesting a new one when needed.

        The cached token is reused until 300 seconds before its recorded
        expiry; after that a fresh token is requested and cached.

        Raises:
            KeyError: If the response JSON has no ``accessToken`` field.
        """
        if self._token and time.time() < self._token_expires - 300:
            return self._token
        url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
        payload = {"appKey": self.config.app_id, "appSecret": self.config.app_secret}
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload, timeout=10)
            data = resp.json()
            self._token = data["accessToken"]
            # Treat a null ``expireIn`` like a missing one; an explicit 0 is
            # still honoured.
            expire_in = data.get("expireIn")
            if expire_in is None:
                expire_in = 7200
            self._token_expires = time.time() + expire_in
            return self._token