Coverage for agentos/protocols/__init__.py: 75%

59 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 15:40 +0800

1""" 

2AgentOS Protocols — Standardized agent communication interfaces. 

3 

4Modules: 

5- registry.py: Agent Registry with service discovery, heartbeat, load balancing 

6- grpc.py: gRPC-based A2A protocol with streaming and TLS/mTLS 

7""" 

8 

9from agentos.protocols.registry import AgentRegistry, AgentInfo 

10from agentos.protocols.grpc import ( 

11 GrpcTaskRequest, 

12 GrpcTaskResponse, 

13 GrpcHeartbeat, 

14 GrpcStreamChunk, 

15 TaskStatus, 

16 GrpcStatusCode, 

17 GrpcAgentService, 

18 DefaultAgentService, 

19 GrpcServer, 

20 GrpcServerConfig, 

21 GrpcClient, 

22 GrpcClientConfig, 

23 GrpcFrameCodec, 

24 SERVICE_NAME, 

25 create_self_signed_cert, 

26) 

27 

# Public API re-exported by this package; `from agentos.protocols import *`
# exposes exactly these names (plus those appended to the list further down).
__all__ = [
    # Registry
    "AgentRegistry",
    "AgentInfo",  # imported alongside AgentRegistry, so re-export it as well
    # gRPC
    "GrpcTaskRequest",
    "GrpcTaskResponse",
    "GrpcHeartbeat",
    "GrpcStreamChunk",
    "TaskStatus",
    "GrpcStatusCode",
    "GrpcAgentService",
    "DefaultAgentService",
    "GrpcServer",
    "GrpcServerConfig",
    "GrpcClient",
    "GrpcClientConfig",
    "GrpcFrameCodec",
    "SERVICE_NAME",
    "create_self_signed_cert",
]

48 

49 

50# ── Compatibility exports (required by agentos/__init__.py) ── 

51 

52from dataclasses import dataclass, field 

53from enum import Enum 

54from typing import Any 

55 

56 

class CapabilityDomain(str, Enum):
    """Capability domains an agent can declare.

    Members also subclass ``str``, so each member compares equal to its raw
    value (for example ``CapabilityDomain.TEXT == "text"``).
    """

    TEXT = "text"
    CODE = "code"
    MULTIMODAL = "multimodal"
    TOOL_USE = "tool_use"
    SAFETY = "safety"

63 

64 

class QoSLevel(str, Enum):
    """Quality-of-service level identifiers for an agent contract.

    Members also subclass ``str``, so each member compares equal to its raw
    value (for example ``QoSLevel.BEST_EFFORT == "best_effort"``).
    """

    BEST_EFFORT = "best_effort"
    AT_LEAST_ONCE = "at_least_once"
    EXACTLY_ONCE = "exactly_once"

69 

70 

@dataclass
class AgentCapability:
    """Description of a single capability offered by an agent.

    Every field has a default, so ``AgentCapability()`` is a valid instance.
    """

    name: str = ""
    domain: CapabilityDomain = CapabilityDomain.TEXT
    version: str = "1.0"
    description: str = ""
    # default_factory gives every instance its own dict rather than a shared one.
    parameters: dict = field(default_factory=dict)

78 

79 

@dataclass
class AgentContract:
    """Contract describing an agent: name, version, capabilities, QoS, endpoint.

    Every field has a default, so ``AgentContract()`` is a valid instance.
    """

    name: str = ""
    version: str = "1.0"
    # Items may be objects exposing ``.name`` or plain values; ``model_dump``
    # accepts both.
    capabilities: list = field(default_factory=list)
    qos: QoSLevel = QoSLevel.BEST_EFFORT
    endpoint: str = ""

    def model_dump(self) -> dict:
        """Return a plain-dict view of the contract.

        Returns:
            A dict with the keys ``name``, ``version``, ``capabilities``,
            ``qos`` and ``endpoint``. ``capabilities`` is flattened to a list
            of names (``str(item)`` for items without a ``name`` attribute),
            and ``qos`` is reduced to its enum value.
        """
        capability_names = [
            item.name if hasattr(item, "name") else str(item)
            for item in self.capabilities
        ]
        return {
            "name": self.name,
            "version": self.version,
            "capabilities": capability_names,
            # The dataclass does not coerce its fields, so ``qos`` may be a
            # raw string such as "best_effort"; pass it through unchanged
            # instead of failing on the missing ``.value`` attribute.
            "qos": getattr(self.qos, "value", self.qos),
            "endpoint": self.endpoint,
        }

96 

97 

@dataclass
class MatchScore:
    """Result of matching required capabilities against available ones."""

    # Fraction of the distinct required names that were matched (0.0 to 1.0).
    score: float = 0.0
    # Matched capability names, in the order they first appear in `required`.
    matches: list = field(default_factory=list)


class CapabilityMatcher:
    """Match required agent capabilities against available ones by name."""

    @staticmethod
    def _name_of(item):
        """Return ``item`` itself when it is a string, else ``item.name``."""
        return item if isinstance(item, str) else item.name

    def match(self, required: list, available: list) -> MatchScore:
        """Score how many of the required capabilities are available.

        Args:
            required: Capability names, or objects exposing ``.name``.
            available: Capability names, or objects exposing ``.name``.

        Returns:
            A ``MatchScore`` whose ``matches`` lists each matched name once,
            in the order it first appears in ``required``, and whose ``score``
            is the matched fraction of the distinct required names. The score
            is ``0.0`` when ``required`` is empty.
        """
        # Resolve the available names once so each lookup below is O(1).
        available_names = {self._name_of(item) for item in available}
        # dict.fromkeys drops duplicates while keeping first-seen order, so a
        # repeated requirement neither lowers the score nor repeats in matches.
        required_names = list(
            dict.fromkeys(self._name_of(item) for item in required)
        )
        matched = [name for name in required_names if name in available_names]
        # max(..., 1) avoids a ZeroDivisionError when nothing is required.
        score = len(matched) / max(len(required_names), 1)
        return MatchScore(score=score, matches=matched)

120 

121 

class ContractRegistry:
    """In-memory registry of agent contracts, keyed by contract name."""

    def __init__(self):
        self._contracts: dict[str, AgentContract] = {}

    def register(self, contract: "AgentContract") -> None:
        """Store ``contract`` under ``contract.name``.

        A contract already registered under the same name is replaced.
        """
        self._contracts[contract.name] = contract

    def get(self, name: str) -> "AgentContract | None":
        """Return the contract registered as ``name``, or ``None`` if absent."""
        return self._contracts.get(name)

    def list_all(self) -> list[str]:
        """Return the registered contract names in insertion order."""
        return list(self._contracts)

137 

138 

# Extend the public API with the compatibility types defined in this module.
__all__ += [
    "AgentContract",
    "AgentCapability",
    "CapabilityDomain",
    "QoSLevel",
    "CapabilityMatcher",
    "ContractRegistry",
    "MatchScore",
]