Coverage for src / apcore_cli / security / config_encryptor.py: 90%
96 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Encrypted configuration storage (FE-05)."""
3from __future__ import annotations
5import base64
6import binascii
7import hashlib
8import logging
9import os
10import socket
12from cryptography.exceptions import InvalidTag
14logger = logging.getLogger("apcore_cli.security")
16# PBKDF2-HMAC-SHA256 iteration count — follows OWASP 2024+ minimum for SHA-256.
17_PBKDF2_ITERATIONS = 600_000
19# Static salt used only for backward-compatible reads of legacy enc: (v1) values.
20_V1_STATIC_SALT = b"apcore-cli-config-v1"
23class ConfigDecryptionError(Exception):
24 pass
27class ConfigEncryptor:
28 SERVICE_NAME = "apcore-cli"
30 def store(self, key: str, value: str) -> str:
31 if self._keyring_available():
32 import keyring as kr
34 kr.set_password(self.SERVICE_NAME, key, value)
35 return f"keyring:{key}"
36 else:
37 logger.warning(
38 "OS keyring unavailable. Falling back to file-based obfuscation "
39 "with a host+user-derived key. This is NOT strong encryption: any "
40 "local user who can read the config AND observe hostname+username "
41 "can recover the value. Install a real keyring backend "
42 "(macOS Keychain / GNOME Keyring / KWallet / Windows Credential "
43 "Manager) for real protection."
44 )
45 ciphertext = self._aes_encrypt(value)
46 return f"enc:v2:{base64.b64encode(ciphertext).decode()}"
48 def retrieve(self, config_value: str, key: str) -> str:
49 if config_value.startswith("keyring:"):
50 import keyring as kr
52 ref_key = config_value[len("keyring:") :]
53 result = kr.get_password(self.SERVICE_NAME, ref_key)
54 if result is None:
55 raise ConfigDecryptionError(f"Keyring entry not found for '{ref_key}'.")
56 return result
57 elif config_value.startswith("enc:v2:"):
58 try:
59 data = base64.b64decode(config_value[len("enc:v2:") :])
60 return self._aes_decrypt(data)
61 except (InvalidTag, ValueError, binascii.Error, UnicodeDecodeError) as exc:
62 raise ConfigDecryptionError(
63 f"Failed to decrypt configuration value '{key}'. Re-configure with 'apcore-cli config set {key}'."
64 ) from exc
65 elif config_value.startswith("enc:"):
66 try:
67 data = base64.b64decode(config_value[len("enc:") :])
68 return self._aes_decrypt_v1(data)
69 except (InvalidTag, ValueError, binascii.Error, UnicodeDecodeError, ConfigDecryptionError) as exc:
70 raise ConfigDecryptionError(
71 f"Failed to decrypt configuration value '{key}'. Re-configure with 'apcore-cli config set {key}'."
72 ) from exc
73 else:
74 return config_value
76 def _keyring_available(self) -> bool:
77 try:
78 import keyring as kr
80 backend = kr.get_keyring()
81 return not (
82 hasattr(kr, "backends")
83 and hasattr(kr.backends, "fail")
84 and isinstance(backend, kr.backends.fail.Keyring)
85 )
86 except Exception:
87 return False
89 def _derive_key(self, salt: bytes) -> bytes:
90 """Derive a 32-byte AES key using PBKDF2-HMAC-SHA256 with a provided salt."""
91 passphrase = os.getenv("APCORE_CLI_CONFIG_PASSPHRASE")
92 if passphrase:
93 material = passphrase.encode()
94 else:
95 hostname = socket.gethostname()
96 username = os.getenv("USER", os.getenv("USERNAME", "unknown"))
97 material = f"{hostname}:{username}".encode()
98 return hashlib.pbkdf2_hmac("sha256", material, salt, iterations=_PBKDF2_ITERATIONS)
100 def _aes_encrypt(self, plaintext: str) -> bytes:
101 """Encrypt using AES-256-GCM with a random per-encryption salt (v2 format).
103 Wire layout: salt(16) + nonce(12) + tag(16) + ciphertext.
104 """
105 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
107 salt = os.urandom(16)
108 key = self._derive_key(salt)
109 nonce = os.urandom(12)
110 encryptor = Cipher(algorithms.AES(key), modes.GCM(nonce)).encryptor()
111 ct = encryptor.update(plaintext.encode("utf-8")) + encryptor.finalize()
112 tag = encryptor.tag
113 # Wire format v2: salt(16) + nonce(12) + tag(16) + ciphertext
114 return salt + nonce + tag + ct
116 def _aes_decrypt(self, data: bytes) -> str:
117 """Decrypt v2-format ciphertext (salt(16) + nonce(12) + tag(16) + ct)."""
118 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
120 salt = data[:16]
121 nonce = data[16:28]
122 tag = data[28:44]
123 ct = data[44:]
124 key = self._derive_key(salt)
125 decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce, tag)).decryptor()
126 return (decryptor.update(ct) + decryptor.finalize()).decode("utf-8")
128 def _aes_decrypt_v1(self, data: bytes) -> str:
129 """Decrypt legacy v1-format ciphertext (nonce(12) + tag(16) + ct).
131 Tries 600k iterations first (Rust-written v1), then 100k (early Python/TS).
132 """
133 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
135 nonce = data[:12]
136 tag = data[12:28]
137 ct = data[28:]
138 last_exc: Exception = ValueError("no iterations tried")
139 for iterations in (600_000, 100_000):
140 try:
141 key = hashlib.pbkdf2_hmac("sha256", self._v1_material(), _V1_STATIC_SALT, iterations=iterations)
142 decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce, tag)).decryptor()
143 return (decryptor.update(ct) + decryptor.finalize()).decode("utf-8")
144 except (InvalidTag, ValueError, UnicodeDecodeError) as exc:
145 last_exc = exc
146 continue
147 raise ConfigDecryptionError("v1 decryption failed") from last_exc
149 def _v1_material(self) -> bytes:
150 hostname = socket.gethostname()
151 username = os.getenv("USER", os.getenv("USERNAME", "unknown"))
152 return f"{hostname}:{username}".encode()