Coverage for src / kemi / encryption.py: 82%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1""" 

2Encryption layer for kemi storage adapters. 

3 

4Supports two encryption approaches: 

5- Approach A: SQLCipher full-database encryption for SQLite 

6- Approach B: Fernet field-level encryption for all adapters (content + metadata) 

7 

8Key management: 

9- SQLCipher: key loaded from --key-file path passed to init 

10- Fernet: key loaded from KEMI_ENCRYPTION_KEY env var or --key-file path 

11""" 

12 

13from __future__ import annotations 

14 

15import base64 

16import json 

17import os 

18from pathlib import Path 

19from typing import Any 

20 

# Names exported by a star-import of this module, in alphabetical order.
__all__ = [
    "EncryptionConfig",
    "FernetEncryptor",
    "FieldEncryptor",
    "SQLCipherManager",
]

27 

28 

class EncryptionConfig:
    """Settings describing whether and how a storage adapter encrypts data.

    Attributes:
        enabled: Master switch for encryption.
        mode: Either "fernet" or "sqlcipher" (not validated here).
        key_file: Optional path the key is read from when no inline key
            was supplied.
        key_id: Label identifying the key; falls back to "default".
        encrypt_user_id: Whether user_id values should be encrypted too.
        encrypt_session_id: Whether session_id values should be encrypted too.
    """

    def __init__(
        self,
        enabled: bool = False,
        mode: str = "fernet",  # "fernet" or "sqlcipher"
        key: str | None = None,
        key_file: str | None = None,
        key_id: str | None = None,
        encrypt_user_id: bool = False,
        encrypt_session_id: bool = False,
    ) -> None:
        self.enabled = enabled
        self.mode = mode
        self.key_file = key_file
        self.encrypt_user_id = encrypt_user_id
        self.encrypt_session_id = encrypt_session_id
        # Falsy values (None or "") collapse to the defaults below.
        self.key_id = key_id if key_id else "default"
        self._key = key if key else ""

    @classmethod
    def from_env(cls) -> "EncryptionConfig":
        """Build a config from the KEMI_ENCRYPTION_* environment variables."""
        env = os.environ
        switch = env.get("KEMI_ENCRYPTION_ENABLED", "").lower()
        return cls(
            enabled=switch in ("1", "true", "yes"),
            mode=env.get("KEMI_ENCRYPTION_MODE", "fernet"),
            key=env.get("KEMI_ENCRYPTION_KEY", ""),
            key_id=env.get("KEMI_ENCRYPTION_KEY_ID", "default"),
        )

    @classmethod
    def from_key_file(cls, path: str, key_id: str | None = None) -> "EncryptionConfig":
        """Build an enabled Fernet config whose key is read from ``path``."""
        return cls(
            enabled=True,
            mode="fernet",
            key=load_key_from_file(path),
            key_file=path,
            key_id="default" if key_id is None else key_id,
        )

    @property
    def key(self) -> str:
        """Return the inline key, else the key read from ``key_file``.

        Raises:
            ValueError: If neither an inline key nor a key file is configured.
        """
        if not self._key:
            if not self.key_file:
                raise ValueError("No encryption key configured. Set KEMI_ENCRYPTION_KEY env var or pass --key-file")
            # The file is re-read on every access; nothing is cached.
            return load_key_from_file(self.key_file)
        return self._key

73 

74 

def load_key_from_file(path: str) -> str:
    """Load an encryption key from a text file.

    Args:
        path: Location of the key file; a leading ``~`` is expanded.

    Returns:
        The file contents with surrounding whitespace stripped.

    Raises:
        FileNotFoundError: If nothing exists at ``path``.
        ValueError: If the file is empty or contains only whitespace.
    """
    key_path = Path(path).expanduser()
    if not key_path.exists():
        raise FileNotFoundError(f"Key file not found: {path}")
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    key = key_path.read_text(encoding="utf-8").strip()
    if not key:
        # An empty key would otherwise be accepted silently and used as if
        # it were a real secret.
        raise ValueError(f"Key file is empty: {path}")
    return key

