Coverage for agentos/protocols/contracts.py: 0%

162 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.70 — Agent能力契约与发现协议。 

3基因来源: MCP (Model Context Protocol) + OpenAPI Spec 

4 

5契约系统允许Agent声明自己的能力和限制,其他Agent可以通过 

6能力匹配引擎找到合适的Agent协作。 

7 

8契约格式: 

9- AgentCapability: 单个能力描述(名称、描述、输入输出schema) 

10- AgentContract: Agent的完整契约(身份、能力列表、QoS、限制) 

11- CapabilityMatcher: 能力匹配引擎 

12""" 

13 

14from __future__ import annotations 

15 

16from dataclasses import dataclass, field 

17from enum import Enum 

18from typing import Any 

19 

20 

21# ── Capability Types ──────────────────────────── 

22 

class CapabilityDomain(str, Enum):
    """Closed set of capability domains an agent can declare.

    Members mix in ``str``, so each member compares equal to its plain
    string value.
    """

    # Reasoning and analysis.
    REASONING = "reasoning"
    # Code generation.
    CODING = "coding"
    # Information retrieval.
    SEARCH = "search"
    # Command execution.
    EXECUTION = "execution"
    # Creative generation.
    CREATIVE = "creative"
    # Data analysis.
    ANALYSIS = "analysis"
    # Coordination and scheduling.
    COORDINATION = "coordination"

34 

35 

class QoSLevel(str, Enum):
    """Quality-of-service tiers a contract can advertise.

    Members mix in ``str``, so each member compares equal to its plain
    string value.
    """

    # Best effort, no guarantee.
    BEST_EFFORT = "best_effort"
    # High availability (note the short wire value "ha").
    HIGH_AVAILABILITY = "ha"
    # Low latency.
    LOW_LATENCY = "low_latency"
    # High accuracy.
    HIGH_ACCURACY = "high_accuracy"

44 

45 

@dataclass
class AgentCapability:
    """Declaration of a single capability offered by an agent."""

    # Required identity of the capability.
    name: str
    description: str

    # Domain the capability belongs to; defaults to REASONING.
    domain: CapabilityDomain = CapabilityDomain.REASONING

    # Input/output schemas; each instance gets its own empty dict by default.
    input_schema: dict = field(default_factory=dict)
    output_schema: dict = field(default_factory=dict)

    # Declared limits and cost/latency figures.
    max_tokens: int = 8192
    cost_per_call: float = 0.0
    avg_latency_ms: float = 1000.0

    # Free-form tags; each instance gets its own empty list by default.
    tags: list[str] = field(default_factory=list)

    # Declared confidence, intended range 0.0 - 1.0 (not validated here).
    confidence: float = 0.9
    version: str = "1.0.0"

61 

62 

@dataclass
class AgentContract:
    """Full contract of an agent: identity, capabilities and limits."""

    agent_id: str
    agent_name: str
    agent_type: str = "general"
    description: str = ""
    capabilities: list[AgentCapability] = field(default_factory=list)
    qos_level: QoSLevel = QoSLevel.BEST_EFFORT
    rate_limit_rpm: int = 60  # Maximum number of requests per minute.
    max_context_tokens: int = 128000
    supported_languages: list[str] = field(default_factory=list)
    endpoints: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)
    health_check_url: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    version: str = "1.0.0"
    created_at: str = ""
    updated_at: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict summary of the contract.

        The summary contains the identity fields, one entry per capability
        (name, domain value, description, tags), the QoS level value, the
        rate limit and the context-token limit. Other fields of the
        contract are not included.

        Returns:
            A new dictionary. Each capability's ``tags`` entry is a copy,
            so mutating the result never alters the contract itself.
        """
        capability_summaries = [
            {
                "name": cap.name,
                "domain": cap.domain.value,
                "description": cap.description,
                # Copy: handing out the capability's own list would let
                # callers corrupt the contract by editing the summary.
                "tags": list(cap.tags),
            }
            for cap in self.capabilities
        ]
        return {
            "agent_id": self.agent_id,
            "agent_name": self.agent_name,
            "agent_type": self.agent_type,
            "description": self.description,
            "capabilities": capability_summaries,
            "qos_level": self.qos_level.value,
            "rate_limit_rpm": self.rate_limit_rpm,
            "max_context_tokens": self.max_context_tokens,
        }

    def has_capability(self, name: str) -> bool:
        """Return True if any declared capability is named exactly ``name``."""
        return any(cap.name == name for cap in self.capabilities)

    def has_domain(self, domain: CapabilityDomain) -> bool:
        """Return True if any declared capability belongs to ``domain``."""
        return any(cap.domain == domain for cap in self.capabilities)

109 

110 

111# ── Capability Matcher ────────────────────────── 

112 

@dataclass
class MatchScore:
    """Scored pairing of a contract with one of its capabilities."""

    # The agent contract that owns the matched capability.
    contract: AgentContract
    # The capability that was scored against the query.
    capability: AgentCapability
    # Overall match score, intended range 0.0 - 1.0.
    score: float
    # Per-signal breakdown; each instance gets its own empty dict by default.
    match_details: dict[str, float] = field(default_factory=dict)
    # Position in a result list; stays 0 until a caller assigns one.
    rank: int = 0

