Coverage for agentos/channels/adapters/line.py: 0%
132 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2LINE Channel Adapter — LINE Messaging API.
4LINE Developers Console → Channel Access Token + Channel Secret → webhook → ChannelMessage.
5"""
7from __future__ import annotations
9import json
10import base64
11import hashlib
12import hmac
13from typing import Optional
15from agentos.channels.base import BaseChannelAdapter, ChannelConfig, ReplyResult
16from agentos.channels.message import ChannelMessage, ChannelType, MessageType
19class LINEAdapter(BaseChannelAdapter):
20 """LINE Messaging API adapter.
22 Config fields:
23 channel_access_token: LINE channel access token (long-lived)
24 channel_secret: LINE channel secret (for signature verification)
25 reply_retry_limit: max reply attempts (default 1)
26 """
28 CHANNEL_TYPE = ChannelType.LINE
29 API_BASE = "https://api.line.me/v2"
30 API_DATA = "https://api-data.line.me/v2"
32 def __init__(self, config: ChannelConfig):
33 super().__init__(config)
34 self._access_token = config.extra.get("channel_access_token", "")
35 self._channel_secret = config.extra.get("channel_secret", "")
36 self._retry_limit = config.extra.get("reply_retry_limit", 1)
38 @property
39 def _headers(self) -> dict:
40 return {"Authorization": f"Bearer {self._access_token}"}
42 # ── Signature verification ──
44 def verify_signature(self, body: bytes, signature: str) -> bool:
45 """Verify LINE webhook signature (HMAC-SHA256 base64)."""
46 computed = base64.b64encode(
47 hmac.new(
48 self._channel_secret.encode(),
49 body,
50 hashlib.sha256,
51 ).digest()
52 ).decode()
53 return hmac.compare_digest(computed, signature)
55 # ── Message parsing ──
57 async def parse_incoming(self, payload: dict) -> Optional[ChannelMessage]:
58 """Parse LINE webhook events into ChannelMessage."""
59 events = payload.get("events", [])
60 if not events:
61 return None
63 event = events[0]
64 event_type = event.get("type", "")
66 if event_type == "message":
67 return self._parse_message(event)
68 elif event_type == "postback":
69 return self._parse_postback(event)
70 elif event_type == "follow":
71 return self._parse_follow(event)
72 elif event_type == "unfollow":
73 return ChannelMessage(
74 channel_type=ChannelType.LINE,
75 channel_id=event.get("source", {}).get("userId", ""),
76 user_id=event.get("source", {}).get("userId", ""),
77 content="unfollow",
78 message_type=MessageType.SYSTEM,
79 raw=event,
80 )
82 return None
84 def _parse_message(self, event: dict) -> Optional[ChannelMessage]:
85 """Parse a LINE message event."""
86 source = event.get("source", {})
87 user_id = source.get("userId", "")
88 group_id = source.get("groupId", "")
89 room_id = source.get("roomId", "")
90 channel_id = group_id or room_id or user_id
92 msg = event.get("message", {})
93 msg_type = msg.get("type", "text")
95 content = ""
96 mtype = MessageType.TEXT
97 metadata = {
98 "source_type": source.get("type", "user"),
99 "group_id": group_id,
100 "room_id": room_id,
101 "display_name": "", # Filled via profile API if needed
102 }
104 if msg_type == "text":
105 content = msg.get("text", "")
106 elif msg_type == "image":
107 content = "[Image]"
108 mtype = MessageType.IMAGE
109 metadata["message_id"] = msg.get("id", "")
110 elif msg_type == "video":
111 content = "[Video]"
112 mtype = MessageType.VIDEO
113 elif msg_type == "audio":
114 content = "[Voice message]"
115 mtype = MessageType.VOICE
116 elif msg_type == "file":
117 content = f"[File: {msg.get('fileName', 'unknown')}]"
118 mtype = MessageType.FILE
119 metadata["file_name"] = msg.get("fileName", "")
120 metadata["file_size"] = msg.get("fileSize", 0)
121 elif msg_type == "location":
122 content = f"[Location: {msg.get('title', '')} {msg.get('address', '')}]"
123 mtype = MessageType.LOCATION
124 elif msg_type == "sticker":
125 content = f"[Sticker: {msg.get('packageId')}/{msg.get('stickerId')}]"
126 else:
127 content = f"[{msg_type}]"
129 return ChannelMessage(
130 channel_type=ChannelType.LINE,
131 channel_id=channel_id,
132 user_id=user_id,
133 content=content,
134 message_type=mtype,
135 raw=event,
136 reply_token=event.get("replyToken", ""),
137 metadata=metadata,
138 )
140 def _parse_postback(self, event: dict) -> Optional[ChannelMessage]:
141 """Parse LINE postback event (rich menu, button tap)."""
142 source = event.get("source", {})
143 data = event.get("postback", {}).get("data", "")
144 params = event.get("postback", {}).get("params", {})
146 return ChannelMessage(
147 channel_type=ChannelType.LINE,
148 channel_id=source.get("userId", ""),
149 user_id=source.get("userId", ""),
150 content=data,
151 message_type=MessageType.INTERACTIVE,
152 raw=event,
153 reply_token=event.get("replyToken", ""),
154 metadata={"postback_params": params},
155 )
157 def _parse_follow(self, event: dict) -> ChannelMessage:
158 """Parse LINE follow event."""
159 source = event.get("source", {})
160 return ChannelMessage(
161 channel_type=ChannelType.LINE,
162 channel_id=source.get("userId", ""),
163 user_id=source.get("userId", ""),
164 content="follow",
165 message_type=MessageType.SYSTEM,
166 raw=event,
167 )
169 # ── Reply ──
171 async def reply(self, channel_id: str, content: str, **kwargs) -> ReplyResult:
172 """Send a reply text message."""
173 reply_token = kwargs.get("reply_token", "")
174 if not reply_token:
175 return ReplyResult(success=False, error="reply_token required")
177 return await self._api_reply(reply_token, [
178 {"type": "text", "text": content[:5000]},
179 ])
181 async def reply_flex(
182 self, channel_id: str, alt_text: str,
183 contents: dict, **kwargs,
184 ) -> ReplyResult:
185 """Send a LINE Flex Message (bubble/carousel)."""
186 reply_token = kwargs.get("reply_token", "")
187 if not reply_token:
188 return ReplyResult(success=False, error="reply_token required")
190 return await self._api_reply(reply_token, [
191 {"type": "flex", "altText": alt_text, "contents": contents},
192 ])
194 async def reply_quick_reply(
195 self, channel_id: str, text: str,
196 items: list[dict], **kwargs,
197 ) -> ReplyResult:
198 """Send text with quick reply buttons.
200 items = [{"type": "action", "action": {"type": "message", "label": "Yes", "text": "Yes"}}, ...]
201 """
202 reply_token = kwargs.get("reply_token", "")
203 if not reply_token:
204 return ReplyResult(success=False, error="reply_token required")
206 return await self._api_reply(reply_token, [
207 {
208 "type": "text",
209 "text": text[:5000],
210 "quickReply": {"items": items[:13]},
211 },
212 ])
214 async def push_message(
215 self, user_id: str, messages: list[dict],
216 ) -> ReplyResult:
217 """Push a message to a user (outside reply window)."""
218 return await self._api_push(user_id, messages)
220 async def multicast(
221 self, user_ids: list[str], messages: list[dict],
222 ) -> ReplyResult:
223 """Send the same message to up to 500 users."""
224 return await self._api_call(
225 f"{self.API_BASE}/bot/message/multicast",
226 {"to": user_ids[:500], "messages": messages},
227 method="POST",
228 )
230 # ── Profile ──
232 async def get_profile(self, user_id: str) -> Optional[dict]:
233 """Get LINE user profile."""
234 result = await self._api_call(
235 f"{self.API_BASE}/bot/profile/{user_id}",
236 method="GET",
237 )
238 if result.success:
239 return result.raw
240 return None
242 # ── Rich Menu ──
244 async def set_default_rich_menu(self, rich_menu_id: str) -> bool:
245 """Set the default rich menu for all users."""
246 result = await self._api_call(
247 f"{self.API_BASE}/bot/user/all/richmenu/{rich_menu_id}",
248 method="POST",
249 )
250 return result.success
252 # ── Internal API ──
254 async def _api_reply(self, reply_token: str, messages: list) -> ReplyResult:
255 """Send a reply via reply API."""
256 return await self._api_call(
257 f"{self.API_BASE}/bot/message/reply",
258 {"replyToken": reply_token, "messages": messages},
259 method="POST",
260 )
262 async def _api_push(self, user_id: str, messages: list) -> ReplyResult:
263 """Send a push message."""
264 return await self._api_call(
265 f"{self.API_BASE}/bot/message/push",
266 {"to": user_id, "messages": messages},
267 method="POST",
268 )
270 async def _api_call(
271 self, url: str, body: dict = None, method: str = "POST",
272 ) -> ReplyResult:
273 """Generic LINE API call."""
274 headers = {
275 "Authorization": f"Bearer {self._access_token}",
276 "Content-Type": "application/json",
277 }
279 try:
280 import aiohttp
281 async with aiohttp.ClientSession() as session:
282 if method == "GET":
283 async with session.get(url, headers=headers) as resp:
284 data = await resp.json()
285 return ReplyResult(success=True, raw=data)
286 else:
287 async with session.post(url, headers=headers, json=body) as resp:
288 data = await resp.json()
289 if resp.status == 200:
290 return ReplyResult(success=True, message_id="ok", raw=data)
291 return ReplyResult(
292 success=False,
293 error=data.get("message", "unknown"),
294 )
295 except ImportError:
296 import urllib.request
297 req = urllib.request.Request(
298 url,
299 data=json.dumps(body).encode() if body else None,
300 headers=headers,
301 )
302 with urllib.request.urlopen(req) as resp:
303 data = json.loads(resp.read())
304 return ReplyResult(success=True, message_id="ok", raw=data)