Coverage for agentos/protocols/contracts.py: 0%
162 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
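AgentOS v0.70 — Agent capability contracts and discovery protocol.
Lineage: MCP (Model Context Protocol) + OpenAPI Spec

The contract system lets an agent declare its capabilities and limits, so that
other agents can find a suitable agent to collaborate with through the
capability matching engine.

Contract format:
- AgentCapability: a single capability description (name, description, input/output schema)
- AgentContract: an agent's complete contract (identity, capability list, QoS, limits)
- CapabilityMatcher: the capability matching engine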
12"""
14from __future__ import annotations
16from dataclasses import dataclass, field
17from enum import Enum
18from typing import Any
21# ── Capability Types ────────────────────────────
class CapabilityDomain(str, Enum):
    """Closed set of capability domains a capability can be declared under.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    REASONING = "reasoning"  # reasoning and analysis
    CODING = "coding"  # code generation
    SEARCH = "search"  # information retrieval
    EXECUTION = "execution"  # command execution
    CREATIVE = "creative"  # creative generation
    ANALYSIS = "analysis"  # data analysis
    COORDINATION = "coordination"  # coordination and scheduling
class QoSLevel(str, Enum):
    """Quality-of-service level an agent contract can advertise.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    BEST_EFFORT = "best_effort"  # best effort, no guarantee
    HIGH_AVAILABILITY = "ha"  # high availability
    LOW_LATENCY = "low_latency"  # low latency
    HIGH_ACCURACY = "high_accuracy"  # high accuracy
@dataclass
class AgentCapability:
    """Declaration of a single capability offered by an agent.

    Only ``name`` and ``description`` are required; every other field falls
    back to the default shown below. Mutable fields use ``default_factory``
    so each instance gets its own container.
    """

    # --- what the capability is ---
    name: str
    description: str
    domain: CapabilityDomain = CapabilityDomain.REASONING

    # --- input / output schemas (empty dict when not declared) ---
    input_schema: dict = field(default_factory=dict)
    output_schema: dict = field(default_factory=dict)

    # --- declared limits and cost figures ---
    max_tokens: int = 8192
    cost_per_call: float = 0.0
    avg_latency_ms: float = 1000.0

    # --- matching metadata ---
    tags: list[str] = field(default_factory=list)
    confidence: float = 0.9  # expected range 0.0 - 1.0 (not validated here)
    version: str = "1.0.0"
@dataclass
class AgentContract:
    """Complete contract for an agent: identity, capabilities and limits.

    Only ``agent_id`` and ``agent_name`` are required. Mutable fields use
    ``default_factory`` so each contract gets its own containers.
    """

    # --- identity ---
    agent_id: str
    agent_name: str
    agent_type: str = "general"
    description: str = ""

    # --- capabilities and service level ---
    capabilities: list[AgentCapability] = field(default_factory=list)
    qos_level: QoSLevel = QoSLevel.BEST_EFFORT

    # --- limits ---
    rate_limit_rpm: int = 60  # maximum requests per minute
    max_context_tokens: int = 128000

    # --- environment ---
    supported_languages: list[str] = field(default_factory=list)
    endpoints: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)
    health_check_url: str = ""

    # --- bookkeeping ---
    metadata: dict[str, Any] = field(default_factory=dict)
    version: str = "1.0.0"
    created_at: str = ""
    updated_at: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict summary of this contract.

        The summary covers identity, QoS level, the two limits, and for each
        capability its name, domain value, description and tags. Other
        fields (schemas, endpoints, metadata, versions, timestamps, ...) are
        not included.

        Returns:
            A new dict. Each capability's ``tags`` list is copied, so
            mutating the result never alters the contract itself.
        """
        capability_summaries = [
            {
                "name": cap.name,
                "domain": cap.domain.value,
                "description": cap.description,
                # Copy: handing out the live list would let callers mutate
                # the capability through the serialized form.
                "tags": list(cap.tags),
            }
            for cap in self.capabilities
        ]
        return {
            "agent_id": self.agent_id,
            "agent_name": self.agent_name,
            "agent_type": self.agent_type,
            "description": self.description,
            "capabilities": capability_summaries,
            "qos_level": self.qos_level.value,
            "rate_limit_rpm": self.rate_limit_rpm,
            "max_context_tokens": self.max_context_tokens,
        }

    def has_capability(self, name: str) -> bool:
        """Return True if any declared capability is named exactly ``name``."""
        return any(cap.name == name for cap in self.capabilities)

    def has_domain(self, domain: CapabilityDomain) -> bool:
        """Return True if any declared capability belongs to ``domain``."""
        return any(cap.domain == domain for cap in self.capabilities)
111# ── Capability Matcher ──────────────────────────
@dataclass
class MatchScore:
    """Scoring result for one capability of one contract against a query."""

    # The contract that owns the matched capability.
    contract: AgentContract
    # The capability that was scored.
    capability: AgentCapability
    # Overall match score, 0.0 - 1.0.
    score: float
    # Named per-component values that accompany the score.
    match_details: dict[str, float] = field(default_factory=dict)
    # Position in a result list; 0 until a caller assigns one.
    rank: int = 0
class CapabilityMatcher:
    """Capability matching engine: finds the best-suited agents for a query.

    A capability's score is the sum of five weighted parts, capped at 1.0:
    keyword overlap (x0.35), a domain hint detected from the query (0.3 on
    a match, otherwise 0.1), tag overlap (x0.15), a QoS bonus (up to 0.1)
    and the capability's declared confidence (x0.1).
    """

    def __init__(self, contracts: list[AgentContract] | None = None):
        """Create a matcher, optionally registering ``contracts`` up front."""
        self._contracts: dict[str, AgentContract] = {}
        for contract in contracts or ():
            self.register(contract)

    def register(self, contract: AgentContract) -> None:
        """Store ``contract``, replacing any entry with the same ``agent_id``."""
        self._contracts[contract.agent_id] = contract

    def unregister(self, agent_id: str) -> None:
        """Drop the contract for ``agent_id``; unknown ids are ignored."""
        self._contracts.pop(agent_id, None)

    def find(
        self,
        query: str,
        domain: CapabilityDomain | None = None,
        min_score: float = 0.3,
        top_k: int = 5,
    ) -> list[MatchScore]:
        """Find the capabilities that best match a natural-language query.

        Args:
            query: Free-text description of what is needed.
            domain: When given, only capabilities in this domain are scored.
            min_score: Capabilities scoring below this are dropped.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            Up to ``top_k`` results sorted by descending score, with ``rank``
            set to 1, 2, ... Ties keep registration order (stable sort).
        """
        # The hint depends only on the query, so detect it once rather than
        # once per capability.
        domain_hint = self._detect_domain(query)

        results: list[MatchScore] = []
        for contract in self._contracts.values():
            for cap in contract.capabilities:
                if domain and cap.domain != domain:
                    continue

                score, text_sim, tag_overlap = self._score_parts(
                    query, cap, contract, domain_hint
                )
                if score >= min_score:
                    results.append(MatchScore(
                        contract=contract,
                        capability=cap,
                        score=score,
                        match_details={
                            "text_similarity": text_sim,
                            # NOTE(review): this reflects the explicit
                            # ``domain`` filter, while the score itself uses
                            # the hint detected from the query text.
                            "domain_match": 1.0 if domain and cap.domain == domain else 0.5,
                            "tag_overlap": tag_overlap,
                        },
                    ))

        results.sort(key=lambda match: match.score, reverse=True)

        # Clamp so a negative top_k cannot slice from the wrong end.
        top = results[:max(top_k, 0)]
        for position, match in enumerate(top, start=1):
            match.rank = position
        return top

    def find_by_domain(self, domain: CapabilityDomain) -> list[AgentContract]:
        """Return every contract with at least one capability in ``domain``."""
        return [c for c in self._contracts.values() if c.has_domain(domain)]

    def find_by_tag(self, tag: str) -> list[AgentContract]:
        """Return every contract with a capability tagged exactly ``tag``."""
        return [
            c for c in self._contracts.values()
            if any(tag in cap.tags for cap in c.capabilities)
        ]

    def recommend_for_task(self, task_description: str) -> list[MatchScore]:
        """Recommend up to three matches for a task description."""
        return self.find(query=task_description, top_k=3)

    # ── Internal scoring ─────────────────────────

    def _compute_match(
        self,
        query: str,
        capability: AgentCapability,
        contract: AgentContract,
    ) -> float:
        """Return the combined match score (0.0 - 1.0) for one capability."""
        domain_hint = self._detect_domain(query)
        return self._score_parts(query, capability, contract, domain_hint)[0]

    def _score_parts(
        self,
        query: str,
        capability: AgentCapability,
        contract: AgentContract,
        domain_hint: CapabilityDomain | None,
    ) -> tuple[float, float, float]:
        """Return ``(total_score, text_similarity, tag_overlap)`` for one capability."""
        text_sim = self._text_sim(query, capability)
        tag_overlap = self._tag_overlap(query, capability.tags)

        # Domain part: full weight only when the query hints at this domain.
        domain_part = 0.3 if domain_hint and domain_hint == capability.domain else 0.1

        qos_bonus = {
            QoSLevel.HIGH_ACCURACY: 0.1,
            QoSLevel.LOW_LATENCY: 0.08,
            QoSLevel.HIGH_AVAILABILITY: 0.05,
            QoSLevel.BEST_EFFORT: 0.0,
        }.get(contract.qos_level, 0.0)

        parts = [
            text_sim * 0.35,
            domain_part,
            tag_overlap * 0.15,
            qos_bonus,
            capability.confidence * 0.1,
        ]
        return min(sum(parts), 1.0), text_sim, tag_overlap

    def _text_sim(self, query: str, capability: AgentCapability) -> float:
        """Return the fraction of query words found in the capability text.

        The capability text is its name, description, tags and domain value,
        lower-cased and split on whitespace. Returns 0.0 when either side has
        no words (e.g. an empty query).
        """
        # Words only: the whole query string can never equal a single
        # whitespace-split word, so including it would only inflate the
        # denominator for multi-word queries.
        keywords = set(query.lower().split())

        cap_text = (
            f"{capability.name} {capability.description} "
            f"{' '.join(capability.tags)} {capability.domain.value}"
        ).lower()
        cap_words = set(cap_text.split())

        if not keywords or not cap_words:
            return 0.0

        return len(keywords & cap_words) / len(keywords)

    def _tag_overlap(self, query: str, tags: list[str]) -> float:
        """Return the fraction of ``tags`` that occur in the query.

        Matching is a case-insensitive substring test; 0.0 when ``tags`` is
        empty.
        """
        if not tags:
            return 0.0
        q_lower = query.lower()
        hits = sum(1 for tag in tags if tag.lower() in q_lower)
        return hits / len(tags)

    def _detect_domain(self, query: str) -> CapabilityDomain | None:
        """Guess a capability domain from keywords in the query.

        Keywords are matched as case-insensitive substrings, and the first
        domain (in the order listed below) with any hit wins. Returns None
        when nothing matches; REASONING has no keywords and is never
        returned.
        """
        q = query.lower()
        domain_keywords = {
            CapabilityDomain.CODING: ["code", "代码", "编程", "开发", "函数", "bug", "debug"],
            CapabilityDomain.SEARCH: ["search", "搜索", "查找", "检索", "资料"],
            CapabilityDomain.ANALYSIS: ["分析", "数据", "统计", "图表", "analysis", "data"],
            CapabilityDomain.CREATIVE: ["写", "创作", "生成", "设计", "创意", "write", "create"],
            CapabilityDomain.EXECUTION: ["运行", "执行", "操作", "命令", "exec", "run"],
            CapabilityDomain.COORDINATION: ["协调", "编排", "调度", "工作流", "orchestrate"],
        }
        for domain, keywords in domain_keywords.items():
            if any(kw in q for kw in keywords):
                return domain
        return None
277# ── Contract Registry ───────────────────────────
class ContractRegistry:
    """In-memory contract registry with heartbeat-based expiry.

    Each registered contract is tracked with the time of its last
    registration or heartbeat, and is also registered with an internal
    ``CapabilityMatcher`` so it can be found by query.
    """

    def __init__(self):
        """Create an empty registry."""
        self._contracts: dict[str, AgentContract] = {}
        # agent_id -> monotonic-clock reading at the last register/heartbeat
        self._heartbeats: dict[str, float] = {}
        self._matcher = CapabilityMatcher()

    @staticmethod
    def _now() -> float:
        """Return the current reading of the monotonic clock, in seconds."""
        # Monotonic rather than wall-clock time: idle durations must not
        # jump when the system clock is adjusted.
        import time
        return time.monotonic()

    def register(self, contract: AgentContract):
        """Register ``contract`` (or replace it) and record a fresh heartbeat."""
        self._contracts[contract.agent_id] = contract
        self._heartbeats[contract.agent_id] = self._now()
        self._matcher.register(contract)

    def heartbeat(self, agent_id: str):
        """Refresh the heartbeat of a registered agent; unknown ids are ignored."""
        if agent_id in self._contracts:
            self._heartbeats[agent_id] = self._now()

    def unregister(self, agent_id: str):
        """Remove an agent from the registry and the matcher; unknown ids are ignored."""
        self._contracts.pop(agent_id, None)
        self._heartbeats.pop(agent_id, None)
        self._matcher.unregister(agent_id)

    def prune_stale(self, max_idle_seconds: float = 300.0) -> list[str]:
        """Remove agents whose last heartbeat is older than ``max_idle_seconds``.

        Returns:
            The ids of the agents that were removed.
        """
        now = self._now()
        # Collect first: unregister() mutates the dict being scanned.
        stale = [
            aid for aid, ts in self._heartbeats.items()
            if now - ts > max_idle_seconds
        ]
        for aid in stale:
            self.unregister(aid)
        return stale

    def find(self, query: str, **kwargs) -> list[MatchScore]:
        """Delegate to the internal matcher's ``find`` with the same arguments."""
        return self._matcher.find(query, **kwargs)

    @property
    def active_count(self) -> int:
        """Number of contracts currently registered."""
        return len(self._contracts)

    def list_contracts(self) -> list[AgentContract]:
        """Return a new list holding every registered contract."""
        return list(self._contracts.values())

    def summary(self) -> str:
        """Return a multi-line summary: a count line, then one line per agent."""
        lines = [f"注册Agent: {self.active_count}"]
        for c in self._contracts.values():
            caps = ", ".join(cap.domain.value for cap in c.capabilities)
            lines.append(f" {c.agent_name} ({c.agent_type}): [{caps}]")
        return "\n".join(lines)
333 return "\n".join(lines)