Coverage for agentos/enterprise/api_keys.py: 49%
112 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Enterprise — API Key Management.
4功能:
5 - API Key 创建/撤销/轮转
6 - Key 哈希存储(SHA-256),不存明文
7 - 前缀匹配快速查找
8 - 权限范围(scope)绑定
9 - 过期时间 & 用量配额
10 - 审计日志联动
11"""
13from __future__ import annotations
15import hashlib
16import hmac
17import secrets
18import time
19from dataclasses import dataclass, field
20from datetime import datetime, timedelta
21from enum import Enum
22from typing import Optional
class KeyScope(str, Enum):
    """Permission scopes that can be bound to an API key.

    Members are ``str`` subclasses, so they compare equal to their raw string
    values (``KeyScope.READ == "read"``).
    """

    READ = "read"
    WRITE = "write"
    # APIKeyManager.check_scope treats a key holding ADMIN as having every scope.
    ADMIN = "admin"
    AGENT_RUN = "agent:run"
    AGENT_MANAGE = "agent:manage"
    # NOTE(review): check_scope tests plain list membership; no wildcard
    # expansion of "tools:*" is visible in this module — confirm intended use.
    TOOLS_ALL = "tools:*"
@dataclass
class APIKey:
    """Stored API key record. Holds only the hash, never the plaintext key."""

    # Unique identifier, e.g. "ak_abc123".
    key_id: str
    # Salted SHA-256 hex digest of the plaintext key.
    key_hash: str
    # Leading characters of the plaintext, used as a fast lookup index.
    # APIKeyManager slices the first 12 characters of the plaintext for this.
    key_prefix: str
    # Human-readable label, e.g. "production bot".
    name: str
    # Identifier of whoever created the key.
    created_by: str
    # Scopes granted to the key.
    scopes: list[KeyScope]
    # Creation time as a Unix timestamp.
    created_at: float = field(default_factory=time.time)
    # Expiry as a Unix timestamp; None means the key never expires.
    expires_at: Optional[float] = None
    # Unix timestamp of the most recent successful validation, if any.
    last_used_at: Optional[float] = None
    # Number of successful validations.
    usage_count: int = 0
    # Whether the key has been revoked, and when.
    revoked: bool = False
    revoked_at: Optional[float] = None
    # Free-form caller-supplied metadata.
    metadata: dict = field(default_factory=dict)
@dataclass
class KeyCreateRequest:
    """Parameters for creating a new API key."""

    # Human-readable label for the key.
    name: str
    # Scopes to grant to the key.
    scopes: list[KeyScope]
    # Lifetime in days counted from creation; None means the key never expires.
    expires_in_days: Optional[int] = None
    # Free-form metadata stored alongside the key.
    metadata: dict = field(default_factory=dict)
@dataclass
class KeyCreateResult:
    """Outcome of creating a key, including the plaintext shown only once."""

    # Identifier of the newly stored key.
    key_id: str
    # The plaintext key. It is returned here once; only its hash is stored.
    plaintext_key: str
    # Leading characters of the plaintext used as the lookup index.
    key_prefix: str
    # Scopes granted to the key.
    scopes: list[KeyScope]
    # Expiry as a Unix timestamp; None means the key never expires.
    expires_at: Optional[float]
class APIKeyManager:
    """In-memory manager for the full API key lifecycle.

    Features:
      - Keys are stored as salted SHA-256 hashes; plaintext is never retained.
      - A prefix index (leading characters of the plaintext) narrows lookup
        to a handful of candidates before the hash comparison.
      - Revocation, rotation and usage tracking.
      - Scope checks.
    """

    # Number of leading plaintext characters used as the lookup index.
    # Generated keys look like "agentos_<48 hex>", so this is the fixed
    # "agentos_" tag plus 4 hex characters: only 65,536 distinct prefixes.
    _PREFIX_LEN = 12
    _SECONDS_PER_DAY = 86400

    def __init__(self, secret_salt: str = ""):
        """Create an empty manager.

        Args:
            secret_salt: Salt mixed into every key hash. When empty, a random
                salt is generated for this instance.
        """
        # key_id -> APIKey
        self._keys: dict[str, APIKey] = {}
        # key_prefix -> ids of every key sharing that prefix. A list is used
        # because prefixes collide easily (see _PREFIX_LEN); a single-valued
        # index would silently make the earlier key impossible to validate.
        self._prefix_index: dict[str, list[str]] = {}
        self._secret_salt = secret_salt or secrets.token_hex(16)

    # ── Creation ──

    def create_key(self, request: KeyCreateRequest, created_by: str = "admin") -> KeyCreateResult:
        """Create and store a new API key.

        Args:
            request: Name, scopes, optional lifetime and metadata for the key.
                ``expires_in_days=None`` means the key never expires. A value
                of 0 or less produces a key that is already expired rather
                than one that never expires.
            created_by: Identifier recorded as the key's creator.

        Returns:
            A result carrying the plaintext key; this is the only time the
            plaintext is available.
        """
        expires_at = None
        # Compare against None explicitly: a plain truthiness test would turn
        # a requested lifetime of 0 days into a non-expiring key.
        if request.expires_in_days is not None:
            expires_at = time.time() + request.expires_in_days * self._SECONDS_PER_DAY

        return self._issue(
            name=request.name,
            scopes=request.scopes,
            metadata=request.metadata,
            created_by=created_by,
            expires_at=expires_at,
        )

    # ── Validation ──

    def validate_key(self, plaintext: str) -> Optional[APIKey]:
        """Validate a plaintext key and return its stored record.

        On success the record's ``last_used_at`` and ``usage_count`` are
        updated.

        Args:
            plaintext: The key as presented by the caller.

        Returns:
            The matching ``APIKey``, or ``None`` when the key is unknown,
            revoked, expired, or not a non-empty string.
        """
        if not isinstance(plaintext, str) or not plaintext:
            return None

        presented_hash = self._hash(plaintext)

        # Narrow to the keys sharing this prefix, then pick the one whose
        # hash matches.
        matched = None
        for candidate_id in self._prefix_index.get(plaintext[: self._PREFIX_LEN], ()):
            candidate = self._keys.get(candidate_id)
            # Constant-time comparison to avoid leaking hash bytes via timing.
            if candidate is not None and hmac.compare_digest(candidate.key_hash, presented_hash):
                matched = candidate
                break

        if matched is None or matched.revoked:
            return None

        now = time.time()
        # "is not None" so that a timestamp of 0.0 still counts as an expiry.
        if matched.expires_at is not None and now >= matched.expires_at:
            return None

        # Record the successful use.
        matched.last_used_at = now
        matched.usage_count += 1
        return matched

    def check_scope(self, key: APIKey, required_scope: KeyScope) -> bool:
        """Return whether ``key`` grants ``required_scope``.

        A key holding ``KeyScope.ADMIN`` passes every check; otherwise the
        required scope must be present in the key's scope list.
        """
        if KeyScope.ADMIN in key.scopes:
            return True
        return required_scope in key.scopes

    # ── Management ──

    def revoke_key(self, key_id: str) -> bool:
        """Revoke a key.

        Returns:
            ``True`` if the key existed and was active; ``False`` if it is
            unknown or was already revoked.
        """
        key = self._keys.get(key_id)
        if key is None or key.revoked:
            return False
        key.revoked = True
        key.revoked_at = time.time()
        return True

    def rotate_key(self, key_id: str, created_by: str = "admin") -> Optional[KeyCreateResult]:
        """Rotate a key: revoke the old one and issue a replacement.

        The replacement copies the old key's scopes and metadata, records the
        old id under ``metadata["rotated_from"]``, and keeps exactly the same
        expiry timestamp, so rotation never lengthens a key's lifetime.

        Returns:
            The creation result for the replacement key, or ``None`` when the
            old key is unknown, revoked, or already expired (in which case
            nothing is changed).
        """
        old = self._keys.get(key_id)
        if old is None or old.revoked:
            return None
        # An expired key must not be revived through rotation.
        if old.expires_at is not None and time.time() >= old.expires_at:
            return None

        self.revoke_key(key_id)

        return self._issue(
            name=f"{old.name} (rotated)",
            scopes=old.scopes,
            # "rotated_from" goes last so it overrides any value inherited
            # from an earlier rotation of the same lineage.
            metadata={**old.metadata, "rotated_from": key_id},
            created_by=created_by,
            expires_at=old.expires_at,
        )

    def list_keys(self) -> list[APIKey]:
        """Return all stored keys, newest first (records hold no plaintext)."""
        return sorted(self._keys.values(), key=lambda k: k.created_at, reverse=True)

    def get_key(self, key_id: str) -> Optional[APIKey]:
        """Return the record for ``key_id``, or ``None`` if it is unknown."""
        return self._keys.get(key_id)

    def stats(self) -> dict:
        """Return aggregate counts over all stored keys.

        ``active`` counts keys that are not revoked; expiry is not considered.
        """
        total = len(self._keys)
        active = sum(1 for k in self._keys.values() if not k.revoked)
        revoked = total - active
        total_usage = sum(k.usage_count for k in self._keys.values())
        return {
            "total": total,
            "active": active,
            "revoked": revoked,
            "total_usage_count": total_usage,
        }

    # ── Internals ──

    def _issue(
        self,
        *,
        name: str,
        scopes: list[KeyScope],
        metadata: dict,
        created_by: str,
        expires_at: Optional[float],
    ) -> KeyCreateResult:
        """Generate a fresh key, store its record and index it by prefix."""
        key_id = f"ak_{secrets.token_hex(12)}"
        plaintext = f"agentos_{secrets.token_hex(24)}"
        key_prefix = plaintext[: self._PREFIX_LEN]

        # Copy scopes/metadata so later mutation of the caller's request or
        # of the returned result cannot alter the stored record.
        key = APIKey(
            key_id=key_id,
            key_hash=self._hash(plaintext),
            key_prefix=key_prefix,
            name=name,
            created_by=created_by,
            scopes=list(scopes),
            expires_at=expires_at,
            metadata=dict(metadata),
        )

        self._keys[key_id] = key
        self._prefix_index.setdefault(key_prefix, []).append(key_id)

        return KeyCreateResult(
            key_id=key_id,
            plaintext_key=plaintext,
            key_prefix=key_prefix,
            scopes=list(scopes),
            expires_at=expires_at,
        )

    def _hash(self, plaintext: str) -> str:
        """Return the salted SHA-256 hex digest of ``plaintext``."""
        return hashlib.sha256(f"{self._secret_salt}:{plaintext}".encode()).hexdigest()