# Coverage report for agentos/channels/adapters/wecom.py: 0% of 82 statements covered.
# Generated by coverage.py v7.14.3, created at 2026-07-02 09:59 +0800.
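"""
AgentOS Channels — WeCom (Enterprise WeChat) adapter.

Webhook specification: https://developer.work.weixin.qq.com/document/path/90238

Features:
  - Parses both XML and JSON payloads
  - SHA1 signature verification
  - Passive replies plus proactive pushes via the group-robot webhook
  - Automatic access_token renewal
"""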
from __future__ import annotations

import hmac
import json
import time
import xml.etree.ElementTree as ET
from typing import Optional

import httpx

from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
from agentos.channels.message import ChannelMessage, ChannelType, MessageType
class WeComAdapter(BaseChannelAdapter):
    """WeCom (Enterprise WeChat) channel adapter.

    Handles inbound webhook payloads (XML or JSON), builds passive replies,
    and pushes outbound messages either through a group-robot webhook URL or
    through the application-message API using a cached ``access_token``.
    """

    channel_type = ChannelType.WECOM

    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    _HTTP_TIMEOUT = 10
    # Refresh the token this many seconds before its advertised expiry.
    _TOKEN_REFRESH_MARGIN = 300
    # WeCom error codes meaning "access_token invalid / expired".
    _STALE_TOKEN_ERRCODES = frozenset({40014, 42001})
    # Message types whose payload shape is ``{<type>: {"content": ...}}``.
    _CONTENT_MSG_TYPES = frozenset({"text", "markdown"})

    def __init__(self, config: ChannelConfig):
        """Initialise the adapter with an empty access-token cache."""
        super().__init__(config)
        self._token: str = ""
        self._token_expires: float = 0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Check the WeCom ``msg_signature`` of an inbound webhook request.

        Args:
            raw_body: Raw request body; its ``Encrypt`` field (if any) is part
                of the signed material.
            headers: Request headers; ``headers["x-wx-params"]`` is expected
                to be a dict holding ``msg_signature``, ``timestamp`` and
                ``nonce``.

        Returns:
            True only when a non-empty signature was supplied and it matches
            the locally computed one.
        """
        params = headers.get("x-wx-params", {})
        if not isinstance(params, dict):
            return False
        msg_signature = str(params.get("msg_signature", "") or "")
        if not msg_signature:
            return False
        timestamp = str(params.get("timestamp", ""))
        nonce = str(params.get("nonce", ""))
        # WeCom signs token + timestamp + nonce + the encrypted message, so the
        # body's Encrypt field must take part; it is "" for plaintext bodies.
        # NOTE(review): assumes make_signature implements that 4-value SHA1
        # scheme — confirm against the base class.
        encrypted = self._extract_encrypt(raw_body)
        expected = self.make_signature(
            self.config.verify_token, timestamp, nonce, encrypted
        )
        # Constant-time comparison avoids leaking the signature via timing.
        return hmac.compare_digest(
            str(expected).encode("utf-8"), msg_signature.encode("utf-8")
        )

    @classmethod
    def _extract_encrypt(cls, raw_body: bytes) -> str:
        """Return the ``Encrypt`` field of a webhook body, or "" if absent."""
        try:
            text = raw_body.decode("utf-8").strip()
            if not text:
                return ""
            if text.startswith("{"):
                data = json.loads(text)
                value = data.get("Encrypt") or data.get("encrypt") or ""
            else:
                value = cls._parse_xml(text).get("Encrypt", "")
        except (ValueError, ET.ParseError):
            # Undecodable or malformed bodies simply carry no Encrypt field.
            return ""
        return value if isinstance(value, str) else ""

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Convert a WeCom webhook body (JSON or XML) into a ChannelMessage.

        Bodies whose first non-blank character is ``{`` are parsed as JSON,
        everything else as XML. Unknown message types map to
        ``MessageType.TEXT``. Malformed JSON/XML still raises the parser's
        own exception.
        """
        text = raw_body.decode("utf-8")
        data = json.loads(text) if text.strip().startswith("{") else self._parse_xml(text)
        msg_type_str = data.get("MsgType") or data.get("msgtype") or "text"
        msg_type_map = {
            "text": MessageType.TEXT, "image": MessageType.IMAGE,
            "voice": MessageType.VOICE, "video": MessageType.VIDEO,
            "file": MessageType.FILE, "event": MessageType.EVENT,
        }
        msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)

        content = ""
        if msg_type_str == "text":
            content = data.get("Content")
            if content is None:
                # JSON payloads nest the text under {"text": {"content": ...}}.
                text_part = data.get("text")
                content = text_part.get("content", "") if isinstance(text_part, dict) else ""
            content = content or ""
        elif msg_type_str == "image":
            content = "[图片]"

        sender_id = data.get("FromUserName") or data.get("UserID") or ""
        return ChannelMessage(
            msg_id=data.get("MsgId", "") or "",
            channel=ChannelType.WECOM,
            msg_type=msg_type,
            content=content,
            sender_id=sender_id,
            sender_name=data.get("Name", "") or "",
            timestamp=self._coerce_timestamp(data.get("CreateTime")),
            # Fall back to the sender so payloads without ChatId do not all
            # collapse into one empty conversation id.
            conversation_id=data.get("ChatId") or sender_id,
            media_url=data.get("PicUrl", "") or "",
            media_id=data.get("MediaId", "") or "",
            extra={
                "to_user": data.get("ToUserName"),
                "agent_id": data.get("AgentID"),
                "msg_type_raw": msg_type_str,
                "webhook_url": data.get("WebhookUrl", "") or "",
                "chat_type": data.get("ChatType") or "single",
            },
        )

    @staticmethod
    def _coerce_timestamp(value) -> float:
        """Return ``value`` as a float, or the current time if not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return time.time()

    @staticmethod
    def _cdata(value) -> str:
        """Wrap ``value`` in a CDATA section, escaping any embedded ``]]>``."""
        text = "" if value is None else str(value)
        # "]]>" would terminate the section early; split it across two sections.
        return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Build the passive-reply body for an inbound message.

        Returns a JSON text payload when the message carried a webhook URL,
        otherwise a WeCom XML text reply addressed back to the sender.
        """
        if msg.extra.get("webhook_url"):
            return json.dumps({"msgtype": "text", "text": {"content": reply_text}})
        # A reply swaps the inbound addressing: it goes TO the member who
        # wrote and comes FROM the inbound ToUserName.
        recipient = msg.sender_id
        reply_from = msg.extra.get("to_user") or ""
        create_time = int(time.time())
        return (
            "<xml>"
            f"<ToUserName>{self._cdata(recipient)}</ToUserName>"
            f"<FromUserName>{self._cdata(reply_from)}</FromUserName>"
            f"<CreateTime>{create_time}</CreateTime>"
            "<MsgType><![CDATA[text]]></MsgType>"
            f"<Content>{self._cdata(reply_text)}</Content>"
            "</xml>"
        )

    # ── Proactive push (group-robot webhook or application message) ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Push a text (or markdown) message to a user or group robot.

        Args:
            user_id: Target member id (ignored on the group-robot path).
            content: Message body.
            msg_type: ``"text"`` or ``"markdown"``; any other value is sent
                as plain text.

        Returns:
            ReplyResult describing success, with the WeCom error on failure.
        """
        kind = msg_type if msg_type in self._CONTENT_MSG_TYPES else "text"
        body = {"msgtype": kind, kind: {"content": content}}

        # A configured webhook_url routes the push through the group robot.
        webhook_url = self.config.extra.get("webhook_url", "")
        if webhook_url:
            async with httpx.AsyncClient() as client:
                resp = await client.post(webhook_url, json=body, timeout=self._HTTP_TIMEOUT)
            if resp.status_code != 200:
                return ReplyResult(success=False, error=f"wecom webhook HTTP {resp.status_code}")
            try:
                data = resp.json()
            except ValueError:
                data = None
            # WeCom answers HTTP 200 even on failure; errcode is the real
            # status. Bodies without an errcode keep counting as success.
            if isinstance(data, dict) and data.get("errcode", 0) != 0:
                return ReplyResult(
                    success=False,
                    error=f"wecom error {data.get('errcode')}: {data.get('errmsg')}",
                )
            return ReplyResult(success=True)

        return await self._send_app_message(user_id, body)

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image as an application message.

        ``image_url`` may be an http(s) URL (downloaded and uploaded to WeCom
        first) or an existing WeCom ``media_id``.
        """
        media_id, error = await self._resolve_media_id(image_url, "image")
        if error:
            return ReplyResult(success=False, error=error)
        return await self._send_app_message(
            user_id, {"msgtype": "image", "image": {"media_id": media_id}}
        )

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file as an application message.

        ``file_url`` may be an http(s) URL (downloaded and uploaded to WeCom
        under ``filename``) or an existing WeCom ``media_id``.
        """
        media_id, error = await self._resolve_media_id(file_url, "file", filename)
        if error:
            return ReplyResult(success=False, error=error)
        return await self._send_app_message(
            user_id, {"msgtype": "file", "file": {"media_id": media_id}}
        )

    async def _send_app_message(self, user_id: str, body: dict) -> ReplyResult:
        """POST an application message and translate the reply to ReplyResult."""
        payload = {
            "touser": user_id,
            "agentid": int(self.config.agent_id or 0),
            **body,
        }
        data = await self._api_post("message/send", json=payload)
        if data.get("errcode") == 0:
            return ReplyResult(success=True, msg_id=data.get("msgid", ""))
        return ReplyResult(
            success=False,
            error=f"wecom error {data.get('errcode')}: {data.get('errmsg')}",
        )

    async def _resolve_media_id(
        self, media_ref: str, media_type: str, filename: str = ""
    ) -> tuple[str, str]:
        """Turn ``media_ref`` into a WeCom media_id.

        Returns:
            ``(media_id, "")`` on success or ``("", error_message)`` on
            failure. Values that are not http(s) URLs are assumed to already
            be media ids and are returned unchanged.
        """
        if not media_ref.lower().startswith(("http://", "https://")):
            return media_ref, ""
        # NOTE(security): the URL is fetched server-side; callers must not
        # pass attacker-controlled URLs (SSRF).
        async with httpx.AsyncClient() as client:
            download = await client.get(
                media_ref, timeout=self._HTTP_TIMEOUT, follow_redirects=True
            )
        if download.status_code != 200:
            return "", f"wecom media download failed: HTTP {download.status_code}"
        # Derive an upload name from the URL path when none was supplied.
        name = (
            filename
            or media_ref.split("?", 1)[0].rstrip("/").rsplit("/", 1)[-1]
            or media_type
        )
        data = await self._api_post(
            "media/upload",
            params={"type": media_type},
            files={"media": (name, download.content)},
        )
        media_id = data.get("media_id")
        if data.get("errcode", 0) != 0 or not media_id:
            return "", f"wecom error {data.get('errcode')}: {data.get('errmsg')}"
        return media_id, ""

    async def _api_post(self, path: str, *, params: dict | None = None, **request_kwargs) -> dict:
        """POST to a token-protected WeCom API and return the decoded JSON.

        Retries once with a fresh token when WeCom reports the cached token
        as invalid or expired. A non-JSON response is reported as a synthetic
        error dict rather than raised.
        """
        data: dict = {}
        for attempt in range(2):
            token = await self.get_access_token()
            # Pass the token via params so it is URL-encoded properly.
            query = {**(params or {}), "access_token": token}
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    f"{self._API_BASE}/{path}",
                    params=query,
                    timeout=self._HTTP_TIMEOUT,
                    **request_kwargs,
                )
            try:
                data = resp.json()
            except ValueError:
                data = None
            if not isinstance(data, dict):
                return {
                    "errcode": f"http-{resp.status_code}",
                    "errmsg": "non-JSON response body",
                }
            if attempt == 0 and data.get("errcode") in self._STALE_TOKEN_ERRCODES:
                # The token was invalidated before its advertised expiry:
                # drop the cache and try once more.
                self._token = ""
                self._token_expires = 0
                continue
            break
        return data

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a valid access_token, fetching a new one when needed.

        The cached token is reused until 300 seconds before its expiry.

        Raises:
            KeyError: If the gettoken response carries no ``access_token``
                (the message includes WeCom's errcode/errmsg).
        """
        if self._token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._token
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self._API_BASE}/gettoken",
                # params= URL-encodes the credentials.
                params={
                    "corpid": self.config.corp_id,
                    "corpsecret": self.config.app_secret,
                },
                timeout=self._HTTP_TIMEOUT,
            )
        data = resp.json()
        token = data.get("access_token") if isinstance(data, dict) else None
        if not token:
            errcode = data.get("errcode") if isinstance(data, dict) else None
            errmsg = data.get("errmsg") if isinstance(data, dict) else None
            # KeyError is kept so callers that already catch it keep working.
            raise KeyError(
                f"access_token missing from wecom gettoken response: "
                f"errcode={errcode} errmsg={errmsg}"
            )
        try:
            lifetime = float(data.get("expires_in", 7200))
        except (TypeError, ValueError):
            lifetime = 7200.0
        self._token = token
        self._token_expires = time.time() + lifetime
        return self._token

    @staticmethod
    def _parse_xml(xml_str: str) -> dict:
        """Flatten the root's direct children into ``{tag: text}``.

        Empty elements map to "" rather than None so callers can treat every
        value as a string.
        """
        root = ET.fromstring(xml_str)
        return {child.tag: (child.text or "") for child in root}