Coverage for agentos/channels/adapters/dingtalk.py: 0%

58 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

"""
AgentOS Channels — 钉钉适配器。

Webhook 规范: https://open.dingtalk.com/document/orgapp/receive-message

特性:
  - JSON 报文解析
  - 签名验证(timestamp + sign)
  - access_token 管理
  - 主动推送(工作通知 + 群机器人)
"""

12 

13from __future__ import annotations 

14 

15import json 

16import time 

17from typing import Optional 

18from urllib.parse import unquote 

19 

20import httpx 

21 

22from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

23from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

24 

25 

class DingTalkAdapter(BaseChannelAdapter):
    """DingTalk channel adapter.

    Parses incoming robot webhooks into ``ChannelMessage`` objects, checks the
    webhook signature, builds synchronous text replies, and pushes one-to-one
    robot messages through the DingTalk OpenAPI using a cached access token.
    """

    channel_type = ChannelType.DINGTALK

    # Seconds before the recorded expiry at which the cached token is renewed.
    _TOKEN_REFRESH_MARGIN = 300

    # Lifetime (seconds) assumed when the token response carries no "expireIn".
    _DEFAULT_TOKEN_TTL = 7200

    # DingTalk "msgtype" values mapped to the channel-neutral message type.
    # Anything not listed here is treated as text.
    _MSG_TYPE_MAP = {
        "text": MessageType.TEXT,
        "image": MessageType.IMAGE,
        "voice": MessageType.VOICE,
        "video": MessageType.VIDEO,
        "file": MessageType.FILE,
        "link": MessageType.LINK,
    }

    def __init__(self, config: ChannelConfig):
        """Store the channel config and start with an empty token cache."""
        super().__init__(config)
        self._token: str = ""
        self._token_expires: float = 0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Check the DingTalk webhook signature (timestamp + sign).

        The timestamp and signature are read from the ``x-ding-params`` entry
        of ``headers``; ``raw_body`` is not part of the check.

        Returns:
            ``True`` when the signature matches or when no signature was
            supplied, ``False`` when it does not match or when
            ``x-ding-params`` is not a mapping.
        """
        import hmac

        params = headers.get("x-ding-params", {})
        try:
            timestamp = params.get("timestamp", "")
            sign = params.get("sign", "")
        except AttributeError:
            # Malformed parameter container: reject instead of crashing.
            return False

        if not sign:
            # NOTE(review): a request without any signature is accepted, so a
            # sender can skip verification simply by omitting ``sign``. Kept
            # for backward compatibility — confirm whether unsigned callbacks
            # are really expected before tightening this to ``return False``.
            return True

        computed = self.hmac_sha256(timestamp, self.config.app_secret)
        if isinstance(computed, str) and isinstance(sign, str):
            # Constant-time comparison so the check does not leak how many
            # leading characters of the signature were correct. Encoding to
            # bytes lets compare_digest accept non-ASCII input as well.
            return hmac.compare_digest(
                computed.encode("utf-8", "surrogatepass"),
                sign.encode("utf-8", "surrogatepass"),
            )
        return computed == sign

    @staticmethod
    def _dict_field(data: dict, key: str) -> dict:
        """Return ``data[key]`` when it is a dict, otherwise an empty dict."""
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Convert a DingTalk webhook body into a ``ChannelMessage``.

        Args:
            raw_body: UTF-8 encoded JSON payload of the webhook request.
            headers: Request headers (not used by the parser).

        Returns:
            A single ``ChannelMessage`` built from the payload.

        Raises:
            ValueError: If the body is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
            UnicodeDecodeError: If the body is not valid UTF-8.
        """
        data = json.loads(raw_body.decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("DingTalk webhook payload must be a JSON object")

        msg_type_str = data.get("msgtype", "text")
        msg_type = self._MSG_TYPE_MAP.get(msg_type_str, MessageType.TEXT)

        # A JSON null (or any non-object) in these fields is treated as absent.
        text_part = self._dict_field(data, "text")
        image_part = self._dict_field(data, "image")

        content = ""
        if msg_type_str == "text":
            content = text_part.get("content", "")
        elif msg_type_str == "image":
            content = "[图片]"

        # "createAt" is divided by 1000 to get seconds; fall back to the
        # current time when it is missing, null, or not numeric.
        try:
            timestamp = float(data["createAt"]) / 1000
        except (KeyError, TypeError, ValueError):
            timestamp = time.time()

        return ChannelMessage(
            msg_id=data.get("msgId", data.get("msgid", "")),
            channel=ChannelType.DINGTALK,
            msg_type=msg_type,
            content=content,
            sender_id=data.get("senderStaffId", data.get("senderId", "")),
            sender_name=data.get("senderNick", ""),
            timestamp=timestamp,
            conversation_id=data.get("sessionWebhook", ""),
            reply_token="",
            media_url=image_part.get("picUrl", ""),
            media_id=image_part.get("mediaId", ""),
            extra={
                "robot_code": data.get("robotCode"),
                "chatbot_user_id": data.get("chatbotUserId"),
                "chat_id": data.get("chatId"),
                "is_admin": data.get("isAdmin", False),
                "conversation_type": data.get("conversationType"),
                "at_users": data.get("atUsers", []),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Return the JSON body of a plain-text reply carrying ``reply_text``."""
        return json.dumps({"msgtype": "text", "text": {"content": reply_text}})

    # ── Proactive push ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Push a text message to one user via the robot batch-send endpoint.

        Args:
            user_id: Recipient id, sent as the only entry of ``userIds``.
            content: Text to deliver.
            msg_type: Accepted for interface compatibility; the message is
                always sent with the ``sampleText`` template.

        Returns:
            A ``ReplyResult`` whose ``success`` is true only for HTTP 200.
            ``msg_id`` is a locally generated timestamp string, not an id
            returned by DingTalk.
        """
        token = await self.get_access_token()
        url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
        payload = {
            "robotCode": self.config.app_id,
            "userIds": [user_id],
            "msgKey": "sampleText",
            # msgParam is itself a JSON-encoded string inside the JSON body.
            "msgParam": json.dumps({"content": content}),
        }
        headers = {"x-acs-dingtalk-access-token": token, "Content-Type": "application/json"}
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload, headers=headers, timeout=10)
        return ReplyResult(success=resp.status_code == 200, msg_id=str(time.time()))

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image as a text message containing its URL."""
        return await self.send_message(user_id, f"[图片] {image_url}")

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file as a text message containing its name and URL."""
        return await self.send_message(user_id, f"文件: {filename}\n{file_url}")

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return an access token, fetching a new one when the cache is stale.

        The cached token is reused until ``_TOKEN_REFRESH_MARGIN`` seconds
        before its recorded expiry.

        Raises:
            KeyError: If the response has no ``accessToken`` field. The
                message includes the HTTP status and any ``code`` /
                ``message`` fields found in the response body.
        """
        if self._token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._token

        url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
        payload = {"appKey": self.config.app_id, "appSecret": self.config.app_secret}
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload, timeout=10)
        data = resp.json()

        try:
            token = data["accessToken"]
        except KeyError:
            # Same exception type as before, but with enough context to
            # diagnose a rejected credential or a throttled request.
            raise KeyError(
                f"accessToken missing from DingTalk response "
                f"(HTTP {resp.status_code}, code={data.get('code')!r}, "
                f"message={data.get('message')!r})"
            ) from None

        self._token = token
        self._token_expires = time.time() + data.get("expireIn", self._DEFAULT_TOKEN_TTL)
        return self._token