Coverage for agentos/channels/adapters/discord.py: 0%

76 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Discord Channel Adapter — Discord.py Gateway. 

3 

4Bot Token + Intents → Gateway connection → on_message → ChannelMessage. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from typing import Optional 

11 

12from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

13from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

14 

15 

class DiscordAdapter(BaseChannelAdapter):
    """Discord Bot adapter (Gateway Intents).

    Converts raw Discord gateway dispatch payloads into ``ChannelMessage``
    objects and sends replies through the Discord REST API (v10).

    Config fields (read from ``config.extra``):
        bot_token: Discord bot token
        application_id: Discord application ID, needed to address interaction
            follow-up webhooks. When omitted it is learned from the first
            READY or INTERACTION_CREATE payload that carries one.
        guild_ids: list of guild/servers to monitor (empty = all)
        command_prefix: bot command prefix (default "!")
        dm_enabled: allow DM messages (default True)
    """

    CHANNEL_TYPE = ChannelType.DISCORD

    _API_BASE = "https://discord.com/api/v10"
    # Messages are cut to this many characters before sending.
    _MAX_CONTENT_LENGTH = 2000
    # urllib has no default timeout; without one a stalled request never returns.
    _FALLBACK_TIMEOUT_SECONDS = 30
    # Discord's API reference asks HTTP clients for a descriptive User-Agent;
    # urllib's default one is reportedly rejected with 403.
    # TODO(review): substitute the project's real URL and version.
    _FALLBACK_USER_AGENT = "DiscordBot (agentos, 1.0)"

    def __init__(self, config: ChannelConfig):
        """Read adapter settings from ``config.extra``."""
        super().__init__(config)
        extra = config.extra
        self._bot_token = extra.get("bot_token", "")
        self._application_id = str(extra.get("application_id") or "")

        # Gateway payloads carry guild IDs as strings, so normalise whatever
        # the config holds (ints, a single scalar, None) to a list of strings;
        # otherwise the membership test in _parse_message can never match.
        guild_ids = extra.get("guild_ids") or []
        if isinstance(guild_ids, (str, int)):
            guild_ids = [guild_ids]
        self._guild_ids = [str(guild_id) for guild_id in guild_ids]

        self._command_prefix = extra.get("command_prefix", "!")
        self._dm_enabled = extra.get("dm_enabled", True)

    # ── Message parsing ──

    async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]:
        """Parse Discord gateway message event into ChannelMessage.

        Args:
            payload: Gateway payload; the event name is read from ``"t"`` and
                the event data from ``"d"``.

        Returns:
            A ``ChannelMessage`` for MESSAGE_CREATE, INTERACTION_CREATE and
            READY events, or ``None`` for any other event and for messages
            that are filtered out.
        """
        event_type = payload.get("t") or ""  # Gateway event type
        # "d" may be present but null; treat that like a missing body.
        data = payload.get("d") or {}

        if event_type == "MESSAGE_CREATE":
            return self._parse_message(data)
        if event_type == "INTERACTION_CREATE":
            return self._parse_interaction(data)
        if event_type == "READY":
            self._remember_application_id((data.get("application") or {}).get("id"))
            return ChannelMessage(
                channel_type=ChannelType.DISCORD,
                channel_id="system",
                user_id="system",
                content=f"Bot ready (guilds: {len(data.get('guilds') or [])})",
                message_type=MessageType.SYSTEM,
                raw=payload,
            )

        return None

    def _remember_application_id(self, application_id) -> None:
        """Keep the first application ID seen, unless one was configured."""
        if application_id and not self._application_id:
            self._application_id = str(application_id)

    def _parse_message(self, data: dict) -> Optional[ChannelMessage]:
        """Parse a Discord Message Create event.

        Returns ``None`` for messages written by bots, messages without text,
        DMs while ``dm_enabled`` is off, and guilds outside ``guild_ids``.
        """
        author = data.get("author") or {}
        if author.get("bot", False):
            return None  # Ignore other bots

        content = data.get("content") or ""
        # NOTE(review): attachment-only messages have no text and are dropped
        # here even though attachment URLs are collected below - confirm that
        # this is intended.
        if not content.strip():
            return None

        user_id = author.get("id", "")
        channel_id = data.get("channel_id", "")
        guild_id = str(data.get("guild_id") or "")

        # DM check
        if not guild_id and not self._dm_enabled:
            return None

        # Guild filter
        if guild_id and self._guild_ids and guild_id not in self._guild_ids:
            return None

        # Strip command prefix. An empty prefix is skipped on purpose:
        # str.startswith("") is always true and would label every message
        # as a command.
        prefix = self._command_prefix
        if prefix and content.startswith(prefix):
            stripped = content[len(prefix):]
            msg_type = MessageType.COMMAND
        else:
            stripped = content
            msg_type = MessageType.TEXT

        username = author.get("username", "")
        member = data.get("member") or {}

        return ChannelMessage(
            channel_type=ChannelType.DISCORD,
            channel_id=channel_id,
            user_id=user_id,
            content=stripped.strip(),
            message_type=msg_type,
            raw=data,
            reply_token=data.get("id", ""),
            metadata={
                "guild_id": guild_id,
                "username": username,
                # "nick" is null for members without a nickname, so a
                # dict.get default would never apply; fall back with `or`.
                "display_name": member.get("nick") or username,
                "attachments": [a.get("url") for a in data.get("attachments") or []],
            },
        )

    def _parse_interaction(self, data: dict) -> Optional[ChannelMessage]:
        """Parse Discord slash command interaction.

        The content is rendered as ``/<command> <name>:<value> ...``; the
        interaction token is stored in ``reply_token`` so that ``reply`` can
        address the follow-up webhook.
        """
        interaction_data = data.get("data") or {}
        command_name = interaction_data.get("name", "")

        # Guild interactions nest the user under "member"; DMs carry "user".
        member = data.get("member") or {}
        user = data.get("user") or member.get("user") or {}

        self._remember_application_id(data.get("application_id"))

        option_text = self._format_options(interaction_data.get("options") or [])
        # strip() avoids a dangling space when the command has no options.
        content = f"/{command_name} {option_text}".strip()

        return ChannelMessage(
            channel_type=ChannelType.DISCORD,
            channel_id=data.get("channel_id", ""),
            user_id=user.get("id", ""),
            content=content,
            message_type=MessageType.COMMAND,
            raw=data,
            reply_token=data.get("token", ""),
            metadata={
                "guild_id": str(data.get("guild_id") or ""),
                "username": user.get("username", ""),
            },
        )

    def _format_options(self, options: list) -> str:
        """Render interaction options as space-separated ``name:value`` pairs.

        Options without a value (subcommands and subcommand groups) are
        rendered as their bare name followed by their nested options.
        """
        parts = []
        for option in options:
            name = option.get("name")
            if "value" in option:
                parts.append(f"{name}:{option.get('value')}")
                continue
            parts.append(str(name))
            nested = self._format_options(option.get("options") or [])
            if nested:
                parts.append(nested)
        return " ".join(parts)

    # ── Reply ──

    async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult:
        """Send message to Discord channel via REST API.

        Args:
            channel_id: Target channel ID.
            content: Message text; cut to the first 2000 characters.
            **kwargs: Optional ``embed`` (dict), ``interaction_token`` /
                ``reply_token`` (selects the interaction follow-up webhook)
                and ``application_id`` (overrides the configured one).

        Returns:
            ``ReplyResult`` whose ``success`` reflects the HTTP status (2xx)
            and whose ``message_id`` is the ``id`` of the created message.

        Transport-level failures (connection errors, timeouts) are not
        caught and propagate to the caller.
        """
        import asyncio

        url = self._reply_url(channel_id, kwargs)
        headers = {
            "Authorization": f"Bot {self._bot_token}",
            "Content-Type": "application/json",
        }
        body = {"content": (content or "")[: self._MAX_CONTENT_LENGTH]}

        if kwargs.get("embed"):
            body["embeds"] = [kwargs["embed"]]

        # Only the import is guarded, so an ImportError raised from somewhere
        # inside the request itself is not mistaken for "aiohttp missing".
        try:
            import aiohttp
        except ImportError:
            aiohttp = None

        if aiohttp is not None:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=body) as resp:
                    status = resp.status
                    raw_body = await resp.text()
        else:
            # urllib blocks, so run it in the default executor instead of
            # stalling the event loop.
            loop = asyncio.get_running_loop()
            status, raw_body = await loop.run_in_executor(
                None, self._post_with_urllib, url, headers, body
            )

        return self._build_result(status, raw_body)

    def _reply_url(self, channel_id: str, options: dict) -> str:
        """Pick the channel-message URL or the interaction follow-up URL."""
        channel_url = f"{self._API_BASE}/channels/{channel_id}/messages"

        token = options.get("interaction_token")
        if not token:
            # _parse_message stores the message ID in reply_token while
            # _parse_interaction stores the interaction token. Message IDs
            # are numeric snowflakes, so an all-digit value is not treated
            # as an interaction token.
            candidate = options.get("reply_token")
            if candidate and not str(candidate).isdigit():
                token = candidate
        if not token:
            return channel_url

        application_id = options.get("application_id") or self._application_id
        if not application_id:
            self._logger().warning(
                "Discord application_id unknown; sending a channel message "
                "instead of an interaction follow-up"
            )
            return channel_url

        # Follow-ups are addressed by application ID, never by bot token.
        # NOTE(review): Discord accepts a follow-up only after the interaction
        # received its initial response - confirm the caller sends one.
        return f"{self._API_BASE}/webhooks/{application_id}/{token}"

    def _post_with_urllib(self, url: str, headers: dict, body: dict):
        """Blocking POST fallback; returns ``(status, response_text)``."""
        import urllib.error
        import urllib.request

        request_headers = dict(headers)
        request_headers.setdefault("User-Agent", self._FALLBACK_USER_AGENT)
        request = urllib.request.Request(
            url,
            data=json.dumps(body).encode("utf-8"),
            headers=request_headers,
            method="POST",
        )
        try:
            with urllib.request.urlopen(
                request, timeout=self._FALLBACK_TIMEOUT_SECONDS
            ) as resp:
                return resp.status, resp.read().decode("utf-8", "replace")
        except urllib.error.HTTPError as err:
            # Report 4xx/5xx like the aiohttp path does instead of raising.
            return err.code, err.read().decode("utf-8", "replace")

    def _build_result(self, status: int, raw_body: str) -> ReplyResult:
        """Turn an HTTP status and response text into a ``ReplyResult``."""
        try:
            data = json.loads(raw_body) if raw_body else {}
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        if 200 <= status < 300:
            return ReplyResult(success=True, message_id=data.get("id", ""))

        # The URL is deliberately not logged: it may contain an interaction token.
        self._logger().warning(
            "Discord reply failed: HTTP %s %s", status, raw_body[:500]
        )
        return ReplyResult(success=False, message_id="")

    @staticmethod
    def _logger():
        """Return this module's logger."""
        import logging

        return logging.getLogger(__name__)

    async def reply_embed(
        self, channel_id: str, title: str, description: str,
        color: int = 0x5865F2, fields: Optional[list] = None, **kwargs,
    ) -> ReplyResult:
        """Send a Discord embed message.

        Builds an embed from ``title``, ``description``, ``color`` and the
        optional ``fields`` list, then sends it through ``reply`` with empty
        text content. Extra keyword arguments are forwarded to ``reply``.
        """
        embed = {
            "title": title,
            "description": description,
            "color": color,
        }
        if fields:
            embed["fields"] = fields
        return await self.reply(channel_id, "", embed=embed, **kwargs)