Coverage for agentos/channels/adapters/qq.py: 0%
67 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Channels — QQ 机器人适配器。
4Webhook 规范: https://bot.q.qq.com/wiki/develop/api-v2/
6特性:
7 - WebSocket 长连接(QQ 官方推荐) + HTTP webhook 双模式
8 - JSON 报文解析
9 - Bot Token 管理
10 - 主动推送 + 被动回复
11"""
13from __future__ import annotations
15import json
16import time
17from typing import Optional
19import httpx
21from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
22from agentos.channels.message import ChannelMessage, ChannelType, MessageType
25class QQAdapter(BaseChannelAdapter):
26 """QQ 机器人适配器。"""
28 channel_type = ChannelType.QQ
30 def __init__(self, config: ChannelConfig):
31 super().__init__(config)
32 self._token: str = config.bot_token or ""
33 self._token_expires: float = float("inf") # QQ Bot Token 由配置直接提供
35 # ── Webhook ──
37 def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
38 """QQ Bot 暂不强验证签名。"""
39 return True
41 def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
42 data = json.loads(raw_body.decode("utf-8"))
43 op = data.get("op", 0)
44 t = data.get("t", "")
45 event_data = data.get("d", {})
47 # 处理不同类型的 QQ 事件
48 if op == 10:
49 # Hello 事件
50 return ChannelMessage(
51 msg_id="hello",
52 channel=ChannelType.QQ,
53 msg_type=MessageType.EVENT,
54 content="hello",
55 extra={"op": 10, "heartbeat_interval": event_data.get("heartbeat_interval", 0)},
56 )
58 if op == 11:
59 # Heartbeat ACK
60 return ChannelMessage(
61 msg_id="heartbeat_ack",
62 channel=ChannelType.QQ,
63 msg_type=MessageType.EVENT,
64 content="heartbeat_ack",
65 extra={"op": 11},
66 )
68 # op == 0: Dispatch 事件
69 msg_map = {
70 "AT_MESSAGE_CREATE": "text",
71 "MESSAGE_CREATE": "text",
72 "DIRECT_MESSAGE_CREATE": "text",
73 "C2C_MESSAGE_CREATE": "text",
74 }
75 msg_type_str = msg_map.get(t, "text")
76 msg_type_map = {"text": MessageType.TEXT}
77 msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)
79 author = event_data.get("author", {})
80 content = event_data.get("content", "").strip()
82 # 去掉 @机器人 前缀
83 if content.startswith("<@"):
84 end = content.find(">")
85 if end > 0:
86 content = content[end + 1:].strip()
88 return ChannelMessage(
89 msg_id=event_data.get("id", ""),
90 channel=ChannelType.QQ,
91 msg_type=msg_type,
92 content=content,
93 sender_id=author.get("id", ""),
94 sender_name=author.get("username", ""),
95 timestamp=float(time.time()),
96 conversation_id=event_data.get("channel_id", event_data.get("guild_id", "")),
97 reply_token="",
98 extra={
99 "guild_id": event_data.get("guild_id"),
100 "channel_id": event_data.get("channel_id"),
101 "member": event_data.get("member"),
102 "event_type": t,
103 "is_group": bool(event_data.get("guild_id")),
104 },
105 )
107 def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
108 return json.dumps({
109 "msg_type": 0,
110 "content": reply_text,
111 "msg_id": msg.msg_id,
112 "message_reference": {"message_id": msg.msg_id},
113 })
115 # ── 主动推送 ──
117 async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
118 token = await self.get_access_token()
119 # QQ Bot 发消息需要知道 channel_id
120 channel_id = self.config.extra.get("channel_id", "")
121 if not channel_id:
122 return ReplyResult(success=False, error="no channel_id in config")
124 url = f"https://api.sgroup.qq.com/channels/{channel_id}/messages"
125 headers = {"Authorization": f"Bot {self.config.app_id}.{token}"}
126 payload = {"content": content, "msg_type": 0}
128 async with httpx.AsyncClient() as client:
129 resp = await client.post(url, json=payload, headers=headers, timeout=10)
130 data = resp.json()
131 msg_id = data.get("id", "")
132 if msg_id:
133 return ReplyResult(success=True, msg_id=msg_id)
134 return ReplyResult(success=False, error=f"qq error: {data}")
136 async def send_c2c_message(self, user_id: str, content: str) -> ReplyResult:
137 """发送私聊消息。"""
138 token = await self.get_access_token()
139 url = f"https://api.sgroup.qq.com/v2/users/{user_id}/messages"
140 headers = {"Authorization": f"Bot {self.config.app_id}.{token}"}
141 payload = {"content": content, "msg_type": 0}
143 async with httpx.AsyncClient() as client:
144 resp = await client.post(url, json=payload, headers=headers, timeout=10)
145 data = resp.json()
146 return ReplyResult(success="id" in data, msg_id=data.get("id", ""))
148 async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
149 return await self.send_message(user_id, f"[图片] {image_url}")
151 async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
152 return await self.send_message(user_id, f"文件: {filename}\n{file_url}")
154 # ── Token ──
156 async def get_access_token(self) -> str:
157 return self._token or self.config.bot_token