81 

82 

def generate_key(path: str | None = None) -> str:
    """Generate a new Fernet encryption key, optionally saving it to a file.

    ``Fernet.generate_key()`` yields 32 random bytes encoded as URL-safe
    base64, i.e. a 44-character ASCII string.

    Args:
        path: Optional destination file; a leading ``~`` is expanded. The key
            is written followed by a newline, replacing any existing content.

    Returns:
        The generated key as a string.

    Raises:
        ImportError: If the ``cryptography`` package is not installed.
    """
    try:
        from cryptography.fernet import Fernet
    except ImportError as e:
        raise ImportError(
            "cryptography package required for key generation. "
            "Install with: pip install kemi[encryption] or pip install cryptography"
        ) from e

    key = Fernet.generate_key().decode("utf-8")
    if path:
        target = Path(path).expanduser()
        # Create the file readable/writable by the owner only, so the secret
        # is not exposed through a permissive umask. The mode applies when
        # the file is created; an existing file keeps its permissions.
        fd = os.open(target, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(key + "\n")
    return key

103 

104 

105# --------------------------------------------------------------------------- 

106# Fernet field-level encryption 

107# --------------------------------------------------------------------------- 

108 

class FernetEncryptor:
    """Symmetric field-level encryption built on ``cryptography``'s Fernet.

    Fernet is a standard symmetric scheme (AES-128-CBC with HMAC). The key
    string given to the constructor is not used verbatim: it is hashed with
    SHA-256 and the 32-byte digest, URL-safe base64-encoded, becomes the
    Fernet key. Any string can therefore serve as a key.
    """

    def __init__(self, key: str) -> None:
        import hashlib

        try:
            from cryptography.fernet import Fernet
        except ImportError as e:
            raise ImportError(
                "cryptography package required for Fernet encryption. "
                "Install with: pip install kemi[encryption] or pip install cryptography"
            ) from e

        # SHA-256 gives exactly the 32 bytes Fernet needs; base64-encoding the
        # digest produces a key in the format the Fernet constructor accepts.
        derived = base64.urlsafe_b64encode(hashlib.sha256(key.encode("utf-8")).digest())
        self._fernet = Fernet(derived)

    def encrypt(self, data: str | bytes) -> str:
        """Encrypt ``data`` (text is UTF-8 encoded first) and return the token as str."""
        payload = data.encode("utf-8") if isinstance(data, str) else data
        token = self._fernet.encrypt(payload)
        # Normalize to str regardless of what the backend hands back.
        return token.decode("utf-8") if isinstance(token, bytes) else token

    def decrypt(self, ciphertext: str | bytes) -> bytes:
        """Decrypt a token (str or bytes) and return the raw plaintext bytes."""
        token = ciphertext.encode("utf-8") if isinstance(ciphertext, str) else ciphertext
        return self._fernet.decrypt(token)

    def decrypt_str(self, ciphertext: str) -> str:
        """Decrypt a token and decode the plaintext as UTF-8 text."""
        plaintext = self.decrypt(ciphertext)
        return plaintext.decode("utf-8")

154 

155 

class FieldEncryptor:
    """Encrypts and decrypts selected fields of memory rows.

    An encrypted field value is replaced by an envelope dict:
        {"encrypted": true, "key_id": "...", "data": "...ciphertext..."}
    where ``data`` is the encrypted form of the JSON-serialized original value.

    Fields encrypted by default: content, metadata.
    Optionally encrypted: user_id, session_id.

    When the supplied config is disabled, every method is a pass-through.
    """

    ENCRYPTED_PREFIX = {"encrypted": True}

    def __init__(
        self,
        config: EncryptionConfig,
        encrypt_fields: list[str] | None = None,
        encrypt_user_id: bool | None = None,
        encrypt_session_id: bool | None = None,
    ) -> None:
        """Set up the encryptor from ``config``.

        Args:
            config: Encryption settings; ``config.enabled`` decides whether
                any encryption happens at all.
            encrypt_fields: Field names to encrypt. ``None`` or an empty list
                selects the defaults ("content" and "metadata").
            encrypt_user_id: Overrides ``config.encrypt_user_id`` when not None.
            encrypt_session_id: Overrides ``config.encrypt_session_id`` when
                not None.
        """
        # Assigned on both paths so the attribute always exists.
        self._key_id = getattr(config, "key_id", "default")

        if not config.enabled:
            self._fernet: FernetEncryptor | None = None
            self._encrypt_fields: frozenset[str] = frozenset()
            self._encrypt_user_id = False
            self._encrypt_session_id = False
            return

        self._fernet = FernetEncryptor(config.key)
        self._encrypt_fields = frozenset(encrypt_fields or ["content", "metadata"])
        # Explicit arguments win; otherwise fall back to the config's flags.
        self._encrypt_user_id = (
            encrypt_user_id
            if encrypt_user_id is not None
            else getattr(config, "encrypt_user_id", False)
        )
        self._encrypt_session_id = (
            encrypt_session_id
            if encrypt_session_id is not None
            else getattr(config, "encrypt_session_id", False)
        )

    @property
    def is_enabled(self) -> bool:
        """True when an encryptor was created, i.e. the config was enabled."""
        return self._fernet is not None

    def _should_encrypt(self, field_name: str) -> bool:
        """Return whether ``field_name`` is selected for encryption."""
        return bool(
            field_name in self._encrypt_fields
            or (field_name == "user_id" and self._encrypt_user_id)
            or (field_name == "session_id" and self._encrypt_session_id)
        )

    def _encrypt_value(self, value: Any) -> dict[str, Any]:
        """Encrypt a JSON-serializable value and wrap it in an envelope dict."""
        token = self._fernet.encrypt(json.dumps(value))
        return {
            **self.ENCRYPTED_PREFIX,
            "key_id": self._key_id,
            "data": token,
        }

    def encrypt_field(self, field_name: str, value: Any) -> Any:
        """Encrypt ``value`` if ``field_name`` is selected for encryption.

        Covers both the configured field set and the optional user_id /
        session_id fields. ``None`` values and unselected fields are returned
        unchanged, as is everything when encryption is disabled.
        """
        if not self.is_enabled or value is None:
            return value
        if self._should_encrypt(field_name):
            return self._encrypt_value(value)
        return value

    def decrypt_field(self, field_name: str, value: Any) -> Any:
        """Decrypt ``value`` if it is an encrypted envelope, else return it as is."""
        if not self.is_enabled:
            return value
        if not self._is_encrypted(value):
            return value

        ciphertext = value.get("data", "")
        decrypted_bytes = self._fernet.decrypt(ciphertext)
        return json.loads(decrypted_bytes.decode("utf-8"))

    def _is_encrypted(self, value: Any) -> bool:
        """Return True when ``value`` is a dict carrying ``"encrypted": True``."""
        return isinstance(value, dict) and value.get("encrypted") is True

    def encrypt_memory_row(self, row: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``row`` with selected fields encrypted (before storage).

        Each selected field is encrypted exactly once, even when user_id or
        session_id is both listed in the field set and enabled by its flag.
        The input dict is not modified; when encryption is disabled the same
        dict object is returned.
        """
        if not self.is_enabled:
            return row

        result = dict(row)
        for name, value in row.items():
            if value is not None and self._should_encrypt(name):
                result[name] = self._encrypt_value(value)
        return result

    def decrypt_memory_row(self, row: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``row`` with encrypted fields decrypted (after retrieval).

        The input dict is not modified; when encryption is disabled the same
        dict object is returned.
        """
        if not self.is_enabled:
            return row

        result = dict(row)
        for name in self._encrypt_fields:
            if self._is_encrypted(result.get(name)):
                result[name] = self.decrypt_field(name, result[name])

        # Second pass for the optional fields. It also unwraps a nested
        # envelope left behind when such a field was encrypted twice (once via
        # the field set and once via its flag), keeping such rows readable.
        optional = (
            ("user_id", self._encrypt_user_id),
            ("session_id", self._encrypt_session_id),
        )
        for name, wanted in optional:
            if wanted and self._is_encrypted(result.get(name)):
                result[name] = self.decrypt_field(name, result[name])

        return result

275 

276 

277# --------------------------------------------------------------------------- 

278# SQLCipher full-database encryption 

279# --------------------------------------------------------------------------- 

280 

class SQLCipherManager:
    """Manages SQLCipher connection configuration for SQLiteStorageAdapter.

    SQLCipher provides full-database encryption at the SQLite level, so SQL
    operations stay the same while data at rest is encrypted.

    Usage:
        manager = SQLCipherManager(key_file="/path/to/key")
        conn = manager.connect("kemi.db")
        # Use conn normally
    """

    def __init__(self, key: str | None = None, key_file: str | None = None) -> None:
        """Store the database key.

        Args:
            key: The key material as a string.
            key_file: Path of a file holding the key; takes precedence over
                ``key`` when both are given.

        Raises:
            ValueError: If no non-empty key results from the arguments.
        """
        if key_file:
            key = load_key_from_file(key_file)
        # Reject None and "" alike: an empty key would leave the database
        # effectively unprotected.
        if not key:
            raise ValueError("SQLCipher requires a key (key= or key_file=)")
        self._key = key

    @property
    def key(self) -> str:
        """The key string this manager was configured with."""
        return self._key

    @staticmethod
    def _require_sqlcipher() -> Any:
        """Import and return the ``sqlcipher3`` module, or raise a helpful ImportError."""
        try:
            import sqlcipher3
        except ImportError as e:
            raise ImportError(
                "sqlcipher3 package required for SQLCipher encryption. "
                "Install with: pip install kemi[sqlcipher] or pip install sqlcipher3"
            ) from e
        return sqlcipher3

    def configure_connection(self, conn: Any) -> None:
        """Apply the SQLCipher key and cipher PRAGMAs to an open connection.

        Must be called after the connection is opened but before any other
        SQL is executed on it.

        The key's UTF-8 bytes are hex-encoded so the PRAGMA statement contains
        only hex digits and cannot be broken by quotes or other special
        characters in the key.

        Raises:
            ImportError: If the ``sqlcipher3`` package is not installed.
        """
        self._require_sqlcipher()

        # NOTE(review): SQLCipher's raw-key form appears to expect a fixed
        # number of hex digits; a value of any other length is presumably
        # treated as an ordinary passphrase - confirm against SQLCipher docs.
        hex_key = self._key.encode("utf-8").hex()
        conn.execute(f"PRAGMA key = \"x'{hex_key}'\"")
        # Cipher settings, applied after the key is set.
        conn.execute("PRAGMA cipher_page_size = 4096")
        conn.execute("PRAGMA kdf_iter = 256000")
        conn.execute("PRAGMA cipher_memory_security = ON")

    def connect(self, db_path: str) -> Any:
        """Open ``db_path`` with sqlcipher3 and return the keyed connection.

        Raises:
            ImportError: If the ``sqlcipher3`` package is not installed.
        """
        sqlcipher3 = self._require_sqlcipher()
        conn = sqlcipher3.connect(db_path)
        self.configure_connection(conn)
        return conn

344 

345 

def is_sqlcipher_available() -> bool:
    """Report whether the ``sqlcipher3`` package can be imported."""
    available = True
    try:
        import sqlcipher3  # type: ignore[import]
    except ImportError:
        available = False
    return available

353 

354 

def is_cryptography_available() -> bool:
    """Report whether ``cryptography.fernet.Fernet`` can be imported."""
    available = True
    try:
        from cryptography.fernet import Fernet  # type: ignore[import]
    except ImportError:
        available = False
    return available