Coverage for agentos/channels/adapters/feishu.py: 0%
74 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Channels — 飞书适配器。
4Webhook 规范: https://open.feishu.cn/document/server-docs/im-v1/message-content-description
6特性:
7 - JSON 报文解析
8 - 应用 Token + tenant access token 双 token 管理
9 - 卡片消息支持
10 - 消息回复(被动 + 主动)
11"""
13from __future__ import annotations
15import json
16import time
17from typing import Optional
19import httpx
21from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
22from agentos.channels.message import ChannelMessage, ChannelType, MessageType
25class FeishuAdapter(BaseChannelAdapter):
26 """飞书适配器。"""
28 channel_type = ChannelType.FEISHU
30 def __init__(self, config: ChannelConfig):
31 super().__init__(config)
32 self._app_token: str = ""
33 self._tenant_token: str = ""
34 self._token_expires: float = 0
36 # ── Webhook ──
38 def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
39 """飞书不要求 webhook 签名验证(在事件订阅 URL 验证时完成)。"""
40 return True
42 def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
43 data = json.loads(raw_body.decode("utf-8"))
44 # 飞书事件格式: {"schema": "2.0", "header": {...}, "event": {...}}
45 event = data.get("event", data)
46 header = data.get("header", {})
48 # 处理 URL 验证
49 if data.get("type") == "url_verification":
50 return ChannelMessage(
51 msg_id="url_verify",
52 channel=ChannelType.FEISHU,
53 msg_type=MessageType.EVENT,
54 content=data.get("challenge", ""),
55 reply_token=data.get("token", ""),
56 extra={"is_challenge": True, "challenge": data.get("challenge", "")},
57 )
59 msg_type_str = event.get("message", {}).get("message_type", "text")
60 msg_type_map = {
61 "text": MessageType.TEXT, "image": MessageType.IMAGE,
62 "audio": MessageType.VOICE, "media": MessageType.FILE,
63 "file": MessageType.FILE, "post": MessageType.TEXT,
64 }
65 msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)
67 message = event.get("message", {})
68 content = ""
69 if msg_type_str == "text":
70 content = json.loads(message.get("content", "{}")).get("text", "")
71 elif msg_type_str == "post":
72 content = str(message.get("content", ""))[:200]
74 sender = event.get("sender", {})
75 sender_id = sender.get("sender_id", {}).get("open_id", "")
77 return ChannelMessage(
78 msg_id=header.get("event_id", event.get("message", {}).get("message_id", "")),
79 channel=ChannelType.FEISHU,
80 msg_type=msg_type,
81 content=content,
82 sender_id=sender_id,
83 sender_name="",
84 timestamp=float(header.get("create_time", str(int(time.time() * 1000)))) / 1000,
85 conversation_id=event.get("message", {}).get("chat_id", ""),
86 reply_token=event.get("message", {}).get("message_id", ""),
87 media_url=message.get("image_key", ""),
88 extra={
89 "tenant_key": header.get("tenant_key"),
90 "event_type": header.get("event_type"),
91 "chat_type": event.get("message", {}).get("chat_type", "p2p"),
92 "root_id": event.get("message", {}).get("root_id"),
93 "parent_id": event.get("message", {}).get("parent_id"),
94 },
95 )
97 def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
98 return json.dumps({
99 "msg_type": "text",
100 "content": json.dumps({"text": reply_text}),
101 })
103 # ── 主动推送 ──
105 async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
106 token = await self.get_access_token()
107 url = "https://open.feishu.cn/open-apis/im/v1/messages"
108 payload = {
109 "receive_id": user_id,
110 "msg_type": "text",
111 "content": json.dumps({"text": content}),
112 }
113 headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
114 async with httpx.AsyncClient() as client:
115 resp = await client.post(url, params={"receive_id_type": "open_id"}, json=payload, headers=headers, timeout=10)
116 data = resp.json()
117 if data.get("code") == 0:
118 return ReplyResult(success=True, msg_id=data.get("data", {}).get("message_id", ""))
119 return ReplyResult(success=False, error=f"feishu error {data.get('code')}: {data.get('msg')}")
121 async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
122 token = await self.get_access_token()
123 url = "https://open.feishu.cn/open-apis/im/v1/messages"
124 payload = {
125 "receive_id": user_id,
126 "msg_type": "image",
127 "content": json.dumps({"image_key": image_url}),
128 }
129 headers = {"Authorization": f"Bearer {token}"}
130 async with httpx.AsyncClient() as client:
131 resp = await client.post(url, params={"receive_id_type": "open_id"}, json=payload, headers=headers, timeout=10)
132 return ReplyResult(success=resp.json().get("code") == 0)
134 async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
135 token = await self.get_access_token()
136 url = "https://open.feishu.cn/open-apis/im/v1/messages"
137 payload = {
138 "receive_id": user_id,
139 "msg_type": "file",
140 "content": json.dumps({"file_key": file_url}),
141 }
142 headers = {"Authorization": f"Bearer {token}"}
143 async with httpx.AsyncClient() as client:
144 resp = await client.post(url, params={"receive_id_type": "open_id"}, json=payload, headers=headers, timeout=10)
145 return ReplyResult(success=resp.json().get("code") == 0)
147 # ── Token ──
149 async def get_access_token(self) -> str:
150 if self._tenant_token and time.time() < self._token_expires - 300:
151 return self._tenant_token
152 url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
153 payload = {"app_id": self.config.app_id, "app_secret": self.config.app_secret}
154 async with httpx.AsyncClient() as client:
155 resp = await client.post(url, json=payload, timeout=10)
156 data = resp.json()
157 self._tenant_token = data["tenant_access_token"]
158 self._token_expires = time.time() + data.get("expire", 7200)
159 return self._tenant_token