Coverage for agentos/tests/test_a2a.py: 0%
224 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""Tests for the A2A protocol — Task, Message, Handoff, Client, Server."""
3import time
4import pytest
5from agentos.protocols.a2a import (
6 TaskState, PartType, MessageRole,
7 TextPart, FilePart, DataPart, part_from_dict,
8 A2AArtifact,
9 A2AMessage,
10 A2ATask,
11 A2AHandoff,
12 A2ASession,
13 A2AClient,
14 A2AServer,
15 new_task, new_handoff,
16)
class TestA2AParts:
    """Serialization checks for TextPart, FilePart, DataPart and part_from_dict."""

    def test_text_part_roundtrip(self):
        """A TextPart keeps its text and meta through to_dict/from_dict."""
        original = TextPart(text="hello", meta={"lang": "en"})
        payload = original.to_dict()
        assert payload["type"] == "text"

        restored = TextPart.from_dict(payload)
        assert restored.text == "hello"
        assert restored.meta == {"lang": "en"}

    def test_file_part_roundtrip(self):
        """A FilePart keeps filename and mime_type through to_dict/from_dict."""
        original = FilePart(
            url="https://ex.com/f.pdf",
            filename="report.pdf",
            mime_type="application/pdf",
            size=1024,
        )
        restored = FilePart.from_dict(original.to_dict())
        assert restored.filename == "report.pdf"
        assert restored.mime_type == "application/pdf"

    def test_data_part_roundtrip(self):
        """A DataPart keeps its data mapping through to_dict/from_dict."""
        original = DataPart(
            data={"count": 42},
            schema_uri="https://schema.org/result",
        )
        restored = DataPart.from_dict(original.to_dict())
        assert restored.data["count"] == 42

    def test_part_from_dict_dispatcher(self):
        """part_from_dict returns the part class named by the "type" key."""
        text_part = part_from_dict({"type": "text", "text": "hi"})
        assert isinstance(text_part, TextPart)
        assert text_part.text == "hi"

        # The remaining kinds are only checked for the class they map to.
        remaining_cases = (
            ({"type": "file", "filename": "x.txt"}, FilePart),
            ({"type": "data", "data": {"a": 1}}, DataPart),
        )
        for raw, expected_cls in remaining_cases:
            assert isinstance(part_from_dict(raw), expected_cls)
class TestA2AArtifact:
    """Round-trip checks for A2AArtifact in blob-backed and URL-backed forms."""

    def test_roundtrip(self):
        """A blob artifact keeps its name and bytes through to_dict/from_dict."""
        payload = b'{"a":1}'
        # Derive the declared size from the payload so the fixture's metadata
        # cannot disagree with the blob (the literal above is 7 bytes long).
        art = A2AArtifact(
            name="result.json",
            mime_type="application/json",
            blob=payload,
            size=len(payload),
        )
        d = art.to_dict()
        art2 = A2AArtifact.from_dict(d)
        assert art2.name == "result.json"
        assert art2.blob == payload

    def test_url_artifact(self):
        """A URL artifact exposes "url" in its dict form and restores it."""
        art = A2AArtifact(name="image.png", url="https://cdn.ex/img.png")
        d = art.to_dict()
        assert "url" in d
        art2 = A2AArtifact.from_dict(d)
        assert art2.url == "https://cdn.ex/img.png"
class TestA2AMessage:
    """Checks for the A2AMessage text constructors and multi-part round trip."""

    def test_user_text(self):
        """user_text builds a USER message holding a single text part."""
        message = A2AMessage.user_text("hello world")
        assert message.role == MessageRole.USER
        assert len(message.parts) == 1
        first_part = message.parts[0]
        assert first_part.text == "hello world"

    def test_agent_text(self):
        """agent_text builds an AGENT message whose get_text() is the input."""
        message = A2AMessage.agent_text("done")
        assert message.role == MessageRole.AGENT
        assert message.get_text() == "done"

    def test_multipart_roundtrip(self):
        """Text, file and data parts keep their order and classes when restored."""
        parts = [
            TextPart(text="analyze this"),
            FilePart(filename="data.csv"),
            DataPart(data={"options": {"method": "pca"}}),
        ]
        original = A2AMessage(role=MessageRole.USER, parts=parts)

        restored = A2AMessage.from_dict(original.to_dict())

        assert restored.role == MessageRole.USER
        assert len(restored.parts) == 3
        expected_classes = (TextPart, FilePart, DataPart)
        for part, expected_cls in zip(restored.parts, expected_classes):
            assert isinstance(part, expected_cls)
        assert restored.get_text() == "analyze this"
class TestA2ATask:
    """State-machine, artifact and JSON checks for A2ATask."""

    def test_lifecycle(self):
        """A task moves SUBMITTED -> WORKING -> COMPLETED and stores its output."""
        request = A2AMessage.user_text("do something")
        job = A2ATask(input=request)
        assert job.state == TaskState.SUBMITTED

        job.start_working()
        assert job.state == TaskState.WORKING

        reply = A2AMessage.agent_text("done")
        job.complete(reply)
        assert job.state == TaskState.COMPLETED
        assert job.output.get_text() == "done"
        assert job.is_terminal()

    def test_fail(self):
        """fail() records the error text and leaves the task terminal."""
        job = A2ATask(input=A2AMessage.user_text("bad"))
        job.start_working()
        job.fail("something went wrong")
        assert job.state == TaskState.FAILED
        assert job.error == "something went wrong"
        assert job.is_terminal()

    def test_cancel(self):
        """A fresh task is not terminal; cancel() makes it CANCELLED and terminal."""
        job = A2ATask()
        assert not job.is_terminal()
        job.cancel()
        assert job.state == TaskState.CANCELLED
        assert job.is_terminal()

    def test_cannot_start_non_submitted(self):
        """Calling start_working() a second time raises ValueError."""
        job = A2ATask()
        job.start_working()
        with pytest.raises(ValueError):
            job.start_working()

    def test_cannot_complete_non_working(self):
        """complete() on a task that was never started raises ValueError."""
        job = A2ATask()
        with pytest.raises(ValueError):
            job.complete()

    def test_cannot_cancel_completed(self):
        """cancel() after complete() raises ValueError."""
        job = A2ATask()
        job.start_working()
        job.complete()
        with pytest.raises(ValueError):
            job.cancel()

    def test_artifact_attachment(self):
        """Each add_artifact() call grows task.artifacts by one entry."""
        job = A2ATask()
        for artifact_name in ("out.csv", "out.png"):
            job.add_artifact(A2AArtifact(name=artifact_name))
        assert len(job.artifacts) == 2

    def test_json_roundtrip(self):
        """to_json/from_json keep id, state, input text and the first artifact."""
        job = A2ATask(input=A2AMessage.user_text("hello"))
        job.start_working()
        job.complete(A2AMessage.agent_text("result"))
        job.add_artifact(A2AArtifact(name="out.json", blob=b"{}"))

        encoded = job.to_json()
        decoded = A2ATask.from_json(encoded)

        assert decoded.task_id == job.task_id
        assert decoded.state == TaskState.COMPLETED
        assert decoded.input.get_text() == "hello"
        assert decoded.artifacts[0].name == "out.json"

    def test_state_history(self):
        """Two transitions leave two history entries: SUBMITTED, then WORKING."""
        job = A2ATask()
        job.start_working()
        job.complete()

        history = job._state_history
        assert len(history) == 2
        # Element 0 of each entry is the state recorded for that entry.
        first_entry, second_entry = history[0], history[1]
        assert first_entry[0] == TaskState.SUBMITTED
        assert second_entry[0] == TaskState.WORKING
class TestA2AHandoff:
    """Dict and JSON round-trip checks for A2AHandoff."""

    def test_roundtrip(self):
        """Agents, reason and the wrapped task id survive to_dict/from_dict."""
        wrapped_task = A2ATask(input=A2AMessage.user_text("do x"))
        original = A2AHandoff(
            source_agent="coordinator",
            target_agent="worker",
            task=wrapped_task,
            reason="delegation",
        )

        restored = A2AHandoff.from_dict(original.to_dict())

        assert restored.source_agent == "coordinator"
        assert restored.target_agent == "worker"
        assert restored.task.task_id == wrapped_task.task_id
        assert restored.reason == "delegation"

    def test_json_roundtrip(self):
        """source_agent and handoff_id survive to_json/from_json."""
        wrapped_task = A2ATask(input=A2AMessage.user_text("test"))
        original = A2AHandoff(
            source_agent="a",
            target_agent="b",
            task=wrapped_task,
        )
        encoded = original.to_json()
        restored = A2AHandoff.from_json(encoded)
        assert restored.source_agent == "a"
        assert restored.handoff_id == original.handoff_id
class TestA2ASession:
    """Checks for message/task bookkeeping on A2ASession."""

    def test_basic(self):
        """Two added messages and one added task show up in history and tasks."""
        session = A2ASession()
        greeting = A2AMessage.user_text("hi")
        session.add_message(greeting)
        answer = A2AMessage.agent_text("hello")
        session.add_message(answer)
        session.add_task(A2ATask())

        assert len(session.history) == 2
        assert len(session.tasks) == 1

    def test_get_last_n(self):
        """get_last_n_messages(3) returns three messages ending with the newest."""
        session = A2ASession()
        for i in range(5):
            session.add_message(A2AMessage.user_text(f"msg{i}"))

        tail = session.get_last_n_messages(3)

        assert len(tail) == 3
        newest = tail[-1]
        assert newest.get_text() == "msg4"
class TestA2Server:
    """Checks for A2AServer task processing, lookup, listing and cleanup.

    NOTE(review): the class name reads ``TestA2Server``; ``TestA2AServer`` was
    presumably intended. It is left unchanged here so test IDs stay stable.
    """

    @pytest.mark.asyncio
    async def test_process_task_success(self):
        """A registered handler's reply becomes the output of a completed task."""
        server = A2AServer()

        async def handler(task: A2ATask):
            return A2AMessage.agent_text(f"processed: {task.input.get_text()}")

        server.register_handler("worker", handler)
        request = new_task("hello test", target_agent="worker")
        outcome = await server.process_task(request.to_dict())

        assert outcome["state"] == "completed"
        first_output_part = outcome["output"]["parts"][0]
        assert "processed: hello test" in first_output_part["text"]

    @pytest.mark.asyncio
    async def test_process_task_no_handler(self):
        """A task aimed at an unregistered agent fails with a "No handler" error."""
        server = A2AServer()
        request = new_task("hello", target_agent="nonexistent")
        outcome = await server.process_task(request.to_dict())

        assert outcome["state"] == "failed"
        assert "No handler" in outcome["error"]

    @pytest.mark.asyncio
    async def test_process_task_handler_error(self):
        """An exception raised by the handler is reported in the failed result."""
        server = A2AServer()

        async def bad_handler(task):
            raise ValueError("simulated error")

        server.register_handler("bad", bad_handler)
        request = new_task("test", target_agent="bad")
        outcome = await server.process_task(request.to_dict())

        assert outcome["state"] == "failed"
        assert "simulated error" in outcome["error"]

    def test_get_task(self):
        """get_task returns a stored task by id and None for an unknown id."""
        server = A2AServer()
        stored = A2ATask(task_id="task-001")
        # The task is placed straight into the server's private registry.
        server._tasks["task-001"] = stored

        found = server.get_task("task-001")
        assert found.task_id == "task-001"
        assert server.get_task("nonexistent") is None

    def test_list_tasks_by_state(self):
        """list_tasks returns everything by default and filters by a given state."""
        server = A2AServer()

        untouched = A2ATask(task_id="t1")

        finished = A2ATask(task_id="t2")
        finished.start_working()
        finished.complete()

        errored = A2ATask(task_id="t3")
        errored.fail("err")

        server._tasks = {"t1": untouched, "t2": finished, "t3": errored}

        assert len(server.list_tasks()) == 3
        assert len(server.list_tasks(TaskState.COMPLETED)) == 1
        assert len(server.list_tasks(TaskState.FAILED)) == 1

    def test_cleanup(self):
        """cleanup_old reports one removal and drops only the back-dated task."""
        server = A2AServer()

        stale = A2ATask(task_id="old")
        stale.start_working()
        stale.complete()
        # Back-date the task by 4000 s, past the 3600 s limit passed below.
        stale._updated = time.time() - 4000

        recent = A2ATask(task_id="fresh")
        server._tasks = {"old": stale, "fresh": recent}

        removed_count = server.cleanup_old(max_age_seconds=3600)

        assert removed_count == 1
        assert "old" not in server._tasks
        assert "fresh" in server._tasks
class TestConvenience:
    """Checks for the new_task and new_handoff helper functions."""

    def test_new_task(self):
        """new_task stores the text as input and the keyword values in meta."""
        created = new_task("my task", target_agent="worker", priority="high")
        assert created.input.get_text() == "my task"

        meta = created.meta
        assert meta["target_agent"] == "worker"
        assert meta["priority"] == "high"

    def test_new_handoff(self):
        """new_handoff maps source/target/reason onto the handoff's fields."""
        delegated = new_task("delegate me")
        handoff = new_handoff(
            delegated,
            source="a",
            target="b",
            reason="overload",
        )
        assert handoff.source_agent == "a"
        assert handoff.target_agent == "b"
        assert handoff.reason == "overload"