Coverage for agentos/channels/adapters/qq.py: 0%

67 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS Channels — QQ 机器人适配器。 

3 

4Webhook 规范: https://bot.q.qq.com/wiki/develop/api-v2/ 

5 

6特性: 

7 - WebSocket 长连接(QQ 官方推荐) + HTTP webhook 双模式 

8 - JSON 报文解析 

9 - Bot Token 管理 

10 - 主动推送 + 被动回复 

11""" 

12 

13from __future__ import annotations 

14 

15import json 

16import time 

17from typing import Optional 

18 

19import httpx 

20 

21from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

22from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

23 

24 

25class QQAdapter(BaseChannelAdapter): 

26 """QQ 机器人适配器。""" 

27 

28 channel_type = ChannelType.QQ 

29 

30 def __init__(self, config: ChannelConfig): 

31 super().__init__(config) 

32 self._token: str = config.bot_token or "" 

33 self._token_expires: float = float("inf") # QQ Bot Token 由配置直接提供 

34 

35 # ── Webhook ── 

36 

37 def verify_signature(self, raw_body: bytes, headers: dict) -> bool: 

38 """QQ Bot 暂不强验证签名。""" 

39 return True 

40 

41 def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]: 

42 data = json.loads(raw_body.decode("utf-8")) 

43 op = data.get("op", 0) 

44 t = data.get("t", "") 

45 event_data = data.get("d", {}) 

46 

47 # 处理不同类型的 QQ 事件 

48 if op == 10: 

49 # Hello 事件 

50 return ChannelMessage( 

51 msg_id="hello", 

52 channel=ChannelType.QQ, 

53 msg_type=MessageType.EVENT, 

54 content="hello", 

55 extra={"op": 10, "heartbeat_interval": event_data.get("heartbeat_interval", 0)}, 

56 ) 

57 

58 if op == 11: 

59 # Heartbeat ACK 

60 return ChannelMessage( 

61 msg_id="heartbeat_ack", 

62 channel=ChannelType.QQ, 

63 msg_type=MessageType.EVENT, 

64 content="heartbeat_ack", 

65 extra={"op": 11}, 

66 ) 

67 

68 # op == 0: Dispatch 事件 

69 msg_map = { 

70 "AT_MESSAGE_CREATE": "text", 

71 "MESSAGE_CREATE": "text", 

72 "DIRECT_MESSAGE_CREATE": "text", 

73 "C2C_MESSAGE_CREATE": "text", 

74 } 

75 msg_type_str = msg_map.get(t, "text") 

76 msg_type_map = {"text": MessageType.TEXT} 

77 msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT) 

78 

79 author = event_data.get("author", {}) 

80 content = event_data.get("content", "").strip() 

81 

82 # 去掉 @机器人 前缀 

83 if content.startswith("<@"): 

84 end = content.find(">") 

85 if end > 0: 

86 content = content[end + 1:].strip() 

87 

88 return ChannelMessage( 

89 msg_id=event_data.get("id", ""), 

90 channel=ChannelType.QQ, 

91 msg_type=msg_type, 

92 content=content, 

93 sender_id=author.get("id", ""), 

94 sender_name=author.get("username", ""), 

95 timestamp=float(time.time()), 

96 conversation_id=event_data.get("channel_id", event_data.get("guild_id", "")), 

97 reply_token="", 

98 extra={ 

99 "guild_id": event_data.get("guild_id"), 

100 "channel_id": event_data.get("channel_id"), 

101 "member": event_data.get("member"), 

102 "event_type": t, 

103 "is_group": bool(event_data.get("guild_id")), 

104 }, 

105 ) 

106 

107 def build_reply(self, msg: ChannelMessage, reply_text: str) -> str: 

108 return json.dumps({ 

109 "msg_type": 0, 

110 "content": reply_text, 

111 "msg_id": msg.msg_id, 

112 "message_reference": {"message_id": msg.msg_id}, 

113 }) 

114 

115 # ── 主动推送 ── 

116 

117 async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult: 

118 token = await self.get_access_token() 

119 # QQ Bot 发消息需要知道 channel_id 

120 channel_id = self.config.extra.get("channel_id", "") 

121 if not channel_id: 

122 return ReplyResult(success=False, error="no channel_id in config") 

123 

124 url = f"https://api.sgroup.qq.com/channels/{channel_id}/messages" 

125 headers = {"Authorization": f"Bot {self.config.app_id}.{token}"} 

126 payload = {"content": content, "msg_type": 0} 

127 

128 async with httpx.AsyncClient() as client: 

129 resp = await client.post(url, json=payload, headers=headers, timeout=10) 

130 data = resp.json() 

131 msg_id = data.get("id", "") 

132 if msg_id: 

133 return ReplyResult(success=True, msg_id=msg_id) 

134 return ReplyResult(success=False, error=f"qq error: {data}") 

135 

136 async def send_c2c_message(self, user_id: str, content: str) -> ReplyResult: 

137 """发送私聊消息。""" 

138 token = await self.get_access_token() 

139 url = f"https://api.sgroup.qq.com/v2/users/{user_id}/messages" 

140 headers = {"Authorization": f"Bot {self.config.app_id}.{token}"} 

141 payload = {"content": content, "msg_type": 0} 

142 

143 async with httpx.AsyncClient() as client: 

144 resp = await client.post(url, json=payload, headers=headers, timeout=10) 

145 data = resp.json() 

146 return ReplyResult(success="id" in data, msg_id=data.get("id", "")) 

147 

148 async def send_image(self, user_id: str, image_url: str) -> ReplyResult: 

149 return await self.send_message(user_id, f"[图片] {image_url}") 

150 

151 async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult: 

152 return await self.send_message(user_id, f"文件: {filename}\n{file_url}") 

153 

154 # ── Token ── 

155 

156 async def get_access_token(self) -> str: 

157 return self._token or self.config.bot_token