Coverage for agentos/channels/adapters/line.py: 0%

132 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2LINE Channel Adapter — LINE Messaging API. 

3 

4LINE Developers Console → Channel Access Token + Channel Secret → webhook → ChannelMessage. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import base64 

11import hashlib 

12import hmac 

13from typing import Optional 

14 

15from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult 

16from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

17 

18 

19class LINEAdapter(BaseChannelAdapter): 

20 """LINE Messaging API adapter. 

21 

22 Config fields: 

23 channel_access_token: LINE channel access token (long-lived) 

24 channel_secret: LINE channel secret (for signature verification) 

25 reply_retry_limit: max reply attempts (default 1) 

26 """ 

27 

28 CHANNEL_TYPE = ChannelType.LINE 

29 API_BASE = "https://api.line.me/v2" 

30 API_DATA = "https://api-data.line.me/v2" 

31 

32 def __init__(self, config: ChannelConfig): 

33 super().__init__(config) 

34 self._access_token = config.extra.get("channel_access_token", "") 

35 self._channel_secret = config.extra.get("channel_secret", "") 

36 self._retry_limit = config.extra.get("reply_retry_limit", 1) 

37 

38 @property 

39 def _headers(self) -> dict: 

40 return {"Authorization": f"Bearer {self._access_token}"} 

41 

42 # ── Signature verification ── 

43 

44 def verify_signature(self, body: bytes, signature: str) -> bool: 

45 """Verify LINE webhook signature (HMAC-SHA256 base64).""" 

46 computed = base64.b64encode( 

47 hmac.new( 

48 self._channel_secret.encode(), 

49 body, 

50 hashlib.sha256, 

51 ).digest() 

52 ).decode() 

53 return hmac.compare_digest(computed, signature) 

54 

55 # ── Message parsing ── 

56 

57 async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]: 

58 """Parse LINE webhook events into ChannelMessage.""" 

59 events = payload.get("events", []) 

60 if not events: 

61 return None 

62 

63 event = events[0] 

64 event_type = event.get("type", "") 

65 

66 if event_type == "message": 

67 return self._parse_message(event) 

68 elif event_type == "postback": 

69 return self._parse_postback(event) 

70 elif event_type == "follow": 

71 return self._parse_follow(event) 

72 elif event_type == "unfollow": 

73 return ChannelMessage( 

74 channel_type=ChannelType.LINE, 

75 channel_id=event.get("source", {}).get("userId", ""), 

76 user_id=event.get("source", {}).get("userId", ""), 

77 content="unfollow", 

78 message_type=MessageType.SYSTEM, 

79 raw=event, 

80 ) 

81 

82 return None 

83 

84 def _parse_message(self, event: dict) -> Optional[ChannelMessage]: 

85 """Parse a LINE message event.""" 

86 source = event.get("source", {}) 

87 user_id = source.get("userId", "") 

88 group_id = source.get("groupId", "") 

89 room_id = source.get("roomId", "") 

90 channel_id = group_id or room_id or user_id 

91 

92 msg = event.get("message", {}) 

93 msg_type = msg.get("type", "text") 

94 

95 content = "" 

96 mtype = MessageType.TEXT 

97 metadata = { 

98 "source_type": source.get("type", "user"), 

99 "group_id": group_id, 

100 "room_id": room_id, 

101 "display_name": "", # Filled via profile API if needed 

102 } 

103 

104 if msg_type == "text": 

105 content = msg.get("text", "") 

106 elif msg_type == "image": 

107 content = "[Image]" 

108 mtype = MessageType.IMAGE 

109 metadata["message_id"] = msg.get("id", "") 

110 elif msg_type == "video": 

111 content = "[Video]" 

112 mtype = MessageType.VIDEO 

113 elif msg_type == "audio": 

114 content = "[Voice message]" 

115 mtype = MessageType.VOICE 

116 elif msg_type == "file": 

117 content = f"[File: {msg.get('fileName', 'unknown')}]" 

118 mtype = MessageType.FILE 

119 metadata["file_name"] = msg.get("fileName", "") 

120 metadata["file_size"] = msg.get("fileSize", 0) 

121 elif msg_type == "location": 

122 content = f"[Location: {msg.get('title', '')} {msg.get('address', '')}]" 

123 mtype = MessageType.LOCATION 

124 elif msg_type == "sticker": 

125 content = f"[Sticker: {msg.get('packageId')}/{msg.get('stickerId')}]" 

126 else: 

127 content = f"[{msg_type}]" 

128 

129 return ChannelMessage( 

130 channel_type=ChannelType.LINE, 

131 channel_id=channel_id, 

132 user_id=user_id, 

133 content=content, 

134 message_type=mtype, 

135 raw=event, 

136 reply_token=event.get("replyToken", ""), 

137 metadata=metadata, 

138 ) 

139 

140 def _parse_postback(self, event: dict) -> Optional[ChannelMessage]: 

141 """Parse LINE postback event (rich menu, button tap).""" 

142 source = event.get("source", {}) 

143 data = event.get("postback", {}).get("data", "") 

144 params = event.get("postback", {}).get("params", {}) 

145 

146 return ChannelMessage( 

147 channel_type=ChannelType.LINE, 

148 channel_id=source.get("userId", ""), 

149 user_id=source.get("userId", ""), 

150 content=data, 

151 message_type=MessageType.INTERACTIVE, 

152 raw=event, 

153 reply_token=event.get("replyToken", ""), 

154 metadata={"postback_params": params}, 

155 ) 

