Coverage for agentos/enterprise/api_keys.py: 49%

112 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS Enterprise — API Key Management. 

3 

4功能: 

5 - API Key 创建/撤销/轮转 

6 - Key 哈希存储(SHA-256),不存明文 

7 - 前缀匹配快速查找 

8 - 权限范围(scope)绑定 

9 - 过期时间 & 用量配额 

10 - 审计日志联动 

11""" 

12 

13from __future__ import annotations 

14 

15import hashlib 

16import hmac 

17import secrets 

18import time 

19from dataclasses import dataclass, field 

20from datetime import datetime, timedelta 

21from enum import Enum 

22from typing import Optional 

23 

24 

25class KeyScope(str, Enum): 

26 """API Key 权限范围。""" 

27 READ = "read" 

28 WRITE = "write" 

29 ADMIN = "admin" 

30 AGENT_RUN = "agent:run" 

31 AGENT_MANAGE = "agent:manage" 

32 TOOLS_ALL = "tools:*" 

33 

34 

35@dataclass 

36class APIKey: 

37 """API Key 实体。只存储哈希,不存明文。""" 

38 key_id: str # 唯一标识,如 "ak_abc123" 

39 key_hash: str # SHA-256 哈希 

40 key_prefix: str # 前 8 位明文,用于快速匹配 

41 name: str # 人类可读名称,如 "生产环境 Bot" 

42 created_by: str # 创建者 

43 scopes: list[KeyScope] # 权限范围 

44 created_at: float = field(default_factory=time.time) 

45 expires_at: Optional[float] = None # 过期时间戳(None = 永不过期) 

46 last_used_at: Optional[float] = None 

47 usage_count: int = 0 

48 revoked: bool = False 

49 revoked_at: Optional[float] = None 

50 metadata: dict = field(default_factory=dict) 

51 

52 

53@dataclass 

54class KeyCreateRequest: 

55 """创建 API Key 的请求。""" 

56 name: str 

57 scopes: list[KeyScope] 

58 expires_in_days: Optional[int] = None # None = 永不过期 

59 metadata: dict = field(default_factory=dict) 

60 

61 

62@dataclass 

63class KeyCreateResult: 

64 """创建结果 — 包含仅此一次可见的明文 Key。""" 

65 key_id: str 

66 plaintext_key: str # ⚠️ 仅返回一次 

67 key_prefix: str 

68 scopes: list[KeyScope] 

69 expires_at: Optional[float] 

70 

71 

72class APIKeyManager: 

73 """API Key 全生命周期管理器。 

74 

75 特性: 

76 - SHA-256 哈希存储,不存明文 

77 - 前缀快速查找(前 8 位明文索引) 

78 - 撤销/轮转/用量追踪 

79 - 范围校验 

