Coverage for src / kemi / api_keys.py: 100%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""API key management for multi-tenant FastAPI server. 

2 

3Stores hashed API keys in the same SQLite database as memories (table 

4`api_keys`, schema version 8). The raw key is returned to the caller only 

5once at creation time; only the SHA-256 hash is persisted. 

6 

7Key format 

8---------- 

9Raw key: ``kemi_<43-char base32 token>`` (e.g. ``kemi_abc...xyz``). 

10Key id: short, non-secret identifier shown in listings so an operator 

11 can revoke a specific key without seeing the secret. 

12Hash: ``sha256(raw_key).hexdigest()`` — 64 hex chars. 

13 

14Lookup is by hash, so listing endpoints can return key_id / name / user 

15/ timestamps without leaking the secret. Expiry is checked at lookup 

16time; expired keys are treated as invalid. 

17""" 

18 

19from __future__ import annotations 

20 

21import hashlib 

22import logging 

23import secrets 

24import sqlite3 

25from dataclasses import dataclass 

26from datetime import datetime, timedelta, timezone 

27from typing import Any 

28 

29logger = logging.getLogger(__name__) 

30 

31KEY_PREFIX = "kemi_" 

32KEY_ID_PREFIX = "kmi_" # short, non-secret identifier for listings 

33_HASH_ALGO = "sha256" 

34_RANDOM_BYTES = 32 # 256 bits of entropy 

35_KEY_ID_BYTES = 4 # 8 hex chars — plenty for non-secret display IDs 

36 

37 

38@dataclass 

39class APIKey: 

40 """In-memory representation of an api_keys row. 

41 

42 `raw_key` is populated only on creation, never loaded from storage. 

43 """ 

44 

45 key_id: str 

46 user_id: str 

47 name: str 

48 created_at: str 

49 expires_at: str | None 

50 last_used_at: str | None 

51 revoked_at: str | None 

52 raw_key: str | None = None 

53 

54 def is_expired(self, now: datetime | None = None) -> bool: 

55 if not self.expires_at: 

56 return False 

57 try: 

58 expiry = datetime.fromisoformat(self.expires_at) 

59 except ValueError: 

60 return False 

61 if expiry.tzinfo is None: 

62 expiry = expiry.replace(tzinfo=timezone.utc) 

63 return expiry <= (now or datetime.now(timezone.utc)) 

64 

65 def is_active(self) -> bool: 

66 return self.revoked_at is None and not self.is_expired() 

67 

68 def to_dict(self, include_secret: bool = False) -> dict[str, Any]: 

69 d: dict[str, Any] = { 

70 "key_id": self.key_id, 

71 "user_id": self.user_id, 

72 "name": self.name, 

73 "created_at": self.created_at, 

74 "expires_at": self.expires_at, 

75 "last_used_at": self.last_used_at, 

76 "revoked_at": self.revoked_at, 

77 } 

78 if include_secret and self.raw_key is not None: 

79 d["api_key"] = self.raw_key 

80 return d 

81 

82 

83def _hash_key(raw_key: str) -> str: 

84 return hashlib.sha256(raw_key.encode("utf-8")).hexdigest() 

85 

86 

87def _generate_raw_key() -> str: 

88 # 32 bytes → 43 base32 chars (no padding). Strip '=' for a clean token. 

89 token = secrets.token_urlsafe(_RANDOM_BYTES).rstrip("=") 

90 return f"{KEY_PREFIX}{token}" 

91 

92 

93def _generate_key_id() -> str: 

94 return f"{KEY_ID_PREFIX}{secrets.token_hex(_KEY_ID_BYTES)}" 

95 

96 

97class APIKeyManager: 

98 """CRUD + lookup for API keys backed by the `api_keys` table.""" 

99 

100 def __init__(self, connection: sqlite3.Connection) -> None: 

101 self._conn = connection 

102 

103 def create_key( 

104 self, 

105 user_id: str, 

106 name: str, 

107 expires_at: datetime | None = None, 

108 ) -> APIKey: 

109 """Create a new API key for a user. 

110 

111 Args: 

112 user_id: Owner of the key. 

113 name: Human-readable label (e.g. "laptop", "ci-runner"). 

114 expires_at: Optional expiry datetime (UTC). None = never expires. 

115 

116 Returns: 

117 APIKey whose ``raw_key`` is set. Callers must surface the 

118 ``raw_key`` to the user; it is unrecoverable afterwards. 

119 """ 

120 if not user_id or not name: 

121 raise ValueError("user_id and name are required") 

122 

123 raw_key = _generate_raw_key() 

124 key_id = _generate_key_id() 

125 hashed = _hash_key(raw_key) 

126 now = datetime.now(timezone.utc).isoformat() 

127 expires_iso = expires_at.isoformat() if expires_at else None 

128 

129 try: 

130 self._conn.execute( 

131 """ 

132 INSERT INTO api_keys 

133 (key_id, user_id, hashed_key, name, created_at, expires_at) 

134 VALUES (?, ?, ?, ?, ?, ?) 