122 

123 

class CapabilityMatcher:
    """Capability matching engine that ranks agents against a text query.

    A capability's score blends keyword overlap, a domain hint detected
    from the query, tag overlap, a QoS bonus taken from the owning contract
    and the capability's declared confidence.
    """

    def __init__(self, contracts: list[AgentContract] | None = None):
        """Create a matcher, optionally pre-registering ``contracts``."""
        self._contracts: dict[str, AgentContract] = {}
        for contract in contracts or ():
            self.register(contract)

    def register(self, contract: AgentContract) -> None:
        """Add ``contract``, replacing any contract with the same agent id."""
        self._contracts[contract.agent_id] = contract

    def unregister(self, agent_id: str) -> None:
        """Remove the contract for ``agent_id``; unknown ids are ignored."""
        self._contracts.pop(agent_id, None)

    def find(
        self,
        query: str,
        domain: CapabilityDomain | None = None,
        min_score: float = 0.3,
        top_k: int = 5,
    ) -> list[MatchScore]:
        """Find the capabilities that best match a natural-language query.

        Args:
            query: Free-text description of what is needed.
            domain: When given, only capabilities of this domain are scored.
            min_score: Capabilities scoring below this value are dropped.
            top_k: Maximum number of matches to return. Values below zero
                are treated as zero.

        Returns:
            Up to ``top_k`` matches sorted by descending score, with
            ``rank`` set to the 1-based position in the returned list.
        """
        results: list[MatchScore] = []

        for contract in self._contracts.values():
            for cap in contract.capabilities:
                if domain and cap.domain != domain:
                    continue

                score = self._compute_match(query, cap, contract)
                if score < min_score:
                    continue

                results.append(MatchScore(
                    contract=contract,
                    capability=cap,
                    score=score,
                    match_details={
                        "text_similarity": self._text_sim(query, cap),
                        "domain_match": 1.0 if domain and cap.domain == domain else 0.5,
                        "tag_overlap": self._tag_overlap(query, cap.tags),
                    },
                ))

        # Best score first; the sort is stable, so ties keep insertion order.
        results.sort(key=lambda match: match.score, reverse=True)

        # A negative top_k would slice from the end and silently drop the
        # lowest-ranked match, so clamp it to zero instead.
        top = results[:max(top_k, 0)]
        for position, match in enumerate(top, start=1):
            match.rank = position
        return top

    def find_by_domain(self, domain: CapabilityDomain) -> list[AgentContract]:
        """Return every registered contract with a capability in ``domain``."""
        return [c for c in self._contracts.values() if c.has_domain(domain)]

    def find_by_tag(self, tag: str) -> list[AgentContract]:
        """Return every registered contract with a capability tagged ``tag``."""
        return [
            c for c in self._contracts.values()
            if any(tag in cap.tags for cap in c.capabilities)
        ]

    def recommend_for_task(self, task_description: str) -> list[MatchScore]:
        """Return the top three matches for ``task_description``."""
        return self.find(query=task_description, top_k=3)

    # ── Internal scoring ─────────────────────────

    def _compute_match(
        self,
        query: str,
        capability: AgentCapability,
        contract: AgentContract,
    ) -> float:
        """Return the combined match score for one capability, capped at 1.0."""
        scores = []

        # Keyword overlap between the query and the capability text.
        scores.append(self._text_sim(query, capability) * 0.35)

        # Domain signal: full weight only when the query hints at this domain.
        domain_hint = self._detect_domain(query)
        if domain_hint and domain_hint == capability.domain:
            scores.append(0.3)
        else:
            scores.append(0.1)

        # Share of the capability's tags that appear in the query.
        scores.append(self._tag_overlap(query, capability.tags) * 0.15)

        # Flat bonus determined by the owning contract's QoS level.
        qos_bonus = {
            QoSLevel.HIGH_ACCURACY: 0.1,
            QoSLevel.LOW_LATENCY: 0.08,
            QoSLevel.HIGH_AVAILABILITY: 0.05,
            QoSLevel.BEST_EFFORT: 0.0,
        }.get(contract.qos_level, 0.0)
        scores.append(qos_bonus)

        # Declared confidence of the capability.
        scores.append(capability.confidence * 0.1)

        return min(sum(scores), 1.0)

    def _text_sim(self, query: str, capability: AgentCapability) -> float:
        """Return the fraction of query keywords found in the capability text.

        Keywords are the lower-cased, whitespace-separated tokens of
        ``query``. The capability text is its name, description, tags and
        domain value, tokenised the same way. An empty or whitespace-only
        query yields 0.0.
        """
        # Only real tokens count as keywords. Adding the whole query string
        # as an extra keyword could never match a whitespace-split word of a
        # multi-word query, so it merely inflated the denominator.
        keywords = set(query.lower().split())

        cap_text = (
            f"{capability.name} {capability.description} "
            f"{' '.join(capability.tags)} {capability.domain.value}"
        ).lower()
        cap_words = set(cap_text.split())

        if not keywords or not cap_words:
            return 0.0

        return len(keywords & cap_words) / len(keywords)

    def _tag_overlap(self, query: str, tags: list[str]) -> float:
        """Return the share of ``tags`` that occur as substrings of ``query``.

        Matching is case-insensitive. Empty tags never count as a hit but
        still count towards the total. An empty tag list yields 0.0.
        """
        if not tags:
            return 0.0
        q_lower = query.lower()
        # `"" in q_lower` is always True, so skip blank tags explicitly.
        hits = sum(1 for tag in tags if tag and tag.lower() in q_lower)
        return hits / len(tags)

    def _detect_domain(self, query: str) -> CapabilityDomain | None:
        """Guess a capability domain from keywords contained in ``query``.

        Domains are checked in the order listed below and the first one with
        a keyword occurring as a substring of the lower-cased query wins.
        Returns None when no keyword matches.
        """
        q = query.lower()
        domain_keywords = {
            CapabilityDomain.CODING: ["code", "代码", "编程", "开发", "函数", "bug", "debug"],
            CapabilityDomain.SEARCH: ["search", "搜索", "查找", "检索", "资料"],
            CapabilityDomain.ANALYSIS: ["分析", "数据", "统计", "图表", "analysis", "data"],
            CapabilityDomain.CREATIVE: ["写", "创作", "生成", "设计", "创意", "write", "create"],
            CapabilityDomain.EXECUTION: ["运行", "执行", "操作", "命令", "exec", "run"],
            CapabilityDomain.COORDINATION: ["协调", "编排", "调度", "工作流", "orchestrate"],
        }
        for domain, keywords in domain_keywords.items():
            if any(kw in q for kw in keywords):
                return domain
        return None

