Coverage for agentos/channels/adapters/wecom.py: 0%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS Channels — 企业微信适配器。 

3 

4Webhook 规范: https://developer.work.weixin.qq.com/document/path/90238 

5 

6特性: 

7 - XML/JSON 双报文解析 

8 - SHA1 签名验证 

9 - 被动回复 + 主动群机器人 webhook 推送 

10 - access_token 自动续期 

11""" 

12 

13from __future__ import annotations 

14 

15import json 

16import time 

17import xml.etree.ElementTree as ET 

18from typing import Optional 

19 

20import httpx 

21 

22from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

23from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

24 

25 

class WeComAdapter(BaseChannelAdapter):
    """WeCom (WeChat Work) channel adapter.

    Responsibilities visible in this class:

    * parse inbound webhook payloads delivered as XML or JSON;
    * verify the request signature via the inherited ``make_signature``;
    * build passive replies (XML) or group-robot replies (JSON);
    * push outbound messages through a group-robot webhook or through the
      application message API, caching the access token between calls.
    """

    channel_type = ChannelType.WECOM

    # Base URL of the WeCom server-side API.
    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Per-request timeout (seconds) used for every outbound HTTP call.
    _HTTP_TIMEOUT = 10
    # Refresh the cached token this many seconds before it actually expires.
    _TOKEN_REFRESH_MARGIN = 300
    # errcodes that, per the WeCom global error-code table, indicate an
    # invalid / expired access_token — TODO confirm against current docs.
    _TOKEN_ERRCODES = (40014, 42001)

    def __init__(self, config: ChannelConfig):
        """Initialise the adapter with an empty access-token cache."""
        super().__init__(config)
        self._token: str = ""
        self._token_expires: float = 0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Return True when the request's ``msg_signature`` matches ours.

        The signature inputs are read from ``headers["x-wx-params"]``, which
        is expected to be a mapping holding ``msg_signature``, ``timestamp``
        and ``nonce``. ``raw_body`` is not used.

        NOTE(review): the last argument passed to ``make_signature`` is always
        the empty string; if the platform signs the encrypted payload as well,
        this check will not match real callbacks — confirm against
        ``make_signature``'s contract.
        """
        # Local import so the block does not depend on a file-level import.
        import hmac

        # ``or {}`` also covers a header that is present but None.
        params = headers.get("x-wx-params") or {}
        received = str(params.get("msg_signature", ""))
        timestamp = str(params.get("timestamp", ""))
        nonce = str(params.get("nonce", ""))
        expected = str(
            self.make_signature(self.config.verify_token, timestamp, nonce, "")
        )
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(
            received.encode("utf-8"), expected.encode("utf-8")
        )

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Convert a raw webhook body into a ``ChannelMessage``.

        A body whose first non-blank character is ``{`` is parsed as JSON;
        anything else is parsed as XML. ``headers`` is not used.

        Raises:
            UnicodeDecodeError: if ``raw_body`` is not valid UTF-8.
            json.JSONDecodeError / xml.etree.ElementTree.ParseError: if the
                body is malformed.
        """
        text = raw_body.decode("utf-8")
        if text.strip().startswith("{"):
            data = json.loads(text)
        else:
            data = self._parse_xml(text)

        msg_type_str = data.get("MsgType", data.get("msgtype", "text"))
        msg_type_map = {
            "text": MessageType.TEXT,
            "image": MessageType.IMAGE,
            "voice": MessageType.VOICE,
            "video": MessageType.VIDEO,
            "file": MessageType.FILE,
            "event": MessageType.EVENT,
        }
        # Unknown message types are treated as text.
        msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)

        content = ""
        if msg_type_str == "text":
            # XML payloads carry "Content"; JSON payloads nest it in "text".
            content = data.get("Content")
            if content is None:
                text_field = data.get("text")
                # Guard: "text" may be absent or not a mapping.
                if isinstance(text_field, dict):
                    content = text_field.get("content", "")
                else:
                    content = ""
        elif msg_type_str == "image":
            content = "[图片]"

        # A missing, empty or non-numeric CreateTime falls back to "now"
        # instead of aborting the whole parse.
        raw_created = data.get("CreateTime")
        try:
            timestamp = (
                float(raw_created) if raw_created not in (None, "") else time.time()
            )
        except (TypeError, ValueError):
            timestamp = time.time()

        return ChannelMessage(
            msg_id=data.get("MsgId", "") or "",
            channel=ChannelType.WECOM,
            msg_type=msg_type,
            content=content,
            sender_id=data.get("FromUserName", data.get("UserID", "")),
            sender_name=data.get("Name", ""),
            timestamp=timestamp,
            conversation_id=data.get("ChatId", data.get("FromUserName", "")),
            media_url=data.get("PicUrl", ""),
            media_id=data.get("MediaId", ""),
            extra={
                "to_user": data.get("ToUserName"),
                "agent_id": data.get("AgentID"),
                "msg_type_raw": msg_type_str,
                "webhook_url": data.get("WebhookUrl", ""),
                "chat_type": data.get("ChatType", "single"),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Serialise ``reply_text`` as a reply to ``msg``.

        Returns a JSON text message when ``msg.extra["webhook_url"]`` is set,
        otherwise a passive-reply XML document.

        In the XML form the reply is addressed to the original sender
        (``msg.sender_id``) and sent from the original recipient
        (``msg.extra["to_user"]``), i.e. the two inbound fields are swapped.
        """
        if msg.extra.get("webhook_url"):
            return json.dumps({"msgtype": "text", "text": {"content": reply_text}})

        recipient = msg.sender_id
        sender = msg.extra.get("to_user") or ""
        create_time = int(time.time())
        return (
            "<xml>"
            f"<ToUserName>{self._cdata(recipient)}</ToUserName>"
            f"<FromUserName>{self._cdata(sender)}</FromUserName>"
            f"<CreateTime>{create_time}</CreateTime>"
            "<MsgType><![CDATA[text]]></MsgType>"
            f"<Content>{self._cdata(reply_text)}</Content>"
            "</xml>"
        )

    # ── Active push (group-robot webhook or application message) ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Send a text message to ``user_id``.

        When ``config.extra["webhook_url"]`` is set the message is posted to
        that group-robot webhook (``user_id`` is then unused); otherwise it is
        sent through the application message API.

        ``msg_type`` is accepted for interface compatibility but is currently
        ignored: the payload is always sent as ``"text"``.
        """
        webhook_url = self.config.extra.get("webhook_url", "")
        if webhook_url:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    webhook_url,
                    json={"msgtype": "text", "text": {"content": content}},
                    timeout=self._HTTP_TIMEOUT,
                )
            return self._webhook_result(resp)

        return await self._send_app_message(
            user_id, "text", {"text": {"content": content}}
        )

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image message to ``user_id`` through the application API.

        NOTE(review): ``image_url`` is placed verbatim into the ``media_id``
        field; the API presumably expects an uploaded media id rather than a
        URL — confirm with callers.
        """
        return await self._send_app_message(
            user_id, "image", {"image": {"media_id": image_url}}
        )

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file message to ``user_id`` through the application API.

        ``filename`` is not used. NOTE(review): ``file_url`` is placed
        verbatim into the ``media_id`` field; the API presumably expects an
        uploaded media id rather than a URL — confirm with callers.
        """
        return await self._send_app_message(
            user_id, "file", {"file": {"media_id": file_url}}
        )

    async def _send_app_message(self, user_id: str, msg_type: str, body: dict) -> ReplyResult:
        """POST one application message and translate the API response."""
        token = await self.get_access_token()
        payload = {
            "touser": user_id,
            "msgtype": msg_type,
            "agentid": int(self.config.agent_id or 0),
            **body,
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._API_BASE}/message/send",
                # Passed as params so the token is URL-encoded.
                params={"access_token": token},
                json=payload,
                timeout=self._HTTP_TIMEOUT,
            )
        data = resp.json()
        errcode = data.get("errcode")
        if errcode == 0:
            return ReplyResult(success=True, msg_id=data.get("msgid", ""))
        if errcode in self._TOKEN_ERRCODES:
            # Drop the cached token so the next call fetches a fresh one.
            self._token = ""
            self._token_expires = 0
        return ReplyResult(
            success=False,
            error=f"wecom error {errcode}: {data.get('errmsg')}",
        )

    @staticmethod
    def _webhook_result(resp) -> ReplyResult:
        """Translate a group-robot webhook response into a ``ReplyResult``."""
        if resp.status_code != 200:
            return ReplyResult(
                success=False, error=f"wecom webhook http {resp.status_code}"
            )
        try:
            data = resp.json()
        except ValueError:
            # Non-JSON 200 response: keep the legacy "HTTP 200 == success".
            return ReplyResult(success=True)
        errcode = data.get("errcode", 0) if isinstance(data, dict) else 0
        if errcode not in (0, None):
            # HTTP 200 with a non-zero errcode is still a failed delivery.
            return ReplyResult(
                success=False,
                error=f"wecom error {errcode}: {data.get('errmsg')}",
            )
        return ReplyResult(success=True)

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a valid access token, fetching a new one when needed.

        The cached token is reused until 300 seconds before its recorded
        expiry. When the response carries no ``expires_in`` a lifetime of
        7200 seconds is assumed.

        Raises:
            KeyError: if the gettoken response contains no ``access_token``;
                the message includes the reported errcode / errmsg.
        """
        if self._token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._token

        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self._API_BASE}/gettoken",
                # Passed as params so credentials are URL-encoded.
                params={
                    "corpid": self.config.corp_id,
                    "corpsecret": self.config.app_secret,
                },
                timeout=self._HTTP_TIMEOUT,
            )
        data = resp.json()
        token = data.get("access_token")
        if not token:
            # Same exception type as the former ``data["access_token"]``
            # lookup, but with the API's own diagnostics attached.
            raise KeyError(
                "access_token missing from gettoken response "
                f"(errcode={data.get('errcode')}, errmsg={data.get('errmsg')})"
            )
        self._token = token
        self._token_expires = time.time() + data.get("expires_in", 7200)
        return self._token

    @staticmethod
    def _cdata(value) -> str:
        """Wrap ``value`` in a CDATA section, splitting any embedded ``]]>``."""
        text = "" if value is None else str(value)
        return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"

    @staticmethod
    def _parse_xml(xml_str: str) -> dict:
        """Flatten the direct children of the XML root into ``{tag: text}``.

        Empty elements map to ``""`` rather than ``None`` so downstream
        ``.get(key, default)`` lookups never yield ``None`` for a present tag.
        Nested elements are not descended into.

        NOTE(review): the body comes from the network and is parsed with the
        standard-library ElementTree parser, which the Python docs list as
        susceptible to some entity-expansion attacks — consider a hardened
        parser if the endpoint is exposed to untrusted senders.
        """
        root = ET.fromstring(xml_str)
        return {child.tag: child.text or "" for child in root}

171 root = ET.fromstring(xml_str) 

172 return {child.tag: child.text for child in root}