Coverage for src / kemi / api_keys.py: 100%
103 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1"""API key management for multi-tenant FastAPI server.
3Stores hashed API keys in the same SQLite database as memories (table
4`api_keys`, schema version 8). The raw key is returned to the caller only
5once at creation time; only the SHA-256 hash is persisted.
7Key format
8----------
9Raw key: ``kemi_<43-char base32 token>`` (e.g. ``kemi_abc...xyz``).
10Key id: short, non-secret identifier shown in listings so an operator
11 can revoke a specific key without seeing the secret.
12Hash: ``sha256(raw_key).hexdigest()`` — 64 hex chars.
14Lookup is by hash, so listing endpoints can return key_id / name / user
15/ timestamps without leaking the secret. Expiry is checked at lookup
16time; expired keys are treated as invalid.
17"""
19from __future__ import annotations
21import hashlib
22import logging
23import secrets
24import sqlite3
25from dataclasses import dataclass
26from datetime import datetime, timedelta, timezone
27from typing import Any
29logger = logging.getLogger(__name__)
31KEY_PREFIX = "kemi_"
32KEY_ID_PREFIX = "kmi_" # short, non-secret identifier for listings
33_HASH_ALGO = "sha256"
34_RANDOM_BYTES = 32 # 256 bits of entropy
35_KEY_ID_BYTES = 4 # 8 hex chars — plenty for non-secret display IDs
38@dataclass
39class APIKey:
40 """In-memory representation of an api_keys row.
42 `raw_key` is populated only on creation, never loaded from storage.
43 """
45 key_id: str
46 user_id: str
47 name: str
48 created_at: str
49 expires_at: str | None
50 last_used_at: str | None
51 revoked_at: str | None
52 raw_key: str | None = None
54 def is_expired(self, now: datetime | None = None) -> bool:
55 if not self.expires_at:
56 return False
57 try:
58 expiry = datetime.fromisoformat(self.expires_at)
59 except ValueError:
60 return False
61 if expiry.tzinfo is None:
62 expiry = expiry.replace(tzinfo=timezone.utc)
63 return expiry <= (now or datetime.now(timezone.utc))
65 def is_active(self) -> bool:
66 return self.revoked_at is None and not self.is_expired()
68 def to_dict(self, include_secret: bool = False) -> dict[str, Any]:
69 d: dict[str, Any] = {
70 "key_id": self.key_id,
71 "user_id": self.user_id,
72 "name": self.name,
73 "created_at": self.created_at,
74 "expires_at": self.expires_at,
75 "last_used_at": self.last_used_at,
76 "revoked_at": self.revoked_at,
77 }
78 if include_secret and self.raw_key is not None:
79 d["api_key"] = self.raw_key
80 return d
83def _hash_key(raw_key: str) -> str:
84 return hashlib.sha256(raw_key.encode("utf-8")).hexdigest()
87def _generate_raw_key() -> str:
88 # 32 bytes → 43 base32 chars (no padding). Strip '=' for a clean token.
89 token = secrets.token_urlsafe(_RANDOM_BYTES).rstrip("=")
90 return f"{KEY_PREFIX}{token}"
93def _generate_key_id() -> str:
94 return f"{KEY_ID_PREFIX}{secrets.token_hex(_KEY_ID_BYTES)}"
97class APIKeyManager:
98 """CRUD + lookup for API keys backed by the `api_keys` table."""
100 def __init__(self, connection: sqlite3.Connection) -> None:
101 self._conn = connection
103 def create_key(
104 self,
105 user_id: str,
106 name: str,
107 expires_at: datetime | None = None,
108 ) -> APIKey:
109 """Create a new API key for a user.
111 Args:
112 user_id: Owner of the key.
113 name: Human-readable label (e.g. "laptop", "ci-runner").
114 expires_at: Optional expiry datetime (UTC). None = never expires.
116 Returns:
117 APIKey whose ``raw_key`` is set. Callers must surface the
118 ``raw_key`` to the user; it is unrecoverable afterwards.
119 """
120 if not user_id or not name:
121 raise ValueError("user_id and name are required")
123 raw_key = _generate_raw_key()
124 key_id = _generate_key_id()
125 hashed = _hash_key(raw_key)
126 now = datetime.now(timezone.utc).isoformat()
127 expires_iso = expires_at.isoformat() if expires_at else None
129 try:
130 self._conn.execute(
131 """
132 INSERT INTO api_keys
133 (key_id, user_id, hashed_key, name, created_at, expires_at)
134 VALUES (?, ?, ?, ?, ?, ?)
135 """,
136 (key_id, user_id, hashed, name, now, expires_iso),
137 )
138 self._conn.commit()
139 except sqlite3.IntegrityError as e:
140 # Collisions on key_id or hashed_key are astronomically unlikely,
141 # but surface a clear error rather than a generic IntegrityError.
142 raise RuntimeError(f"Failed to create API key: {e}") from e
144 logger.info("Created API key %s for user %s", key_id, user_id)
145 return APIKey(
146 key_id=key_id,
147 user_id=user_id,
148 name=name,
149 created_at=now,
150 expires_at=expires_iso,
151 last_used_at=None,
152 revoked_at=None,
153 raw_key=raw_key,
154 )
156 def lookup(self, raw_key: str) -> APIKey | None:
157 """Return the active APIKey for a raw key, or None.
159 Expired or revoked keys return None. The last_used_at timestamp
160 is updated in the background (best-effort; failures are logged
161 but do not affect the lookup result).
162 """
163 if not raw_key or not raw_key.startswith(KEY_PREFIX):
164 return None
165 hashed = _hash_key(raw_key)
166 row = self._conn.execute(
167 """
168 SELECT key_id, user_id, name, created_at, expires_at,
169 last_used_at, revoked_at
170 FROM api_keys
171 WHERE hashed_key = ?
172 """,
173 (hashed,),
174 ).fetchone()
176 if row is None:
177 return None
179 key = APIKey(
180 key_id=row["key_id"],
181 user_id=row["user_id"],
182 name=row["name"],
183 created_at=row["created_at"],
184 expires_at=row["expires_at"],
185 last_used_at=row["last_used_at"],
186 revoked_at=row["revoked_at"],
187 )
188 if not key.is_active():
189 return None
191 # Best-effort last_used_at update. Don't fail the request on error.
192 try:
193 self._conn.execute(
194 "UPDATE api_keys SET last_used_at = ? WHERE key_id = ?",
195 (datetime.now(timezone.utc).isoformat(), key.key_id),
196 )
197 self._conn.commit()
198 except sqlite3.Error as e: # pragma: no cover - defensive
199 logger.warning("Failed to update last_used_at for %s: %s", key.key_id, e)
201 return key
203 def list_keys(self, user_id: str | None = None) -> list[APIKey]:
204 """List API keys, optionally filtered by user.
206 Returned objects never contain the raw key.
207 """
208 if user_id is None:
209 rows = self._conn.execute(
210 """
211 SELECT key_id, user_id, name, created_at, expires_at,
212 last_used_at, revoked_at
213 FROM api_keys
214 ORDER BY created_at DESC
215 """
216 ).fetchall()
217 else:
218 rows = self._conn.execute(
219 """
220 SELECT key_id, user_id, name, created_at, expires_at,
221 last_used_at, revoked_at
222 FROM api_keys
223 WHERE user_id = ?
224 ORDER BY created_at DESC
225 """,
226 (user_id,),
227 ).fetchall()
229 return [
230 APIKey(
231 key_id=r["key_id"],
232 user_id=r["user_id"],
233 name=r["name"],
234 created_at=r["created_at"],
235 expires_at=r["expires_at"],
236 last_used_at=r["last_used_at"],
237 revoked_at=r["revoked_at"],
238 )
239 for r in rows
240 ]
242 def revoke(self, key_id: str) -> bool:
243 """Revoke a key by id. Returns True if a key was revoked."""
244 cursor = self._conn.execute(
245 """
246 UPDATE api_keys
247 SET revoked_at = ?
248 WHERE key_id = ? AND revoked_at IS NULL
249 """,
250 (datetime.now(timezone.utc).isoformat(), key_id),
251 )
252 self._conn.commit()
253 return cursor.rowcount > 0
255 def get(self, key_id: str) -> APIKey | None:
256 row = self._conn.execute(
257 """
258 SELECT key_id, user_id, name, created_at, expires_at,
259 last_used_at, revoked_at
260 FROM api_keys
261 WHERE key_id = ?
262 """,
263 (key_id,),
264 ).fetchone()
265 if row is None:
266 return None
267 return APIKey(
268 key_id=row["key_id"],
269 user_id=row["user_id"],
270 name=row["name"],
271 created_at=row["created_at"],
272 expires_at=row["expires_at"],
273 last_used_at=row["last_used_at"],
274 revoked_at=row["revoked_at"],
275 )
277 def cleanup_expired(self) -> int:
278 """Revoke keys whose expiry is in the past. Returns count revoked."""
279 now = datetime.now(timezone.utc).isoformat()
280 cursor = self._conn.execute(
281 """
282 UPDATE api_keys
283 SET revoked_at = ?
284 WHERE revoked_at IS NULL
285 AND expires_at IS NOT NULL
286 AND expires_at <= ?
287 """,
288 (now, now),
289 )
290 self._conn.commit()
291 return cursor.rowcount
294def make_expiry(days: int | None) -> datetime | None:
295 """Helper to convert a 'days from now' hint into a UTC datetime."""
296 if days is None:
297 return None
298 return datetime.now(timezone.utc) + timedelta(days=days)