Coverage for agentos/channels/adapters/feishu.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 18:40 +0800

1""" 

2AgentOS Channels — 飞书适配器。 

3 

4Webhook 规范: https://open.feishu.cn/document/server-docs/im-v1/message-content-description 

5 

6特性: 

7 - JSON 报文解析 

8 - 应用 Token + tenant access token 双 token 管理 

9 - 卡片消息支持 

10 - 消息回复(被动 + 主动) 

11""" 

12 

13from __future__ import annotations 

14 

15import json 

16import time 

17from typing import Optional 

18 

19import httpx 

20 

21from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

22from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

23 

24 

class FeishuAdapter(BaseChannelAdapter):
    """Feishu (Lark) channel adapter.

    Converts Feishu event-subscription webhooks into ``ChannelMessage``
    objects and pushes outbound messages through the IM v1 ``messages``
    endpoint, authenticating with a cached tenant access token.
    """

    channel_type = ChannelType.FEISHU

    # Open-platform endpoints used by this adapter.
    _MESSAGES_URL = "https://open.feishu.cn/open-apis/im/v1/messages"
    _TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    # Seconds before the recorded expiry at which the cached token is refreshed.
    _TOKEN_REFRESH_MARGIN = 300
    # Timeout, in seconds, applied to every outbound HTTP call.
    _HTTP_TIMEOUT = 10

    def __init__(self, config: ChannelConfig):
        """Initialise the adapter with an empty token cache.

        Args:
            config: Channel configuration; this class reads ``app_id``,
                ``app_secret``, ``encoding_aes_key`` and ``verify_token``.
        """
        super().__init__(config)
        self._app_token: str = ""  # not read anywhere in this class
        self._tenant_token: str = ""  # cached tenant access token
        self._token_expires: float = 0  # epoch seconds at which the cache expires

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Verify the signature of a Feishu event-subscription request.

        The expected signature is the hex SHA-256 digest of
        ``timestamp + nonce + encrypt_key`` (UTF-8 encoded) followed by the raw
        request body, compared against the ``X-Lark-Signature`` header.

        Args:
            raw_body: Request body exactly as received, before any decoding.
            headers: Request headers; names are matched case-insensitively.

        Returns:
            ``True`` when the signature matches; ``False`` when it does not or
            when the timestamp, nonce, signature or key is missing.
        """
        import hashlib
        import hmac

        # Frameworks often lower-case header names, so normalise before lookup.
        lowered = {str(name).lower(): value for name, value in headers.items()}
        timestamp = lowered.get("x-lark-request-timestamp", "")
        nonce = lowered.get("x-lark-request-nonce", "")
        signature = lowered.get("x-lark-signature", "")

        encrypt_key = self.config.encoding_aes_key or self.config.verify_token
        if not all([timestamp, nonce, signature, encrypt_key]):
            return False

        # The body is part of the signed material; leaving it out would let a
        # captured header triple authenticate an arbitrary payload.
        signed = f"{timestamp}{nonce}{encrypt_key}".encode("utf-8") + (raw_body or b"")
        computed = hashlib.sha256(signed).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
        # the header value is attacker-controlled.
        return hmac.compare_digest(
            computed.encode("utf-8"), str(signature).encode("utf-8")
        )

    @staticmethod
    def _decode_content(raw) -> dict:
        """Decode a message ``content`` field into a dict, or ``{}`` if unusable."""
        if isinstance(raw, dict):
            return raw
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Parse a Feishu webhook body into a ``ChannelMessage``.

        Handles the URL-verification handshake and schema 2.0 event payloads of
        the form ``{"schema": "2.0", "header": {...}, "event": {...}}``.

        Args:
            raw_body: UTF-8 encoded JSON request body.
            headers: Request headers (not used by this method).

        Returns:
            For a ``url_verification`` request, an EVENT message carrying the
            challenge in ``content`` and ``extra``. Otherwise a message built
            from the event's ``message`` and ``sender`` objects.

        Raises:
            UnicodeDecodeError: If ``raw_body`` is not valid UTF-8.
            json.JSONDecodeError: If ``raw_body`` is not valid JSON.
        """
        data = json.loads(raw_body.decode("utf-8"))

        # URL verification handshake: echo the challenge back to the platform.
        if data.get("type") == "url_verification":
            challenge = data.get("challenge", "")
            return ChannelMessage(
                msg_id="url_verify",
                channel=ChannelType.FEISHU,
                msg_type=MessageType.EVENT,
                content=challenge,
                reply_token=data.get("token", ""),
                extra={"is_challenge": True, "challenge": challenge},
            )

        # Payloads without an "event" wrapper are treated as the event itself.
        event = data.get("event", data)
        header = data.get("header", {})
        message = event.get("message") or {}

        msg_type_str = message.get("message_type", "text")
        msg_type_map = {
            "text": MessageType.TEXT,
            "image": MessageType.IMAGE,
            "audio": MessageType.VOICE,
            "media": MessageType.FILE,
            "file": MessageType.FILE,
            "post": MessageType.TEXT,
        }
        msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)

        # "content" is itself a JSON-encoded string; decode it once and
        # tolerate empty or malformed values instead of failing the webhook.
        body = self._decode_content(message.get("content"))
        content = ""
        if msg_type_str == "text":
            content = body.get("text", "")
        elif msg_type_str == "post":
            # Rich-text posts are kept as a truncated raw string.
            content = str(message.get("content", ""))[:200]

        # Media keys live inside the decoded content; the envelope lookup is
        # kept only as a fallback for payloads that carry it there.
        media_url = (
            body.get("image_key")
            or body.get("file_key")
            or message.get("image_key", "")
        )

        sender = event.get("sender") or {}
        sender_id = (sender.get("sender_id") or {}).get("open_id", "")

        # create_time is read as milliseconds; fall back to "now" when it is
        # absent or not numeric.
        try:
            timestamp = float(header.get("create_time")) / 1000
        except (TypeError, ValueError):
            timestamp = time.time()

        message_id = message.get("message_id", "")
        return ChannelMessage(
            msg_id=header.get("event_id", message_id),
            channel=ChannelType.FEISHU,
            msg_type=msg_type,
            content=content,
            sender_id=sender_id,
            sender_name="",
            timestamp=timestamp,
            conversation_id=message.get("chat_id", ""),
            reply_token=message_id,
            media_url=media_url,
            extra={
                "tenant_key": header.get("tenant_key"),
                "event_type": header.get("event_type"),
                "chat_type": message.get("chat_type", "p2p"),
                "root_id": message.get("root_id"),
                "parent_id": message.get("parent_id"),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Build the JSON body of a passive text reply.

        Args:
            msg: The message being replied to (not used by this method).
            reply_text: Plain text to send back.

        Returns:
            A JSON string with ``msg_type`` ``"text"`` and a JSON-encoded
            ``content`` field, the nested encoding the messages API uses.
        """
        return json.dumps({
            "msg_type": "text",
            "content": json.dumps({"text": reply_text}),
        })

    # ── Proactive push ──

    async def _post_message(self, user_id: str, msg_type: str, content: dict) -> ReplyResult:
        """POST one message to the IM v1 endpoint and map the response to a result."""
        token = await self.get_access_token()
        payload = {
            "receive_id": user_id,
            "msg_type": msg_type,
            "content": json.dumps(content),
        }
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                self._MESSAGES_URL,
                params={"receive_id_type": "open_id"},
                json=payload,
                headers=headers,
                timeout=self._HTTP_TIMEOUT,
            )
        data = resp.json()
        if data.get("code") == 0:
            return ReplyResult(success=True, msg_id=data.get("data", {}).get("message_id", ""))
        return ReplyResult(success=False, error=f"feishu error {data.get('code')}: {data.get('msg')}")

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Send a text message to a user identified by ``open_id``.

        Args:
            user_id: Recipient ``open_id``.
            content: Plain text to send.
            msg_type: Accepted for interface compatibility.
                NOTE(review): this argument is currently ignored and the
                message is always sent as ``"text"``; confirm with callers
                before honouring other types.

        Returns:
            A successful result carrying the new message id, or a failed
            result whose ``error`` holds the Feishu code and message.
        """
        return await self._post_message(user_id, "text", {"text": content})

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image message to a user identified by ``open_id``.

        Args:
            user_id: Recipient ``open_id``.
            image_url: Value sent as the message's ``image_key``.
                NOTE(review): despite the name this is forwarded unchanged as
                a key, not fetched or uploaded — confirm callers pass a key.

        Returns:
            A successful result carrying the new message id, or a failed
            result whose ``error`` holds the Feishu code and message.
        """
        return await self._post_message(user_id, "image", {"image_key": image_url})

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file message to a user identified by ``open_id``.

        Args:
            user_id: Recipient ``open_id``.
            file_url: Value sent as the message's ``file_key``.
                NOTE(review): forwarded unchanged as a key, not fetched or
                uploaded — confirm callers pass a key.
            filename: Not used by this method.

        Returns:
            A successful result carrying the new message id, or a failed
            result whose ``error`` holds the Feishu code and message.
        """
        return await self._post_message(user_id, "file", {"file_key": file_url})

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a tenant access token, refreshing the cached one when needed.

        The cached token is reused until ``_TOKEN_REFRESH_MARGIN`` seconds
        before its recorded expiry; otherwise a new one is requested with the
        configured ``app_id`` and ``app_secret``.

        Returns:
            The tenant access token string.

        Raises:
            KeyError: If the response carries no ``tenant_access_token``; the
                message includes the ``code`` and ``msg`` from the response.
        """
        if self._tenant_token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._tenant_token

        payload = {"app_id": self.config.app_id, "app_secret": self.config.app_secret}
        async with httpx.AsyncClient() as client:
            resp = await client.post(self._TOKEN_URL, json=payload, timeout=self._HTTP_TIMEOUT)
        data = resp.json()

        token = data.get("tenant_access_token")
        if not token:
            # Keep KeyError for existing callers, but say why the token is absent.
            raise KeyError(
                f"tenant_access_token missing from feishu response "
                f"(code {data.get('code')}: {data.get('msg')})"
            )

        self._tenant_token = token
        self._token_expires = time.time() + data.get("expire", 7200)
        return self._tenant_token