Coverage for src / kemi / encryption.py: 82%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1"""
Encryption layer for kemi storage adapters.

Supports two encryption approaches:
- Approach A: SQLCipher full-database encryption for SQLite
- Approach B: Fernet field-level encryption for all adapters (content + metadata)

Key management:
- SQLCipher: key passed directly (key=) or loaded from the --key-file path passed to init
- Fernet: key loaded from KEMI_ENCRYPTION_KEY env var or --key-file path
11"""
13from __future__ import annotations
15import base64
16import json
17import os
18from pathlib import Path
19from typing import Any
21__all__ = [
22 "FernetEncryptor",
23 "SQLCipherManager",
24 "EncryptionConfig",
25 "FieldEncryptor",
26]
29class EncryptionConfig:
30 """Configuration for encryption. Passed to storage adapters."""
32 def __init__(
33 self,
34 enabled: bool = False,
35 mode: str = "fernet", # "fernet" or "sqlcipher"
36 key: str | None = None,
37 key_file: str | None = None,
38 key_id: str | None = None,
39 encrypt_user_id: bool = False,
40 encrypt_session_id: bool = False,
41 ) -> None:
42 self.enabled = enabled
43 self.mode = mode # "fernet" or "sqlcipher"
44 self._key = key or ""
45 self.key_file = key_file
46 self.key_id = key_id or "default"
47 self.encrypt_user_id = encrypt_user_id
48 self.encrypt_session_id = encrypt_session_id
50 @classmethod
51 def from_env(cls) -> "EncryptionConfig":
52 """Load encryption config from environment variables."""
53 enabled = os.environ.get("KEMI_ENCRYPTION_ENABLED", "").lower() in ("1", "true", "yes")
54 mode = os.environ.get("KEMI_ENCRYPTION_MODE", "fernet")
55 key = os.environ.get("KEMI_ENCRYPTION_KEY", "")
56 key_id = os.environ.get("KEMI_ENCRYPTION_KEY_ID", "default")
57 return cls(enabled=enabled, mode=mode, key=key, key_id=key_id)
59 @classmethod
60 def from_key_file(cls, path: str, key_id: str | None = None) -> "EncryptionConfig":
61 """Load encryption config from a key file."""
62 key = load_key_from_file(path)
63 kid = key_id if key_id is not None else "default"
64 return cls(enabled=True, mode="fernet", key=key, key_file=path, key_id=kid)
66 @property
67 def key(self) -> str:
68 if self._key:
69 return self._key
70 if self.key_file:
71 return load_key_from_file(self.key_file)
72 raise ValueError("No encryption key configured. Set KEMI_ENCRYPTION_KEY env var or pass --key-file")
def load_key_from_file(path: str) -> str:
    """Load an encryption key from a file.

    ``~`` in *path* is expanded, and surrounding whitespace (including the
    trailing newline) is stripped from the file content.

    Args:
        path: Location of the key file.

    Returns:
        The stripped key text.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file is empty or contains only whitespace.
    """
    p = Path(path).expanduser()
    if not p.exists():
        raise FileNotFoundError(f"Key file not found: {path}")
    # Explicit encoding keeps the result independent of the platform locale.
    key = p.read_text(encoding="utf-8").strip()
    if not key:
        # An empty key would otherwise be turned silently into a predictable
        # key further down the line; fail loudly instead.
        raise ValueError(f"Key file is empty: {path}")
    return key
