Coverage for agentos/channels/adapters/telegram.py: 0%

94 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Telegram Channel Adapter — Telegram Bot API. 

3 

4BotFather token → long-polling / webhook → ChannelMessage. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from typing import Optional, Any 

11 

12from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

13from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

14 

15 

16class TelegramAdapter(BaseChannelAdapter): 

17 """Telegram Bot API adapter. 

18 

19 Config fields: 

20 bot_token: Telegram bot token from @BotFather 

21 webhook_url: HTTPS webhook URL (leave empty for long-polling) 

22 allowed_chats: list of chat IDs (empty = all) 

23 parse_mode: "HTML" | "MarkdownV2" | None 

24 """ 

25 

26 CHANNEL_TYPE = ChannelType.TELEGRAM 

27 

28 # Telegram API base URL 

29 API_BASE = "https://api.telegram.org" 

30 

31 def __init__(self, config: ChannelConfig): 

32 super().__init__(config) 

33 self._bot_token = config.extra.get("bot_token", "") 

34 self._webhook_url = config.extra.get("webhook_url", "") 

35 self._allowed_chats = config.extra.get("allowed_chats", []) 

36 self._parse_mode = config.extra.get("parse_mode", "") 

37 

38 @property 

39 def _api(self) -> str: 

40 return f"{self.API_BASE}/bot{self._bot_token}" 

41 

42 # ── Message parsing ── 

43 

44 async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]: 

45 """Parse Telegram Update into ChannelMessage.""" 

46 if "message" in payload: 

47 return self._parse_message(payload["message"]) 

48 elif "callback_query" in payload: 

49 return self._parse_callback(payload["callback_query"]) 

50 elif "inline_query" in payload: 

51 return self._parse_inline(payload["inline_query"]) 

52 return None 

53 

54 def _parse_message(self, msg: dict) -> Optional[ChannelMessage]: 

55 """Parse a Telegram Message object.""" 

56 chat = msg.get("chat", {}) 

57 user = msg.get("from", {}) 

58 chat_id = str(chat.get("id", "")) 

59 user_id = str(user.get("id", "")) 

60 

61 if self._allowed_chats and chat_id not in self._allowed_chats: 

62 return None 

63 

64 content = "" 

65 msg_type = MessageType.TEXT 

66 metadata = { 

67 "chat_type": chat.get("type", "private"), 

68 "chat_title": chat.get("title", ""), 

69 "username": user.get("username", ""), 

70 "first_name": user.get("first_name", ""), 

71 } 

72 

73 if "text" in msg: 

74 content = msg["text"] 

75 elif "photo" in msg: 

76 content = msg.get("caption", "[Photo]") 

77 msg_type = MessageType.IMAGE 

78 metadata["file_id"] = msg["photo"][-1]["file_id"] 

79 elif "document" in msg: 

80 content = msg.get("caption", "[Document]") 

81 msg_type = MessageType.FILE 

82 metadata["file_id"] = msg["document"]["file_id"] 

83 metadata["file_name"] = msg["document"].get("file_name", "") 

84 elif "voice" in msg: 

85 content = "[Voice message]" 

86 msg_type = MessageType.VOICE 

87 elif "video" in msg: 

88 content = msg.get("caption", "[Video]") 

89 msg_type = MessageType.VIDEO 

90 elif "sticker" in msg: 

91 content = f"[Sticker: {msg['sticker'].get('emoji', '')}]" 

92 msg_type = MessageType.TEXT 

93 else: 

94 return None 

95 

96 return ChannelMessage( 

97 channel_type=ChannelType.TELEGRAM, 

98 channel_id=chat_id, 

99 user_id=user_id, 

100 content=content, 

101 message_type=msg_type, 

102 raw=msg, 

103 reply_token=str(msg.get("message_id", "")), 

104 metadata=metadata, 

105 ) 

106 

107 def _parse_callback(self, cb: dict) -> Optional[ChannelMessage]: 

108 """Parse callback query (inline button press).""" 

109 user = cb.get("from", {}) 

110 msg = cb.get("message", {}) 

