Coverage for agentos/orchestration/a2a_router.py: 67%

55 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2A2A协议路由 — 跨框架Agent互操作。 

3基因来源: Google ADK A2A Protocol 

4""" 

5 

6from __future__ import annotations 

7 

8from dataclasses import dataclass, field 

9from enum import Enum 

10from typing import Any 

11 

12 

class TaskStatus(str, Enum):
    """Enumeration of task states.

    Members also subclass ``str``, so each one compares equal to its
    plain string value (e.g. ``TaskStatus.PENDING == "pending"``).
    """

    PENDING = "pending"          # default status of a newly built Task
    IN_PROGRESS = "in_progress"  # set when delegation starts
    COMPLETED = "completed"      # set when delegation finishes
    FAILED = "failed"

21 

22 

@dataclass
class AgentCard:
    """Agent card: the record agents use to discover each other in A2A."""

    # Required identity fields (positional order is part of the constructor).
    id: str
    name: str
    description: str

    # Capability names advertised by the agent; each card gets its own list.
    capabilities: list[str] = field(default_factory=list)
    # Endpoint string for the agent; empty by default.
    endpoint: str = ""
    # Protocol version string carried by the card.
    protocol_version: str = "1.0"

32 

33 

@dataclass
class Task:
    """Structured task: the task definition passed around by the A2A router."""

    # Identifier of the task.
    id: str
    # Human-readable description of the work to do.
    description: str
    # Arbitrary input payload; each task gets its own dict.
    input_data: dict[str, Any] = field(default_factory=dict)
    # Current lifecycle state; new tasks start as PENDING.
    status: TaskStatus = TaskStatus.PENDING

41 

42 

@dataclass
class TaskResult:
    """Outcome of a task executed through the A2A router."""

    # Identifier of the task this result belongs to.
    task_id: str
    # Textual output of the execution.
    output: str
    # Artifact references attached to the result; each result gets its own list.
    artifacts: list[str] = field(default_factory=list)
    # Error text, or None when no error is recorded.
    error: str | None = None

50 

51 

class A2ARouter:
    """A2A protocol router: lets agents built on different frameworks talk.

    Core flow described by the design:
      1. Agent Card registration -> mutual discovery
      2. Task delegation -> structured task hand-off
      3. Message negotiation -> multi-round asynchronous communication
      4. Artifact return -> delivery of produced artifacts

    This class implements card registration/discovery, capability lookup,
    and a locally simulated delegation step.
    """

    def __init__(self) -> None:
        # Cards keyed by agent id, kept separately for local and remote agents.
        self.local_agents: dict[str, AgentCard] = {}
        self.remote_agents: dict[str, AgentCard] = {}
        # Results of delegated tasks, keyed by task id.
        self._task_results: dict[str, TaskResult] = {}

    def register(self, card: AgentCard) -> None:
        """Register an agent card (compliance test entry point).

        Equivalent to :meth:`register_local`.
        """
        self.register_local(card)

    def register_local(self, card: AgentCard) -> None:
        """Store ``card`` as a local agent, replacing any card with the same id."""
        self.local_agents[card.id] = card

    def discover_remote(self, cards: list[AgentCard]) -> None:
        """Store every card in ``cards`` as a remote agent, keyed by its id."""
        for card in cards:
            self.remote_agents[card.id] = card

    def find_agent(self, capability: str) -> AgentCard | None:
        """Return the first agent advertising ``capability``, or ``None``.

        The comparison is case-insensitive. Local agents are searched before
        remote agents. The search covers exactly the cards reported by
        :meth:`list_agents`, so a remote card that shares an id with a local
        card no longer hides the local card's capabilities.
        """
        wanted = capability.lower()  # hoisted: invariant across all agents
        for agent in self.list_agents():
            if any(cap.lower() == wanted for cap in agent.capabilities):
                return agent
        return None

    def delegate(self, task: Task, agent_id: str | None = None) -> TaskResult:
        """Delegate ``task`` and return its result.

        In production this would call a remote agent asynchronously over the
        A2A protocol; the current implementation is a local simulation.

        ``task.status`` is mutated in place (IN_PROGRESS, then COMPLETED) and
        the result is recorded under the task id. ``agent_id`` is only used in
        the output text; it is not checked against the registered agents.
        """
        task.status = TaskStatus.IN_PROGRESS

        # Simulated asynchronous execution.
        result = TaskResult(
            task_id=task.id,
            output=f"Agent {agent_id or 'unknown'} processed: {task.description}",
        )
        task.status = TaskStatus.COMPLETED
        self._task_results[task.id] = result
        return result

    def list_agents(self) -> list[AgentCard]:
        """Return all known cards: local agents first, then remote agents."""
        return list(self.local_agents.values()) + list(self.remote_agents.values())