135 """, 

136 (key_id, user_id, hashed, name, now, expires_iso), 

137 ) 

138 self._conn.commit() 

139 except sqlite3.IntegrityError as e: 

140 # Collisions on key_id or hashed_key are astronomically unlikely, 

141 # but surface a clear error rather than a generic IntegrityError. 

142 raise RuntimeError(f"Failed to create API key: {e}") from e 

143 

144 logger.info("Created API key %s for user %s", key_id, user_id) 

145 return APIKey( 

146 key_id=key_id, 

147 user_id=user_id, 

148 name=name, 

149 created_at=now, 

150 expires_at=expires_iso, 

151 last_used_at=None, 

152 revoked_at=None, 

153 raw_key=raw_key, 

154 ) 

155 

156 def lookup(self, raw_key: str) -> APIKey | None: 

157 """Return the active APIKey for a raw key, or None. 

158 

159 Expired or revoked keys return None. The last_used_at timestamp 

160 is updated in the background (best-effort; failures are logged 

161 but do not affect the lookup result). 

162 """ 

163 if not raw_key or not raw_key.startswith(KEY_PREFIX): 

164 return None 

165 hashed = _hash_key(raw_key) 

166 row = self._conn.execute( 

167 """ 

168 SELECT key_id, user_id, name, created_at, expires_at, 

169 last_used_at, revoked_at 

170 FROM api_keys 

171 WHERE hashed_key = ? 

172 """, 

173 (hashed,), 

174 ).fetchone() 

175 

176 if row is None: 

177 return None 

178 

179 key = APIKey( 

180 key_id=row["key_id"], 

181 user_id=row["user_id"], 

182 name=row["name"], 

183 created_at=row["created_at"], 

184 expires_at=row["expires_at"], 

185 last_used_at=row["last_used_at"], 

186 revoked_at=row["revoked_at"], 

187 ) 

188 if not key.is_active(): 

189 return None 

190 

191 # Best-effort last_used_at update. Don't fail the request on error. 

192 try: 

193 self._conn.execute( 

194 "UPDATE api_keys SET last_used_at = ? WHERE key_id = ?", 

195 (datetime.now(timezone.utc).isoformat(), key.key_id), 

196 ) 

197 self._conn.commit() 

198 except sqlite3.Error as e: # pragma: no cover - defensive 

199 logger.warning("Failed to update last_used_at for %s: %s", key.key_id, e) 

200 

201 return key 

202 

203 def list_keys(self, user_id: str | None = None) -> list[APIKey]: 

204 """List API keys, optionally filtered by user. 

205 

206 Returned objects never contain the raw key. 

207 """ 

208 if user_id is None: 

209 rows = self._conn.execute( 

210 """ 

211 SELECT key_id, user_id, name, created_at, expires_at, 

212 last_used_at, revoked_at 

213 FROM api_keys 

214 ORDER BY created_at DESC 

215 """ 

216 ).fetchall() 

217 else: 

218 rows = self._conn.execute( 

219 """ 

220 SELECT key_id, user_id, name, created_at, expires_at, 

221 last_used_at, revoked_at 

222 FROM api_keys 

223 WHERE user_id = ? 

224 ORDER BY created_at DESC 

225 """, 

226 (user_id,), 

227 ).fetchall() 

228 

229 return [ 

230 APIKey( 

231 key_id=r["key_id"], 

232 user_id=r["user_id"], 

233 name=r["name"], 

234 created_at=r["created_at"], 

235 expires_at=r["expires_at"], 

236 last_used_at=r["last_used_at"], 

237 revoked_at=r["revoked_at"], 

238 ) 

239 for r in rows 

240 ] 

241 

242 def revoke(self, key_id: str) -> bool: 

243 """Revoke a key by id. Returns True if a key was revoked.""" 

244 cursor = self._conn.execute( 

245 """ 

246 UPDATE api_keys 

247 SET revoked_at = ? 

248 WHERE key_id = ? AND revoked_at IS NULL 

249 """, 

250 (datetime.now(timezone.utc).isoformat(), key_id), 

251 ) 

252 self._conn.commit() 

253 return cursor.rowcount > 0 

254 

255 def get(self, key_id: str) -> APIKey | None: 

256 row = self._conn.execute( 

257 """ 

258 SELECT key_id, user_id, name, created_at, expires_at, 

259 last_used_at, revoked_at 

260 FROM api_keys 

261 WHERE key_id = ? 

262 """, 

263 (key_id,), 

264 ).fetchone() 

265 if row is None: 

266 return None 

267 return APIKey( 

268 key_id=row["key_id"], 

269 user_id=row["user_id"], 

270 name=row["name"], 

271 created_at=row["created_at"], 

272 expires_at=row["expires_at"], 

273 last_used_at=row["last_used_at"], 

274 revoked_at=row["revoked_at"], 

275 ) 

276 

277 def cleanup_expired(self) -> int: 

278 """Revoke keys whose expiry is in the past. Returns count revoked.""" 

279 now = datetime.now(timezone.utc).isoformat() 

280 cursor = self._conn.execute( 

281 """ 

282 UPDATE api_keys 

283 SET revoked_at = ? 

284 WHERE revoked_at IS NULL 

285 AND expires_at IS NOT NULL 

286 AND expires_at <= ? 

287 """, 

288 (now, now), 

289 ) 

290 self._conn.commit() 

291 return cursor.rowcount 

292 

293 

294def make_expiry(days: int | None) -> datetime | None: 

295 """Helper to convert a 'days from now' hint into a UTC datetime.""" 

296 if days is None: 

297 return None 

298 return datetime.now(timezone.utc) + timedelta(days=days)