Coverage for agentos/channels/adapters/telegram.py: 0%
94 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
Telegram Channel Adapter — Telegram Bot API.

BotFather token → long-polling / webhook → ChannelMessage.
"""
from __future__ import annotations

import asyncio
import json
import urllib.error
import urllib.request
from typing import Optional, Any

from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
from agentos.channels.message import ChannelMessage, ChannelType, MessageType


class TelegramAdapter(BaseChannelAdapter):
    """Telegram Bot API adapter.

    Config fields (read from ``config.extra``):
        bot_token: Telegram bot token from @BotFather
        webhook_url: HTTPS webhook URL (leave empty for long-polling)
        allowed_chats: list of chat IDs (empty = all)
        parse_mode: "HTML" | "MarkdownV2" | None
    """

    CHANNEL_TYPE = ChannelType.TELEGRAM

    # Telegram API base URL
    API_BASE = "https://api.telegram.org"

    def __init__(self, config: ChannelConfig):
        """Read the adapter settings out of ``config.extra``."""
        super().__init__(config)
        extra = config.extra
        self._bot_token = extra.get("bot_token", "")
        self._webhook_url = extra.get("webhook_url", "")
        # Incoming chat IDs are compared as strings (see _parse_message), so
        # normalize the allow-list the same way; otherwise integer IDs in the
        # config would never match and every chat would be rejected.
        self._allowed_chats = [
            str(chat) for chat in (extra.get("allowed_chats") or [])
        ]
        self._parse_mode = extra.get("parse_mode", "")

    @property
    def _api(self) -> str:
        """Base URL for this bot's API methods (embeds the bot token)."""
        return f"{self.API_BASE}/bot{self._bot_token}"

    # ── Message parsing ──

    async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]:
        """Parse Telegram Update into ChannelMessage.

        Dispatches on the first of ``message``, ``callback_query`` or
        ``inline_query`` present in ``payload``; returns ``None`` when none
        of them is present.
        """
        if "message" in payload:
            return self._parse_message(payload["message"])
        if "callback_query" in payload:
            return self._parse_callback(payload["callback_query"])
        if "inline_query" in payload:
            return self._parse_inline(payload["inline_query"])
        return None

    def _parse_message(self, msg: dict) -> Optional[ChannelMessage]:
        """Parse a Telegram Message object.

        Returns ``None`` when the chat is not in the allow-list (if one is
        configured) or when the message carries none of the handled content
        kinds (text, photo, document, voice, video, sticker).
        """
        chat = msg.get("chat", {})
        user = msg.get("from", {})
        chat_id = str(chat.get("id", ""))
        user_id = str(user.get("id", ""))

        if self._allowed_chats and chat_id not in self._allowed_chats:
            return None

        content = ""
        msg_type = MessageType.TEXT
        metadata = {
            "chat_type": chat.get("type", "private"),
            "chat_title": chat.get("title", ""),
            "username": user.get("username", ""),
            "first_name": user.get("first_name", ""),
        }

        if "text" in msg:
            content = msg["text"]
        elif "photo" in msg:
            content = msg.get("caption", "[Photo]")
            msg_type = MessageType.IMAGE
            photos = msg["photo"]
            # The last entry of the photo list is used for the file id; guard
            # against an empty list so a malformed update cannot raise
            # IndexError.
            if photos:
                metadata["file_id"] = photos[-1]["file_id"]
        elif "document" in msg:
            document = msg["document"]
            content = msg.get("caption", "[Document]")
            msg_type = MessageType.FILE
            metadata["file_id"] = document["file_id"]
            metadata["file_name"] = document.get("file_name", "")
        elif "voice" in msg:
            content = "[Voice message]"
            msg_type = MessageType.VOICE
        elif "video" in msg:
            content = msg.get("caption", "[Video]")
            msg_type = MessageType.VIDEO
        elif "sticker" in msg:
            # Stickers are surfaced as text containing the sticker's emoji.
            content = f"[Sticker: {msg['sticker'].get('emoji', '')}]"
            msg_type = MessageType.TEXT
        else:
            return None

        return ChannelMessage(
            channel_type=ChannelType.TELEGRAM,
            channel_id=chat_id,
            user_id=user_id,
            content=content,
            message_type=msg_type,
            raw=msg,
            reply_token=str(msg.get("message_id", "")),
            metadata=metadata,
        )

    def _parse_callback(self, cb: dict) -> Optional[ChannelMessage]:
        """Parse callback query (inline button press).

        The callback's ``data`` becomes the message content and its ``id``
        becomes the reply token.
        """
        user = cb.get("from", {})
        msg = cb.get("message", {})
        return ChannelMessage(
            channel_type=ChannelType.TELEGRAM,
            channel_id=str(msg.get("chat", {}).get("id", "")),
            user_id=str(user.get("id", "")),
            content=cb.get("data", ""),
            message_type=MessageType.INTERACTIVE,
            raw=cb,
            reply_token=cb.get("id", ""),
        )

    def _parse_inline(self, inline: dict) -> Optional[ChannelMessage]:
        """Parse inline query.

        Inline queries carry no chat here, so the channel id is the fixed
        string ``"inline"``.
        """
        user = inline.get("from", {})
        return ChannelMessage(
            channel_type=ChannelType.TELEGRAM,
            channel_id="inline",
            user_id=str(user.get("id", "")),
            content=inline.get("query", ""),
            message_type=MessageType.COMMAND,
            raw=inline,
            reply_token=inline.get("id", ""),
        )

    # ── Reply ──

    def _text_params(self, channel_id: str, text: str) -> dict:
        """Build the common sendMessage parameters.

        ``parse_mode`` is only included when one is configured, so an unset
        optional field is omitted rather than sent as an empty value.
        """
        params: dict = {"chat_id": channel_id, "text": text}
        if self._parse_mode:
            params["parse_mode"] = self._parse_mode
        return params

    async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult:
        """Send message via sendMessage.

        If ``reply_token`` is passed in ``kwargs`` and is non-empty, it is
        sent as ``reply_to_message_id``.
        """
        params = self._text_params(channel_id, content)
        reply_token = kwargs.get("reply_token")
        if reply_token:
            params["reply_to_message_id"] = reply_token
        return await self._api_call("sendMessage", params)

    async def reply_inline_keyboard(
        self, channel_id: str, text: str,
        buttons: list[list[dict]], **kwargs,
    ) -> ReplyResult:
        """Send message with inline keyboard buttons.

        buttons = [[{"text": "Yes", "callback_data": "yes"}], ...]
        """
        params = self._text_params(channel_id, text)
        params["reply_markup"] = json.dumps({"inline_keyboard": buttons})
        return await self._api_call("sendMessage", params)

    async def reply_photo(
        self, channel_id: str, photo_url: str, caption: str = "", **kwargs
    ) -> ReplyResult:
        """Send a photo, with an optional caption."""
        params: dict = {"chat_id": channel_id, "photo": photo_url}
        if caption:
            params["caption"] = caption
        return await self._api_call("sendPhoto", params)

    async def answer_callback(self, callback_query_id: str, text: str = "") -> ReplyResult:
        """Answer a callback query (dismiss loading spinner)."""
        params: dict = {"callback_query_id": callback_query_id}
        if text:
            params["text"] = text
        return await self._api_call("answerCallbackQuery", params)

    # ── Internal ──

    async def _api_call(self, method: str, params: dict) -> ReplyResult:
        """Call Telegram Bot API.

        Posts ``params`` as JSON to ``<api>/<method>``. Uses ``aiohttp`` when
        it is installed; otherwise falls back to ``urllib`` executed in the
        default executor so the blocking request does not stall the event
        loop.

        Returns:
            A ``ReplyResult`` built from the API's JSON response. Transport
            errors other than HTTP error statuses in the urllib fallback are
            not caught here and propagate to the caller.
        """
        url = f"{self._api}/{method}"

        try:
            import aiohttp
        except ImportError:
            aiohttp = None

        if aiohttp is not None:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=params) as resp:
                    data = await resp.json()
        else:
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None, self._post_json_blocking, url, params
            )
        return self._to_reply_result(data)

    @staticmethod
    def _post_json_blocking(url: str, params: dict) -> Any:
        """POST ``params`` as JSON with urllib and return the decoded body.

        ``urlopen`` raises ``HTTPError`` for error statuses; the error body is
        decoded as JSON when possible so the caller can report the API's
        ``description``, mirroring what the aiohttp path sees.
        """
        req = urllib.request.Request(
            url, data=json.dumps(params).encode(),
            headers={"Content-Type": "application/json"},
        )
        try:
            with urllib.request.urlopen(req) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as err:
            try:
                body = err.read()
            finally:
                err.close()
            try:
                return json.loads(body)
            except ValueError:
                # Error body was not JSON; synthesize a failure payload.
                return {
                    "ok": False,
                    "description": f"HTTP {err.code}: {err.reason}",
                }

    @staticmethod
    def _to_reply_result(data: Any) -> ReplyResult:
        """Convert a decoded API response into a ``ReplyResult``."""
        if not isinstance(data, dict):
            return ReplyResult(success=False, error="unknown")
        if data.get("ok"):
            result = data.get("result")
            # Methods such as setWebhook/deleteWebhook/answerCallbackQuery
            # return a bare boolean as "result"; only message-returning
            # methods provide a dict with "message_id".
            message_id = (
                str(result.get("message_id", ""))
                if isinstance(result, dict) else ""
            )
            return ReplyResult(success=True, message_id=message_id)
        return ReplyResult(
            success=False,
            error=data.get("description", "unknown"),
        )

    # ── Webhook ──

    async def set_webhook(self, url: str) -> bool:
        """Register webhook URL with Telegram; return whether it succeeded."""
        result = await self._api_call("setWebhook", {"url": url})
        return result.success

    async def delete_webhook(self) -> bool:
        """Remove webhook (switch to long-polling); return whether it succeeded."""
        result = await self._api_call("deleteWebhook", {})
        return result.success