Coverage for agentos/channels/adapters/slack.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Slack Channel Adapter — Slack Events API (Bolt for Python). 

3 

4OAuth 2.0 flow → Bolt App → Socket Mode / HTTP Webhook → ChannelMessage. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import hmac 

11import hashlib 

12import time 

13from typing import Optional, Callable, Any 

14 

15from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

16from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

17 

18 

19class SlackAdapter(BaseChannelAdapter): 

20 """Slack Events API adapter. 

21 

22 Config fields: 

23 bot_token: Slack Bot User OAuth Token (xoxb-...) 

24 signing_secret: Slack Signing Secret for request verification 

25 app_token: Socket Mode app-level token (xapp-...) — optional 

26 socket_mode: bool — use Socket Mode instead of HTTP webhooks 

27 """ 

28 

29 CHANNEL_TYPE = ChannelType.SLACK 

30 

31 def __init__(self, config: ChannelConfig): 

32 super().__init__(config) 

33 self._bot_token = config.extra.get("bot_token", "") 

34 self._signing_secret = config.extra.get("signing_secret", "") 

35 self._app_token = config.extra.get("app_token", "") 

36 self._socket_mode = config.extra.get("socket_mode", False) 

37 

38 # ── Webhook verification ── 

39 

40 def verify_signature(self, body: bytes, headers: dict) -> bool: 

41 """Verify Slack request signature (HMAC-SHA256).""" 

42 timestamp = headers.get("x-slack-request-timestamp", "") 

43 slack_sig = headers.get("x-slack-signature", "") 

44 

45 if abs(time.time() - int(timestamp)) > 300: 

46 return False 

47 

48 sig_basestring = f"v0:{timestamp}:{body.decode()}" 

49 computed = "v0=" + hmac.new( 

50 self._signing_secret.encode(), 

51 sig_basestring.encode(), 

52 hashlib.sha256, 

53 ).hexdigest() 

54 

55 return hmac.compare_digest(computed, slack_sig) 

56 

57 # ── Message parsing ── 

58 

59 async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]: 

60 """Parse a Slack event payload into ChannelMessage.""" 

61 event_type = payload.get("type", "") 

62 

63 # URL verification challenge 

64 if event_type == "url_verification": 

65 return ChannelMessage( 

66 channel_type=ChannelType.SLACK, 

67 channel_id=self.config.channel_id, 

68 user_id="system", 

69 content=payload.get("challenge", ""), 

70 message_type=MessageType.SYSTEM, 

71 raw=payload, 

72 reply_token=payload.get("challenge"), 

73 ) 

74 

75 # Event callback 

76 if event_type == "event_callback": 

77 event = payload.get("event", {}) 

78 return self._parse_event(event) 

79 

80 return None 

81 

82 def _parse_event(self, event: dict) -> Optional[ChannelMessage]: 

83 """Parse a Slack event (message, app_mention, etc.).""" 

84 event_type = event.get("type", "") 

85 user = event.get("user", "") 

86 channel = event.get("channel", "") 

87 text = event.get("text", "") 

88 ts = event.get("ts", "") 

89 

90 # Strip bot mention prefix 

91 if event_type == "app_mention" and text: 

92 text = self._strip_mention(text) 

93 

94 if not text: 

95 return None 

96 

97 msg_type = MessageType.TEXT 

98 return ChannelMessage( 

99 channel_type=ChannelType.SLACK, 

100 channel_id=channel, 

101 user_id=user, 

102 content=text, 

103 message_type=msg_type, 

104 raw=event, 

105 reply_token=ts, 

106 ) 

107 

108 def _strip_mention(self, text: str) -> str: 

109 """Remove <@BOT_ID> prefix from message text.""" 

110 import re 

111 return re.sub(r"^<@U[A-Z0-9]+>\s*", "", text).strip() 

112 

113 # ── Reply ── 

114 

115 async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult: 

116 """Send a message to Slack channel via chat.postMessage.""" 

117 url = "https://slack.com/api/chat.postMessage" 

118 headers = { 

119 "Authorization": f"Bearer {self._bot_token}", 

120 "Content-Type": "application/json", 

121 } 

122 body = { 

123 "channel": channel_id, 

124 "text": content, 

125 } 

126 

127 thread_ts = kwargs.get("thread_ts") or kwargs.get("reply_token") 

128 if thread_ts: 

129 body["thread_ts"] = thread_ts 

130 

131 try: 

132 import aiohttp 

133 async with aiohttp.ClientSession() as session: 

134 async with session.post(url, headers=headers, json=body) as resp: 

135 data = await resp.json() 

136 if data.get("ok"): 

137 return ReplyResult(success=True, message_id=data.get("ts", "")) 

138 return ReplyResult(success=False, error=data.get("error", "unknown")) 

139 except ImportError: 

140 import urllib.request 

141 req = urllib.request.Request( 

142 url, data=json.dumps(body).encode(), headers=headers 

143 ) 

144 with urllib.request.urlopen(req) as resp: 

145 data = json.loads(resp.read()) 

146 return ReplyResult(success=data.get("ok", False), message_id=data.get("ts", "")) 

147 

148 async def reply_blocks( 

149 self, channel_id: str, blocks: list[dict], **kwargs 

150 ) -> ReplyResult: 

151 """Send Slack Block Kit message.""" 

152 url = "https://slack.com/api/chat.postMessage" 

153 headers = { 

154 "Authorization": f"Bearer {self._bot_token}", 

155 "Content-Type": "application/json", 

156 } 

157 body = {"channel": channel_id, "blocks": blocks} 

158 

159 try: 

160 import aiohttp 

161 async with aiohttp.ClientSession() as session: 

162 async with session.post(url, headers=headers, json=body) as resp: 

163 data = await resp.json() 

164 return ReplyResult(success=data.get("ok", False), message_id=data.get("ts", "")) 

165 except ImportError: 

166 import urllib.request 

167 req = urllib.request.Request( 

168 url, data=json.dumps(body).encode(), headers=headers 

169 ) 

170 with urllib.request.urlopen(req) as resp: 

171 data = json.loads(resp.read()) 

172 return ReplyResult(success=data.get("ok", False), message_id=data.get("ts", ""))