111 return ChannelMessage( 

112 channel_type=ChannelType.TELEGRAM, 

113 channel_id=str(msg.get("chat", {}).get("id", "")), 

114 user_id=str(user.get("id", "")), 

115 content=cb.get("data", ""), 

116 message_type=MessageType.INTERACTIVE, 

117 raw=cb, 

118 reply_token=cb.get("id", ""), 

119 ) 

120 

121 def _parse_inline(self, inline: dict) -> Optional[ChannelMessage]: 

122 """Parse inline query.""" 

123 user = inline.get("from", {}) 

124 return ChannelMessage( 

125 channel_type=ChannelType.TELEGRAM, 

126 channel_id="inline", 

127 user_id=str(user.get("id", "")), 

128 content=inline.get("query", ""), 

129 message_type=MessageType.COMMAND, 

130 raw=inline, 

131 reply_token=inline.get("id", ""), 

132 ) 

133 

134 # ── Reply ── 

135 

136 async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult: 

137 """Send message via sendMessage.""" 

138 return await self._api_call("sendMessage", { 

139 "chat_id": channel_id, 

140 "text": content, 

141 "parse_mode": self._parse_mode, 

142 "reply_to_message_id": kwargs.get("reply_token"), 

143 }) 

144 

145 async def reply_inline_keyboard( 

146 self, channel_id: str, text: str, 

147 buttons: list[list[dict]], **kwargs, 

148 ) -> ReplyResult: 

149 """Send message with inline keyboard buttons. 

150 

151 buttons = [[{"text": "Yes", "callback_data": "yes"}], ...] 

152 """ 

153 return await self._api_call("sendMessage", { 

154 "chat_id": channel_id, 

155 "text": text, 

156 "parse_mode": self._parse_mode, 

157 "reply_markup": json.dumps({"inline_keyboard": buttons}), 

158 }) 

159 

160 async def reply_photo( 

161 self, channel_id: str, photo_url: str, caption: str = "", **kwargs 

162 ) -> ReplyResult: 

163 """Send a photo.""" 

164 return await self._api_call("sendPhoto", { 

165 "chat_id": channel_id, 

166 "photo": photo_url, 

167 "caption": caption, 

168 }) 

169 

170 async def answer_callback(self, callback_query_id: str, text: str = "") -> ReplyResult: 

171 """Answer a callback query (dismiss loading spinner).""" 

172 return await self._api_call("answerCallbackQuery", { 

173 "callback_query_id": callback_query_id, 

174 "text": text, 

175 }) 

176 

177 # ── Internal ── 

178 

179 async def _api_call(self, method: str, params: dict) -> ReplyResult: 

180 """Call Telegram Bot API.""" 

181 url = f"{self._api}/{method}" 

182 

183 try: 

184 import aiohttp 

185 async with aiohttp.ClientSession() as session: 

186 async with session.post(url, json=params) as resp: 

187 data = await resp.json() 

188 if data.get("ok"): 

189 return ReplyResult( 

190 success=True, 

191 message_id=str(data.get("result", {}).get("message_id", "")), 

192 ) 

193 return ReplyResult( 

194 success=False, 

195 error=data.get("description", "unknown"), 

196 ) 

197 except ImportError: 

198 import urllib.request 

199 req = urllib.request.Request( 

200 url, data=json.dumps(params).encode(), 

201 headers={"Content-Type": "application/json"}, 

202 ) 

203 with urllib.request.urlopen(req) as resp: 

204 data = json.loads(resp.read()) 

205 return ReplyResult( 

206 success=data.get("ok", False), 

207 message_id=str(data.get("result", {}).get("message_id", "")), 

208 ) 

209 

210 # ── Webhook ── 

211 

212 async def set_webhook(self, url: str) -> bool: 

213 """Register webhook URL with Telegram.""" 

214 result = await self._api_call("setWebhook", {"url": url}) 

215 return result.success 

216 

217 async def delete_webhook(self) -> bool: 

218 """Remove webhook (switch to long-polling).""" 

219 result = await self._api_call("deleteWebhook", {}) 

220 return result.success