80 """ 

81 

82 def __init__(self, secret_salt: str = ""): 

83 self._keys: dict[str, APIKey] = {} # key_id → APIKey 

84 self._prefix_index: dict[str, str] = {} # key_prefix → key_id 

85 self._secret_salt = secret_salt or secrets.token_hex(16) 

86 

87 # ── 创建 ── 

88 

89 def create_key(self, request: KeyCreateRequest, created_by: str = "admin") -> KeyCreateResult: 

90 """创建新的 API Key。返回仅一次可见的明文。""" 

91 key_id = f"ak_{secrets.token_hex(12)}" 

92 plaintext = f"agentos_{secrets.token_hex(24)}" 

93 key_prefix = plaintext[:12] 

94 key_hash = self._hash(plaintext) 

95 

96 expires_at = None 

97 if request.expires_in_days: 

98 expires_at = time.time() + request.expires_in_days * 86400 

99 

100 key = APIKey( 

101 key_id=key_id, 

102 key_hash=key_hash, 

103 key_prefix=key_prefix, 

104 name=request.name, 

105 created_by=created_by, 

106 scopes=request.scopes, 

107 expires_at=expires_at, 

108 metadata=request.metadata, 

109 ) 

110 

111 self._keys[key_id] = key 

112 self._prefix_index[key_prefix] = key_id 

113 

114 return KeyCreateResult( 

115 key_id=key_id, 

116 plaintext_key=plaintext, 

117 key_prefix=key_prefix, 

118 scopes=request.scopes, 

119 expires_at=expires_at, 

120 ) 

121 

122 # ── 验证 ── 

123 

124 def validate_key(self, plaintext: str) -> Optional[APIKey]: 

125 """验证 API Key 并返回对应的 Key 对象。无效/已撤销/过期返回 None。""" 

126 key_hash = self._hash(plaintext) 

127 

128 # 前缀快速定位 

129 key_prefix = plaintext[:12] 

130 key_id = self._prefix_index.get(key_prefix) 

131 if not key_id: 

132 return None 

133 

134 key = self._keys.get(key_id) 

135 if not key: 

136 return None 

137 

138 # 恒定时间比对防时序攻击 

139 if not hmac.compare_digest(key.key_hash, key_hash): 

140 return None 

141 

142 if key.revoked: 

143 return None 

144 

145 if key.expires_at and time.time() > key.expires_at: 

146 return None 

147 

148 # 更新使用记录 

149 key.last_used_at = time.time() 

150 key.usage_count += 1 

151 

152 return key 

153 

154 def check_scope(self, key: APIKey, required_scope: KeyScope) -> bool: 

155 """检查 Key 是否拥有指定权限范围。""" 

156 if KeyScope.ADMIN in key.scopes: 

157 return True 

158 return required_scope in key.scopes 

159 

160 # ── 管理 ── 

161 

162 def revoke_key(self, key_id: str) -> bool: 

163 """撤销 API Key。""" 

164 key = self._keys.get(key_id) 

165 if not key or key.revoked: 

166 return False 

167 key.revoked = True 

168 key.revoked_at = time.time() 

169 return True 

170 

171 def rotate_key(self, key_id: str, created_by: str = "admin") -> Optional[KeyCreateResult]: 

172 """轮转 API Key:撤销旧 Key,创建新 Key。""" 

173 old = self._keys.get(key_id) 

174 if not old or old.revoked: 

175 return None 

176 

177 self.revoke_key(key_id) 

178 

179 expires_in = None 

180 if old.expires_at: 

181 remaining = old.expires_at - time.time() 

182 expires_in = int(max(1, remaining / 86400)) 

183 

184 return self.create_key( 

185 KeyCreateRequest( 

186 name=f"{old.name} (rotated)", 

187 scopes=old.scopes, 

188 expires_in_days=expires_in, 

189 metadata={"rotated_from": key_id, **old.metadata}, 

190 ), 

191 created_by=created_by, 

192 ) 

193 

194 def list_keys(self) -> list[APIKey]: 

195 """列出所有 Key(不含明文)。""" 

196 return sorted(self._keys.values(), key=lambda k: k.created_at, reverse=True) 

197 

198 def get_key(self, key_id: str) -> Optional[APIKey]: 

199 """获取单个 Key 信息。""" 

200 return self._keys.get(key_id) 

201 

202 def stats(self) -> dict: 

203 """Key 统计信息。""" 

204 total = len(self._keys) 

205 active = sum(1 for k in self._keys.values() if not k.revoked) 

206 revoked = total - active 

207 total_usage = sum(k.usage_count for k in self._keys.values()) 

208 return { 

209 "total": total, 

210 "active": active, 

211 "revoked": revoked, 

212 "total_usage_count": total_usage, 

213 } 

214 

215 # ── 内部 ── 

216 

217 def _hash(self, plaintext: str) -> str: 

218 """SHA-256 哈希(加盐)。""" 

219 return hashlib.sha256(f"{self._secret_salt}:{plaintext}".encode()).hexdigest()