Coverage for agentos/orchestration/a2a_router.py: 67%
55 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2A2A协议路由 — 跨框架Agent互操作。
3基因来源: Google ADK A2A Protocol
4"""
6from __future__ import annotations
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Any
13class TaskStatus(str, Enum):
15 """任务状态枚举。"""
17 PENDING = "pending"
18 IN_PROGRESS = "in_progress"
19 COMPLETED = "completed"
20 FAILED = "failed"
23@dataclass
24class AgentCard:
25 """Agent名片 — A2A协议中Agent互相发现的基础。"""
26 id: str
27 name: str
28 description: str
29 capabilities: list[str] = field(default_factory=list)
30 endpoint: str = ""
31 protocol_version: str = "1.0"
34@dataclass
35class Task:
36 """结构化任务 — A2A协议的任务定义。"""
37 id: str
38 description: str
39 input_data: dict[str, Any] = field(default_factory=dict)
40 status: TaskStatus = TaskStatus.PENDING
43@dataclass
44class TaskResult:
45 """Result of an A2A routed task execution."""
46 task_id: str
47 output: str
48 artifacts: list[str] = field(default_factory=list)
49 error: str | None = None
52class A2ARouter:
53 """
54 A2A协议路由 — 让不同框架构建的Agent相互通信。
56 核心流程:
57 1. Agent Card 注册 → 互相发现
58 2. Task 委派 → 结构化任务传递
59 3. Message 协商 → 多轮异步通信
60 4. Artifact 返回 → 产物传递
61 """
63 def __init__(self):
64 self.local_agents: dict[str, AgentCard] = {}
65 self.remote_agents: dict[str, AgentCard] = {}
66 self._task_results: dict[str, TaskResult] = {}
68 def register(self, card: AgentCard) -> None:
69 """Register an agent card (compliance test entry point)."""
70 self.local_agents[card.id] = card
72 def register_local(self, card: AgentCard):
73 self.local_agents[card.id] = card
75 def discover_remote(self, cards: list[AgentCard]):
76 for card in cards:
77 self.remote_agents[card.id] = card
79 def find_agent(self, capability: str) -> AgentCard | None:
80 """按能力查找Agent。"""
81 all_agents = {**self.local_agents, **self.remote_agents}
82 for agent in all_agents.values():
83 if capability.lower() in [c.lower() for c in agent.capabilities]:
84 return agent
85 return None
87 def delegate(self, task: Task, agent_id: str | None = None) -> TaskResult:
88 """
89 任务委派。实际生产环境中会通过A2A协议异步调用远程Agent。
90 当前为本地模拟实现。
91 """
92 task.status = TaskStatus.IN_PROGRESS
94 # 模拟异步执行
95 result = TaskResult(
96 task_id=task.id,
97 output=f"Agent {agent_id or 'unknown'} processed: {task.description}",
98 )
99 task.status = TaskStatus.COMPLETED
100 self._task_results[task.id] = result
101 return result
103 def list_agents(self) -> list[AgentCard]:
104 return list(self.local_agents.values()) + list(self.remote_agents.values())