275 

276 

277# ── Contract Registry ─────────────────────────── 

278 

class ContractRegistry:
    """Contract registry for agent capability discovery.

    Records a last-seen timestamp per agent on ``register`` and
    ``heartbeat``. Agents whose timestamp is too old are removed only when
    ``prune_stale`` is called; nothing expires on its own.
    """

    def __init__(self):
        """Create an empty registry with its own capability matcher."""
        self._contracts: dict[str, AgentContract] = {}
        # agent_id -> monotonic-clock reading of the last register/heartbeat.
        self._heartbeats: dict[str, float] = {}
        self._matcher = CapabilityMatcher()

    @staticmethod
    def _now() -> float:
        """Return the current monotonic-clock reading in seconds.

        The monotonic clock is used because idle time is an elapsed
        interval: a wall-clock adjustment must not expire live agents or
        keep dead ones registered.
        """
        import time  # Local import keeps this helper self-contained.
        return time.monotonic()

    def register(self, contract: AgentContract) -> None:
        """Store ``contract``, mark it as just seen and index it for search."""
        self._contracts[contract.agent_id] = contract
        self._heartbeats[contract.agent_id] = self._now()
        self._matcher.register(contract)

    def heartbeat(self, agent_id: str) -> None:
        """Refresh the last-seen time of ``agent_id``; unknown ids are ignored."""
        if agent_id in self._contracts:
            self._heartbeats[agent_id] = self._now()

    def unregister(self, agent_id: str) -> None:
        """Remove ``agent_id`` everywhere; unknown ids are ignored."""
        self._contracts.pop(agent_id, None)
        self._heartbeats.pop(agent_id, None)
        self._matcher.unregister(agent_id)

    def prune_stale(self, max_idle_seconds: float = 300.0) -> list[str]:
        """Remove agents whose last heartbeat is older than the idle limit.

        Args:
            max_idle_seconds: Idle time, in seconds, above which an agent
                is considered stale.

        Returns:
            The ids of the agents that were removed.
        """
        now = self._now()
        # Collect first: unregister() mutates the dict being scanned.
        stale = [
            agent_id for agent_id, last_seen in self._heartbeats.items()
            if now - last_seen > max_idle_seconds
        ]
        for agent_id in stale:
            self.unregister(agent_id)
        return stale

    def find(self, query: str, **kwargs) -> list[MatchScore]:
        """Delegate to the matcher's ``find``, forwarding keyword arguments."""
        return self._matcher.find(query, **kwargs)

    @property
    def active_count(self) -> int:
        """Number of contracts currently registered."""
        return len(self._contracts)

    def list_contracts(self) -> list[AgentContract]:
        """Return a new list holding every registered contract."""
        return list(self._contracts.values())

    def summary(self) -> str:
        """Return a multi-line text summary: a count line, then one line per agent."""
        lines = [f"注册Agent: {self.active_count}"]
        for c in self._contracts.values():
            caps = ", ".join(cap.domain.value for cap in c.capabilities)
            lines.append(f" {c.agent_name} ({c.agent_type}): [{caps}]")
        return "\n".join(lines)