Coverage for agentos/tests/test_a2a.py: 0%

224 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""测试 A2A 协议 — Task, Message, Handoff, Client, Server。""" 

2 

3import time 

4import pytest 

5from agentos.protocols.a2a import ( 

6 TaskState, PartType, MessageRole, 

7 TextPart, FilePart, DataPart, part_from_dict, 

8 A2AArtifact, 

9 A2AMessage, 

10 A2ATask, 

11 A2AHandoff, 

12 A2ASession, 

13 A2AClient, 

14 A2AServer, 

15 new_task, new_handoff, 

16) 

17 

18 

class TestA2AParts:
    """Round-trip and dispatch checks for TextPart, FilePart and DataPart."""

    def test_text_part_roundtrip(self):
        """to_dict tags the part as "text"; from_dict restores text and meta."""
        original = TextPart(text="hello", meta={"lang": "en"})
        payload = original.to_dict()
        assert payload["type"] == "text"

        restored = TextPart.from_dict(payload)
        assert restored.text == "hello"
        assert restored.meta == {"lang": "en"}

    def test_file_part_roundtrip(self):
        """filename and mime_type survive a to_dict/from_dict cycle."""
        original = FilePart(
            url="https://ex.com/f.pdf",
            filename="report.pdf",
            mime_type="application/pdf",
            size=1024,
        )
        restored = FilePart.from_dict(original.to_dict())
        assert restored.filename == "report.pdf"
        assert restored.mime_type == "application/pdf"

    def test_data_part_roundtrip(self):
        """The nested data mapping survives a to_dict/from_dict cycle."""
        original = DataPart(
            data={"count": 42},
            schema_uri="https://schema.org/result",
        )
        restored = DataPart.from_dict(original.to_dict())
        assert restored.data["count"] == 42

    def test_part_from_dict_dispatcher(self):
        """part_from_dict returns the part class matching each "type" value."""
        text_part = part_from_dict({"type": "text", "text": "hi"})
        assert isinstance(text_part, TextPart)
        assert text_part.text == "hi"

        file_part = part_from_dict({"type": "file", "filename": "x.txt"})
        assert isinstance(file_part, FilePart)

        data_part = part_from_dict({"type": "data", "data": {"a": 1}})
        assert isinstance(data_part, DataPart)

54 

55 

class TestA2AArtifact:
    """Round-trip checks for blob-backed and URL-backed artifacts."""

    def test_roundtrip(self):
        """name and blob survive a to_dict/from_dict cycle."""
        # NOTE(review): the blob literal is 7 bytes while size=8 is passed;
        # size is never asserted here -- confirm whether the mismatch is
        # intentional.
        source = A2AArtifact(
            name="result.json",
            mime_type="application/json",
            blob=b'{"a":1}',
            size=8,
        )
        restored = A2AArtifact.from_dict(source.to_dict())
        assert restored.name == "result.json"
        assert restored.blob == b'{"a":1}'

    def test_url_artifact(self):
        """A URL-only artifact serializes a "url" key and restores it."""
        source = A2AArtifact(name="image.png", url="https://cdn.ex/img.png")
        payload = source.to_dict()
        assert "url" in payload

        restored = A2AArtifact.from_dict(payload)
        assert restored.url == "https://cdn.ex/img.png"

70 

71 

class TestA2AMessage:
    """Checks for the message factories and multi-part serialization."""

    def test_user_text(self):
        """user_text builds a USER message holding one part with the text."""
        message = A2AMessage.user_text("hello world")
        assert message.role == MessageRole.USER
        assert len(message.parts) == 1
        assert message.parts[0].text == "hello world"

    def test_agent_text(self):
        """agent_text builds an AGENT message whose get_text() is the text."""
        message = A2AMessage.agent_text("done")
        assert message.role == MessageRole.AGENT
        assert message.get_text() == "done"

    def test_multipart_roundtrip(self):
        """Role, part count, part classes and text survive a round-trip."""
        parts = [
            TextPart(text="analyze this"),
            FilePart(filename="data.csv"),
            DataPart(data={"options": {"method": "pca"}}),
        ]
        original = A2AMessage(role=MessageRole.USER, parts=parts)

        restored = A2AMessage.from_dict(original.to_dict())
        assert restored.role == MessageRole.USER
        assert len(restored.parts) == 3
        # The length check above guarantees zip() visits all three positions.
        expected_classes = (TextPart, FilePart, DataPart)
        for part, expected_cls in zip(restored.parts, expected_classes):
            assert isinstance(part, expected_cls)
        assert restored.get_text() == "analyze this"

101 

102 

class TestA2ATask:
    """State transitions, artifacts, JSON round-trip and history of A2ATask."""

    def test_lifecycle(self):
        """SUBMITTED -> WORKING -> COMPLETED, ending terminal with an output."""
        job = A2ATask(input=A2AMessage.user_text("do something"))
        assert job.state == TaskState.SUBMITTED

        job.start_working()
        assert job.state == TaskState.WORKING

        job.complete(A2AMessage.agent_text("done"))
        assert job.state == TaskState.COMPLETED
        assert job.output.get_text() == "done"
        assert job.is_terminal()

    def test_fail(self):
        """fail() on a working task records the error and is terminal."""
        job = A2ATask(input=A2AMessage.user_text("bad"))
        job.start_working()
        job.fail("something went wrong")
        assert job.state == TaskState.FAILED
        assert job.error == "something went wrong"
        assert job.is_terminal()

    def test_cancel(self):
        """A new task is not terminal; after cancel() it is CANCELLED."""
        job = A2ATask()
        assert not job.is_terminal()

        job.cancel()
        assert job.state == TaskState.CANCELLED
        assert job.is_terminal()

    def test_cannot_start_non_submitted(self):
        """A second start_working() call must raise ValueError."""
        job = A2ATask()
        job.start_working()
        with pytest.raises(ValueError):
            job.start_working()

    def test_cannot_complete_non_working(self):
        """complete() on a task that was never started raises ValueError."""
        job = A2ATask()
        with pytest.raises(ValueError):
            job.complete()

    def test_cannot_cancel_completed(self):
        """cancel() on a completed task raises ValueError."""
        job = A2ATask()
        job.start_working()
        job.complete()
        with pytest.raises(ValueError):
            job.cancel()

    def test_artifact_attachment(self):
        """Each add_artifact() call appends one entry to task.artifacts."""
        job = A2ATask()
        for artifact_name in ("out.csv", "out.png"):
            job.add_artifact(A2AArtifact(name=artifact_name))
        assert len(job.artifacts) == 2

    def test_json_roundtrip(self):
        """task_id, state, input text and artifacts survive to_json/from_json."""
        job = A2ATask(input=A2AMessage.user_text("hello"))
        job.start_working()
        job.complete(A2AMessage.agent_text("result"))
        job.add_artifact(A2AArtifact(name="out.json", blob=b"{}"))

        restored = A2ATask.from_json(job.to_json())
        assert restored.task_id == job.task_id
        assert restored.state == TaskState.COMPLETED
        assert restored.input.get_text() == "hello"
        assert restored.artifacts[0].name == "out.json"

    def test_state_history(self):
        """Two transitions leave two history entries: SUBMITTED, then WORKING."""
        job = A2ATask()
        job.start_working()
        job.complete()

        history = job._state_history
        assert len(history) == 2
        # The first element of each entry is the recorded state.
        assert history[0][0] == TaskState.SUBMITTED
        assert history[1][0] == TaskState.WORKING

175 

176 

class TestA2AHandoff:
    """Dict and JSON round-trip checks for A2AHandoff."""

    def test_roundtrip(self):
        """Agents, the wrapped task id and the reason survive a dict cycle."""
        job = A2ATask(input=A2AMessage.user_text("do x"))
        handoff = A2AHandoff(
            source_agent="coordinator",
            target_agent="worker",
            task=job,
            reason="delegation",
        )

        restored = A2AHandoff.from_dict(handoff.to_dict())
        assert restored.source_agent == "coordinator"
        assert restored.target_agent == "worker"
        assert restored.task.task_id == job.task_id
        assert restored.reason == "delegation"

    def test_json_roundtrip(self):
        """source_agent and handoff_id survive a to_json/from_json cycle."""
        job = A2ATask(input=A2AMessage.user_text("test"))
        handoff = A2AHandoff(source_agent="a", target_agent="b", task=job)

        serialized = handoff.to_json()
        restored = A2AHandoff.from_json(serialized)
        assert restored.source_agent == "a"
        assert restored.handoff_id == handoff.handoff_id

199 

200 

class TestA2ASession:
    """Checks for message/task bookkeeping on A2ASession."""

    def test_basic(self):
        """Two messages and one task show up in history and tasks."""
        session = A2ASession()
        session.add_message(A2AMessage.user_text("hi"))
        session.add_message(A2AMessage.agent_text("hello"))
        session.add_task(A2ATask())

        assert len(session.history) == 2
        assert len(session.tasks) == 1

    def test_get_last_n(self):
        """get_last_n_messages(3) returns three messages ending with the newest."""
        session = A2ASession()
        for index in range(5):
            session.add_message(A2AMessage.user_text(f"msg{index}"))

        tail = session.get_last_n_messages(3)
        assert len(tail) == 3
        assert tail[-1].get_text() == "msg4"

217 

218 

class TestA2Server:
    """Task processing, lookup, listing and cleanup checks for A2AServer.

    NOTE(review): the class name lacks the second "A" of ``A2AServer``; it is
    left as-is so existing pytest node IDs keep working.
    """

    @pytest.mark.asyncio
    async def test_process_task_success(self):
        """A registered handler's reply ends up in a "completed" result."""
        server = A2AServer()

        async def handler(task: A2ATask):
            return A2AMessage.agent_text(f"processed: {task.input.get_text()}")

        server.register_handler("worker", handler)
        request = new_task("hello test", target_agent="worker")

        result = await server.process_task(request.to_dict())
        assert result["state"] == "completed"
        first_part = result["output"]["parts"][0]
        assert "processed: hello test" in first_part["text"]

    @pytest.mark.asyncio
    async def test_process_task_no_handler(self):
        """With no handler for the target agent the result is "failed"."""
        server = A2AServer()
        request = new_task("hello", target_agent="nonexistent")

        result = await server.process_task(request.to_dict())
        assert result["state"] == "failed"
        assert "No handler" in result["error"]

    @pytest.mark.asyncio
    async def test_process_task_handler_error(self):
        """A handler that raises yields "failed" with the exception text."""
        server = A2AServer()

        async def bad_handler(task):
            raise ValueError("simulated error")

        server.register_handler("bad", bad_handler)
        request = new_task("test", target_agent="bad")

        result = await server.process_task(request.to_dict())
        assert result["state"] == "failed"
        assert "simulated error" in result["error"]

    def test_get_task(self):
        """get_task returns the stored task by id and None for unknown ids."""
        server = A2AServer()
        stored = A2ATask(task_id="task-001")
        # Seed the server's task table directly instead of processing a task.
        server._tasks["task-001"] = stored

        assert server.get_task("task-001").task_id == "task-001"
        assert server.get_task("nonexistent") is None

    def test_list_tasks_by_state(self):
        """list_tasks returns everything, or only tasks in the given state."""
        server = A2AServer()

        submitted = A2ATask(task_id="t1")

        completed = A2ATask(task_id="t2")
        completed.start_working()
        completed.complete()

        failed = A2ATask(task_id="t3")
        failed.fail("err")

        server._tasks = {"t1": submitted, "t2": completed, "t3": failed}
        assert len(server.list_tasks()) == 3
        assert len(server.list_tasks(TaskState.COMPLETED)) == 1
        assert len(server.list_tasks(TaskState.FAILED)) == 1

    def test_cleanup(self):
        """cleanup_old(3600) removes the back-dated task and keeps the fresh one."""
        server = A2AServer()

        stale = A2ATask(task_id="old")
        stale.start_working()
        stale.complete()
        # Back-date the completed task by 4000 s so it exceeds the 3600 s limit.
        stale._updated = time.time() - 4000

        recent = A2ATask(task_id="fresh")
        server._tasks = {"old": stale, "fresh": recent}

        removed = server.cleanup_old(max_age_seconds=3600)
        assert removed == 1
        assert "old" not in server._tasks
        assert "fresh" in server._tasks

280 

281 

class TestConvenience:
    """Checks for the new_task / new_handoff helper functions."""

    def test_new_task(self):
        """new_task sets the input text and stores extra kwargs in meta."""
        created = new_task("my task", target_agent="worker", priority="high")
        assert created.input.get_text() == "my task"
        assert created.meta["target_agent"] == "worker"
        assert created.meta["priority"] == "high"

    def test_new_handoff(self):
        """new_handoff maps source/target/reason onto the handoff fields."""
        delegated = new_task("delegate me")
        handoff = new_handoff(delegated, source="a", target="b", reason="overload")
        assert handoff.source_agent == "a"
        assert handoff.target_agent == "b"
        assert handoff.reason == "overload"