156 

157 def _parse_follow(self, event: dict) -> ChannelMessage: 

158 """Parse LINE follow event.""" 

159 source = event.get("source", {}) 

160 return ChannelMessage( 

161 channel_type=ChannelType.LINE, 

162 channel_id=source.get("userId", ""), 

163 user_id=source.get("userId", ""), 

164 content="follow", 

165 message_type=MessageType.SYSTEM, 

166 raw=event, 

167 ) 

168 

169 # ── Reply ── 

170 

171 async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult: 

172 """Send a reply text message.""" 

173 reply_token = kwargs.get("reply_token", "") 

174 if not reply_token: 

175 return ReplyResult(success=False, error="reply_token required") 

176 

177 return await self._api_reply(reply_token, [ 

178 {"type": "text", "text": content[:5000]}, 

179 ]) 

180 

181 async def reply_flex( 

182 self, channel_id: str, alt_text: str, 

183 contents: dict, **kwargs, 

184 ) -> ReplyResult: 

185 """Send a LINE Flex Message (bubble/carousel).""" 

186 reply_token = kwargs.get("reply_token", "") 

187 if not reply_token: 

188 return ReplyResult(success=False, error="reply_token required") 

189 

190 return await self._api_reply(reply_token, [ 

191 {"type": "flex", "altText": alt_text, "contents": contents}, 

192 ]) 

193 

194 async def reply_quick_reply( 

195 self, channel_id: str, text: str, 

196 items: list[dict], **kwargs, 

197 ) -> ReplyResult: 

198 """Send text with quick reply buttons. 

199 

200 items = [{"type": "action", "action": {"type": "message", "label": "Yes", "text": "Yes"}}, ...] 

201 """ 

202 reply_token = kwargs.get("reply_token", "") 

203 if not reply_token: 

204 return ReplyResult(success=False, error="reply_token required") 

205 

206 return await self._api_reply(reply_token, [ 

207 { 

208 "type": "text", 

209 "text": text[:5000], 

210 "quickReply": {"items": items[:13]}, 

211 }, 

212 ]) 

213 

214 async def push_message( 

215 self, user_id: str, messages: list[dict], 

216 ) -> ReplyResult: 

217 """Push a message to a user (outside reply window).""" 

218 return await self._api_push(user_id, messages) 

219 

220 async def multicast( 

221 self, user_ids: list[str], messages: list[dict], 

222 ) -> ReplyResult: 

223 """Send the same message to up to 500 users.""" 

224 return await self._api_call( 

225 f"{self.API_BASE}/bot/message/multicast", 

226 {"to": user_ids[:500], "messages": messages}, 

227 method="POST", 

228 ) 

229 

230 # ── Profile ── 

231 

232 async def get_profile(self, user_id: str) -> Optional[dict]: 

233 """Get LINE user profile.""" 

234 result = await self._api_call( 

235 f"{self.API_BASE}/bot/profile/{user_id}", 

236 method="GET", 

237 ) 

238 if result.success: 

239 return result.raw 

240 return None 

241 

242 # ── Rich Menu ── 

243 

244 async def set_default_rich_menu(self, rich_menu_id: str) -> bool: 

245 """Set the default rich menu for all users.""" 

246 result = await self._api_call( 

247 f"{self.API_BASE}/bot/user/all/richmenu/{rich_menu_id}", 

248 method="POST", 

249 ) 

250 return result.success 

251 

252 # ── Internal API ── 

253 

254 async def _api_reply(self, reply_token: str, messages: list) -> ReplyResult: 

255 """Send a reply via reply API.""" 

256 return await self._api_call( 

257 f"{self.API_BASE}/bot/message/reply", 

258 {"replyToken": reply_token, "messages": messages}, 

259 method="POST", 

260 ) 

261 

262 async def _api_push(self, user_id: str, messages: list) -> ReplyResult: 

263 """Send a push message.""" 

264 return await self._api_call( 

265 f"{self.API_BASE}/bot/message/push", 

266 {"to": user_id, "messages": messages}, 

267 method="POST", 

268 ) 

269 

270 async def _api_call( 

271 self, url: str, body: dict = None, method: str = "POST", 

272 ) -> ReplyResult: 

273 """Generic LINE API call.""" 

274 headers = { 

275 "Authorization": f"Bearer {self._access_token}", 

276 "Content-Type": "application/json", 

277 } 

278 

279 try: 

280 import aiohttp 

281 async with aiohttp.ClientSession() as session: 

282 if method == "GET": 

283 async with session.get(url, headers=headers) as resp: 

284 data = await resp.json() 

285 return ReplyResult(success=True, raw=data) 

286 else: 

287 async with session.post(url, headers=headers, json=body) as resp: 

288 data = await resp.json() 

289 if resp.status == 200: 

290 return ReplyResult(success=True, message_id="ok", raw=data) 

291 return ReplyResult( 

292 success=False, 

293 error=data.get("message", "unknown"), 

294 ) 

295 except ImportError: 

296 import urllib.request 

297 req = urllib.request.Request( 

298 url, 

299 data=json.dumps(body).encode() if body else None, 

300 headers=headers, 

301 ) 

302 with urllib.request.urlopen(req) as resp: 

303 data = json.loads(resp.read()) 

304 return ReplyResult(success=True, message_id="ok", raw=data)