Coverage for agentos/channels/adapters/slack.py: 0%
84 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Slack Channel Adapter — Slack Events API (Bolt for Python).
4OAuth 2.0 flow → Bolt App → Socket Mode / HTTP Webhook → ChannelMessage.
5"""
7from __future__ import annotations
9import json
10import hmac
11import hashlib
12import time
13from typing import Optional, Callable, Any
15from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
16from agentos.channels.message import ChannelMessage, ChannelType, MessageType
class SlackAdapter(BaseChannelAdapter):
    """Slack Events API adapter.

    Config fields (read from ``config.extra``):
        bot_token: Slack Bot User OAuth Token (xoxb-...)
        signing_secret: Slack Signing Secret for request verification
        app_token: Socket Mode app-level token (xapp-...) — optional
        socket_mode: bool — use Socket Mode instead of HTTP webhooks
    """

    CHANNEL_TYPE = ChannelType.SLACK

    _POST_MESSAGE_URL = "https://slack.com/api/chat.postMessage"
    # Requests whose timestamp is further than this from local time are
    # rejected as possible replays.
    _MAX_TIMESTAMP_SKEW_SECONDS = 300
    # Only applies to the blocking urllib fallback, which otherwise has no
    # timeout at all and could hang a worker thread forever.
    _FALLBACK_HTTP_TIMEOUT_SECONDS = 30

    def __init__(self, config: ChannelConfig):
        """Read the Slack credentials and mode flags from ``config.extra``."""
        super().__init__(config)
        extra = config.extra or {}
        self._bot_token = extra.get("bot_token", "")
        self._signing_secret = extra.get("signing_secret", "")
        self._app_token = extra.get("app_token", "")
        self._socket_mode = extra.get("socket_mode", False)

    # ── Webhook verification ──

    def verify_signature(self, body: bytes, headers: dict) -> bool:
        """Verify a Slack request signature (HMAC-SHA256, scheme ``v0``).

        Args:
            body: Raw, unparsed request body exactly as received.
            headers: Request headers; names are matched case-insensitively.

        Returns:
            True only when the timestamp is within the allowed skew and the
            signature matches. Missing or malformed headers, or an
            unconfigured signing secret, yield False instead of raising.
        """
        if not self._signing_secret:
            # Fail closed: an empty key would make signatures trivially forgeable.
            return False

        lowered = {str(name).lower(): value for name, value in (headers or {}).items()}
        timestamp = str(lowered.get("x-slack-request-timestamp") or "")
        slack_sig = str(lowered.get("x-slack-signature") or "")

        try:
            request_time = int(timestamp)
        except ValueError:
            return False
        if abs(time.time() - request_time) > self._MAX_TIMESTAMP_SKEW_SECONDS:
            return False

        if isinstance(body, str):
            body = body.encode()
        # Sign the raw bytes; decoding first would raise on invalid UTF-8.
        sig_basestring = b"v0:" + timestamp.encode() + b":" + bytes(body)
        computed = "v0=" + hmac.new(
            self._signing_secret.encode(),
            sig_basestring,
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(computed.encode(), slack_sig.encode())

    # ── Message parsing ──

    async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]:
        """Parse a Slack event payload into a ChannelMessage.

        ``url_verification`` payloads become a SYSTEM message carrying the
        challenge string; ``event_callback`` payloads are delegated to
        ``_parse_event``. Any other payload type returns None.
        """
        event_type = payload.get("type", "")

        # URL verification challenge
        if event_type == "url_verification":
            return ChannelMessage(
                channel_type=ChannelType.SLACK,
                channel_id=self.config.channel_id,
                user_id="system",
                content=payload.get("challenge", ""),
                message_type=MessageType.SYSTEM,
                raw=payload,
                reply_token=payload.get("challenge"),
            )

        # Event callback
        if event_type == "event_callback":
            return self._parse_event(payload.get("event") or {})

        return None

    def _parse_event(self, event: dict) -> Optional[ChannelMessage]:
        """Parse a Slack event (message, app_mention, etc.).

        Returns None for events without text and for bot-authored messages.
        """
        if not isinstance(event, dict):
            return None

        # Ignore bot-authored messages (including our own replies) so the
        # adapter cannot end up answering itself in a loop.
        if event.get("bot_id") or event.get("subtype") == "bot_message":
            return None

        text = event.get("text") or ""

        # Strip bot mention prefix
        if text and event.get("type", "") == "app_mention":
            text = self._strip_mention(text)

        if not text:
            return None

        # Inside a thread, replies must target the parent message's ts,
        # not the ts of the reply that triggered this event.
        reply_ts = event.get("thread_ts") or event.get("ts", "")

        return ChannelMessage(
            channel_type=ChannelType.SLACK,
            channel_id=event.get("channel", ""),
            user_id=event.get("user", ""),
            content=text,
            message_type=MessageType.TEXT,
            raw=event,
            reply_token=reply_ts,
        )

    def _strip_mention(self, text: str) -> str:
        """Remove a leading ``<@USER_ID>`` mention from message text.

        Accepts ``U…`` and ``W…`` ids and the ``<@ID|label>`` form.
        """
        import re

        return re.sub(r"^\s*<@[UW][A-Z0-9]+(?:\|[^>]*)?>\s*", "", text).strip()

    # ── Reply ──

    async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult:
        """Send a text message to a Slack channel via chat.postMessage.

        Keyword Args:
            thread_ts / reply_token: when given, the message is posted as a
                reply in that thread (``thread_ts`` takes precedence).
        """
        body = {
            "channel": channel_id,
            "text": content,
        }
        thread_ts = kwargs.get("thread_ts") or kwargs.get("reply_token")
        if thread_ts:
            body["thread_ts"] = thread_ts
        return await self._post_message(body)

    async def reply_blocks(
        self, channel_id: str, blocks: list[dict], **kwargs
    ) -> ReplyResult:
        """Send a Slack Block Kit message via chat.postMessage.

        Keyword Args:
            thread_ts / reply_token: optional thread to reply in, handled
                the same way as in ``reply``.
            text: optional plain-text fallback sent alongside the blocks.
        """
        body = {"channel": channel_id, "blocks": blocks}
        fallback_text = kwargs.get("text")
        if fallback_text:
            body["text"] = fallback_text
        thread_ts = kwargs.get("thread_ts") or kwargs.get("reply_token")
        if thread_ts:
            body["thread_ts"] = thread_ts
        return await self._post_message(body)

    async def _post_message(self, body: dict) -> ReplyResult:
        """POST ``body`` to chat.postMessage and map the response to ReplyResult."""
        headers = {
            "Authorization": f"Bearer {self._bot_token}",
            "Content-Type": "application/json; charset=utf-8",
        }

        # Guard only the import, so an ImportError raised while sending is
        # not mistaken for "aiohttp is not installed".
        try:
            import aiohttp
        except ImportError:
            import asyncio

            # urllib is blocking; run it in the default executor so the
            # event loop keeps serving other tasks.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None,
                self._post_json_blocking,
                self._POST_MESSAGE_URL,
                headers,
                body,
                self._FALLBACK_HTTP_TIMEOUT_SECONDS,
            )
        else:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self._POST_MESSAGE_URL, headers=headers, json=body
                ) as resp:
                    data = await resp.json()

        if not isinstance(data, dict):
            return ReplyResult(success=False, error="invalid_response")
        if data.get("ok"):
            return ReplyResult(success=True, message_id=data.get("ts", ""))
        return ReplyResult(success=False, error=data.get("error", "unknown"))

    @staticmethod
    def _post_json_blocking(
        url: str, headers: dict, body: dict, timeout: float
    ) -> Any:
        """Blocking JSON POST used when aiohttp is not installed.

        Non-2xx responses are returned as a parsed body (or a synthetic
        ``{"ok": False, "error": "http_<code>"}``) instead of raising, so the
        fallback reports failures the same way the aiohttp path does.
        """
        import urllib.error
        import urllib.request

        request = urllib.request.Request(
            url, data=json.dumps(body).encode(), headers=headers, method="POST"
        )
        try:
            with urllib.request.urlopen(request, timeout=timeout) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as exc:
            try:
                return json.loads(exc.read())
            except ValueError:
                return {"ok": False, "error": f"http_{exc.code}"}