83def generate_key(path: str | None = None) -> str:
84 """Generate a new Fernet-compatible encryption key.
86 Uses Fernet.generate_key() which produces a 128-bit URL-safe base64-encoded
87 key (43 bytes). If path is provided, write the key to that file.
88 Returns the key as a string.
89 """
90 try:
91 from cryptography.fernet import Fernet
92 except ImportError as e:
93 raise ImportError(
94 "cryptography package required for key generation. "
95 "Install with: pip install kemi[encryption] or pip install cryptography"
96 ) from e
98 key = Fernet.generate_key().decode("utf-8")
99 if path:
100 p = Path(path).expanduser()
101 p.write_text(key + "\n")
102 return key
105# ---------------------------------------------------------------------------
106# Fernet field-level encryption
107# ---------------------------------------------------------------------------
109class FernetEncryptor:
110 """Fernet symmetric encryption for field-level data protection.
112 Fernet is a standard symmetric encryption method (AES-128-CBC with HMAC).
113 Encrypts arbitrary bytes and encodes them as URL-safe base64.
114 """
116 def __init__(self, key: str) -> None:
117 try:
118 from cryptography.fernet import Fernet
119 except ImportError as e:
120 raise ImportError(
121 "cryptography package required for Fernet encryption. "
122 "Install with: pip install kemi[encryption] or pip install cryptography"
123 ) from e
125 # Derive a valid Fernet key using SHA-256 hash.
126 # Fernet keys are 128-bit, base64-urlsafe encoded (43 bytes).
127 # SHA-256 produces 32 bytes; we base64-encode directly to get a valid key.
128 import hashlib
130 digest = hashlib.sha256(key.encode("utf-8")).digest()
131 fernet_key = base64.urlsafe_b64encode(digest)
132 self._fernet = Fernet(fernet_key)
134 def encrypt(self, data: str | bytes) -> str:
135 """Encrypt data, return base64-encoded ciphertext."""
136 if isinstance(data, str):
137 data = data.encode("utf-8")
138 result = self._fernet.encrypt(data)
139 # cryptography Fernet.encrypt() returns bytes on some versions,
140 # str on others — normalize to string
141 if isinstance(result, bytes):
142 result = result.decode("utf-8")
143 return result
145 def decrypt(self, ciphertext: str | bytes) -> bytes:
146 """Decrypt base64-encoded ciphertext, return raw bytes."""
147 if isinstance(ciphertext, str):
148 ciphertext = ciphertext.encode("utf-8")
149 return self._fernet.decrypt(ciphertext)
151 def decrypt_str(self, ciphertext: str) -> str:
152 """Decrypt ciphertext, return as string."""
153 return self.decrypt(ciphertext).decode("utf-8")
156class FieldEncryptor:
157 """Encrypts and decrypts specific memory fields.
159 Encrypted fields are stored as JSON-serialized objects:
160 {"encrypted": true, "key_id": "...", "data": "...base64 ciphertext..."}
162 Fields encrypted by default: content, metadata (JSON fields with sensitive data).
163 Optionally encrypts: user_id, session_id.
164 """
166 ENCRYPTED_PREFIX = {"encrypted": True}
168 def __init__(
169 self,
170 config: EncryptionConfig,
171 encrypt_fields: list[str] | None = None,
172 encrypt_user_id: bool | None = None,
173 encrypt_session_id: bool | None = None,
174 ) -> None:
175 if not config.enabled:
176 self._fernet: FernetEncryptor | None = None
177 self._encrypt_fields: frozenset[str] = frozenset()
178 self._encrypt_user_id = False
179 self._encrypt_session_id = False
180 return
182 self._fernet = FernetEncryptor(config.key)
183 self._encrypt_fields = frozenset(encrypt_fields or ["content", "metadata"])
184 # Read from config when params are not explicitly set (default to None)
185 self._encrypt_user_id = encrypt_user_id if encrypt_user_id is not None else getattr(config, "encrypt_user_id", False)
186 self._encrypt_session_id = encrypt_session_id if encrypt_session_id is not None else getattr(config, "encrypt_session_id", False)
187 self._key_id = config.key_id
189 @property
190 def is_enabled(self) -> bool:
191 return self._fernet is not None
193 def _encrypt_value(self, value: Any) -> dict[str, Any]:
194 """Encrypt a value and return an encrypted envelope dict."""
195 json_bytes = json.dumps(value).encode("utf-8")
196 encrypted = self._fernet.encrypt(json_bytes)
197 if isinstance(encrypted, bytes):
198 encrypted = encrypted.decode("utf-8")
199 return {
200 **self.ENCRYPTED_PREFIX,
201 "key_id": self._key_id,
202 "data": encrypted,
203 }
205 def encrypt_field(self, field_name: str, value: Any) -> Any:
206 """Encrypt a field value if encryption is enabled for it.
208 Handles both standard encryptable fields (content, metadata, etc.)
209 and optional extra fields like user_id and session_id.
210 """
211 if not self.is_enabled:
212 return value
213 if value is None:
214 return None
216 # Check standard encryptable fields
217 if field_name in self._encrypt_fields:
218 return self._encrypt_value(value)
220 # Check optional extra fields (user_id, session_id)
221 if field_name == "user_id" and self._encrypt_user_id:
222 return self._encrypt_value(value)
223 if field_name == "session_id" and self._encrypt_session_id:
224 return self._encrypt_value(value)
226 return value
228 def decrypt_field(self, field_name: str, value: Any) -> Any:
229 """Decrypt a field value if it's an encrypted blob."""
230 if not self.is_enabled:
231 return value
232 if not self._is_encrypted(value):
233 return value
235 ciphertext = value.get("data", "")
236 decrypted_bytes = self._fernet.decrypt(ciphertext)
237 return json.loads(decrypted_bytes.decode("utf-8"))
239 def _is_encrypted(self, value: Any) -> bool:
240 return isinstance(value, dict) and value.get("encrypted") is True
242 def encrypt_memory_row(self, row: dict[str, Any]) -> dict[str, Any]:
243 """Encrypt relevant fields in a memory row dict (before storage)."""
244 if not self.is_enabled:
245 return row
247 result = dict(row)
248 for field in self._encrypt_fields:
249 if field in result and result[field] is not None:
250 result[field] = self.encrypt_field(field, result[field])
252 if self._encrypt_user_id and "user_id" in result:
253 result["user_id"] = self.encrypt_field("user_id", result["user_id"])
254 if self._encrypt_session_id and "session_id" in result:
255 result["session_id"] = self.encrypt_field("session_id", result["session_id"])
257 return result
259 def decrypt_memory_row(self, row: dict[str, Any]) -> dict[str, Any]:
260 """Decrypt relevant fields in a memory row dict (after retrieval)."""
261 if not self.is_enabled:
262 return row
264 result = dict(row)
265 for field in self._encrypt_fields:
266 if field in result and self._is_encrypted(result[field]):
267 result[field] = self.decrypt_field(field, result[field])
269 if self._encrypt_user_id and "user_id" in result and self._is_encrypted(result["user_id"]):
270 result["user_id"] = self.decrypt_field("user_id", result["user_id"])
271 if self._encrypt_session_id and "session_id" in result and self._is_encrypted(result["session_id"]):
272 result["session_id"] = self.decrypt_field("session_id", result["session_id"])
274 return result
277# ---------------------------------------------------------------------------
278# SQLCipher full-database encryption
279# ---------------------------------------------------------------------------
281class SQLCipherManager:
282 """Manages SQLCipher connection configuration for SQLiteStorageAdapter.
284 SQLCipher provides full-database AES-256 encryption at the SQLite level.
285 The encryption is transparent to the application — SQL operations remain
286 the same, but all data at rest is encrypted.
288 Usage:
289 manager = SQLCipherManager(key_file="/path/to/key")
290 conn = manager.connect("kemi.db")
291 # Use conn normally — all data is encrypted
292 """
294 def __init__(self, key: str | None = None, key_file: str | None = None) -> None:
295 if key is None and key_file is None:
296 raise ValueError("SQLCipher requires a key (key= or key_file=)")
297 if key_file:
298 key = load_key_from_file(key_file)
299 self._key = key
301 @property
302 def key(self) -> str:
303 return self._key
305 def configure_connection(self, conn: Any) -> None:
306 """Apply SQLCipher PRAGMAs to an existing sqlite3 connection.
308 Must be called AFTER sqlite3.connect() but BEFORE any SQL operations.
309 Sets the encryption key and cipher configuration.
311 Uses hex-formatted key via PRAGMA key = "x'...'" to prevent any
312 special-character issues in the PRAGMA value.
313 """
314 try:
315 import sqlcipher3
316 except ImportError as e:
317 raise ImportError(
318 "sqlcipher3 package required for SQLCipher encryption. "
319 "Install with: pip install kemi[sqlcipher] or pip install sqlcipher3"
320 ) from e
322 # Use hex-encoded key for safe PRAGMA key assignment without
323 # any special-character injection risk.
324 hex_key = self._key.encode("utf-8").hex()
325 conn.execute('PRAGMA key = "x\'%s\'"' % hex_key)
326 # Configure cipher settings for best security/compatibility
327 conn.execute("PRAGMA cipher_page_size = 4096")
328 conn.execute("PRAGMA kdf_iter = 256000")
329 conn.execute("PRAGMA cipher_memory_security = ON")
331 def connect(self, db_path: str) -> Any:
332 """Create and return a SQLCipher-encrypted sqlite3 connection."""
333 try:
334 import sqlcipher3
335 except ImportError as e:
336 raise ImportError(
337 "sqlcipher3 package required for SQLCipher encryption. "
338 "Install with: pip install kemi[sqlcipher] or pip install sqlcipher3"
339 ) from e
341 conn = sqlcipher3.connect(db_path)
342 self.configure_connection(conn)
343 return conn
def is_sqlcipher_available() -> bool:
    """Report whether the ``sqlcipher3`` package can be imported."""
    available = True
    try:
        import sqlcipher3  # type: ignore[import]  # noqa: F401
    except ImportError:
        available = False
    return available
def is_cryptography_available() -> bool:
    """Report whether ``cryptography.fernet.Fernet`` can be imported."""
    available = True
    try:
        from cryptography.fernet import Fernet  # type: ignore[import]  # noqa: F401
    except ImportError:
        available = False
    return available