Coverage for src / apcore_cli / security / config_encryptor.py: 90%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Encrypted configuration storage (FE-05).""" 

2 

3from __future__ import annotations 

4 

5import base64 

6import binascii 

7import hashlib 

8import logging 

9import os 

10import socket 

11 

12from cryptography.exceptions import InvalidTag 

13 

# Logger for this module, registered under the fixed name "apcore_cli.security".
logger = logging.getLogger("apcore_cli.security")

# PBKDF2-HMAC-SHA256 iteration count — follows OWASP 2024+ minimum for SHA-256.
# Applied when deriving the AES key for the current (enc:v2:) format.
_PBKDF2_ITERATIONS = 600_000

# Static salt used only for backward-compatible reads of legacy enc: (v1) values.
# The v2 format generates a fresh random salt per encryption instead.
_V1_STATIC_SALT = b"apcore-cli-config-v1"

21 

22 

class ConfigDecryptionError(Exception):
    """Raised when a stored configuration value cannot be recovered."""

25 

26 

class ConfigEncryptor:
    """Store and retrieve sensitive configuration values.

    Secrets go to the OS keyring when a usable backend is present. Otherwise
    they are AES-256-GCM encrypted with a PBKDF2-derived key and handed back
    as a self-describing string for the caller to persist.

    Stored-value formats understood by :meth:`retrieve`:

    * ``keyring:<key>`` -- the secret lives in the OS keyring under ``<key>``.
    * ``enc:v2:<base64>`` -- salt(16) + nonce(12) + tag(16) + ciphertext.
    * ``enc:<base64>`` -- legacy v1: nonce(12) + tag(16) + ciphertext, keyed
      with a static salt.
    * anything else -- treated as plaintext and returned unchanged.
    """

    SERVICE_NAME = "apcore-cli"

    # Byte widths of the fixed-size fields in the wire formats.
    _SALT_LEN = 16
    _NONCE_LEN = 12
    _TAG_LEN = 16

    def store(self, key: str, value: str) -> str:
        """Persist *value* and return the reference string to write to config.

        Returns ``keyring:<key>`` when the value was placed in the OS keyring,
        otherwise ``enc:v2:<base64>`` holding the encrypted value itself.
        """
        if self._keyring_available():
            import keyring as kr

            kr.set_password(self.SERVICE_NAME, key, value)
            return f"keyring:{key}"

        logger.warning(
            "OS keyring unavailable. Falling back to file-based obfuscation "
            "with a host+user-derived key. This is NOT strong encryption: any "
            "local user who can read the config AND observe hostname+username "
            "can recover the value. Install a real keyring backend "
            "(macOS Keychain / GNOME Keyring / KWallet / Windows Credential "
            "Manager) for real protection."
        )
        ciphertext = self._aes_encrypt(value)
        return f"enc:v2:{base64.b64encode(ciphertext).decode()}"

    def retrieve(self, config_value: str, key: str) -> str:
        """Resolve a stored *config_value* back to its plaintext.

        *key* is the configuration key name; it is only used in error messages.

        Raises:
            ConfigDecryptionError: the keyring package or entry is missing, or
                an ``enc:`` payload cannot be decoded/authenticated/decrypted.
        """
        if config_value.startswith("keyring:"):
            ref_key = config_value[len("keyring:") :]
            try:
                import keyring as kr
            except ImportError as exc:
                # A config written on a machine with keyring installed may be
                # read where the optional package is absent; report that through
                # the same error type as every other retrieval failure.
                raise ConfigDecryptionError(
                    f"Cannot read keyring entry '{ref_key}': the 'keyring' package is not installed."
                ) from exc
            result = kr.get_password(self.SERVICE_NAME, ref_key)
            if result is None:
                raise ConfigDecryptionError(f"Keyring entry not found for '{ref_key}'.")
            return result

        # "enc:v2:" must be tested before the bare "enc:" legacy prefix.
        if config_value.startswith("enc:v2:"):
            payload = config_value[len("enc:v2:") :]
            decrypt = self._aes_decrypt
        elif config_value.startswith("enc:"):
            payload = config_value[len("enc:") :]
            decrypt = self._aes_decrypt_v1
        else:
            # No recognised prefix: the value was stored as plaintext.
            return config_value

        try:
            return decrypt(base64.b64decode(payload))
        except (InvalidTag, ValueError, binascii.Error, UnicodeDecodeError, ConfigDecryptionError) as exc:
            raise ConfigDecryptionError(
                f"Failed to decrypt configuration value '{key}'. Re-configure with 'apcore-cli config set {key}'."
            ) from exc

    def _keyring_available(self) -> bool:
        """Return True when ``keyring`` imports and its backend is not the fail backend."""
        try:
            import keyring as kr

            backend = kr.get_keyring()
            return not (
                hasattr(kr, "backends")
                and hasattr(kr.backends, "fail")
                and isinstance(backend, kr.backends.fail.Keyring)
            )
        except Exception:
            # Deliberate best-effort probe: any failure (package missing,
            # backend initialisation error) means "use the file fallback".
            return False

    def _derive_key(self, salt: bytes) -> bytes:
        """Derive a 32-byte AES key using PBKDF2-HMAC-SHA256 with a provided salt.

        The key material is ``APCORE_CLI_CONFIG_PASSPHRASE`` when that
        environment variable is set and non-empty, otherwise the same
        ``hostname:username`` material used by the legacy v1 format.
        """
        passphrase = os.getenv("APCORE_CLI_CONFIG_PASSPHRASE")
        material = passphrase.encode() if passphrase else self._v1_material()
        return hashlib.pbkdf2_hmac("sha256", material, salt, iterations=_PBKDF2_ITERATIONS)

    def _aes_encrypt(self, plaintext: str) -> bytes:
        """Encrypt using AES-256-GCM with a random per-encryption salt (v2 format).

        Wire layout: salt(16) + nonce(12) + tag(16) + ciphertext.
        """
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        salt = os.urandom(self._SALT_LEN)
        key = self._derive_key(salt)
        nonce = os.urandom(self._NONCE_LEN)
        encryptor = Cipher(algorithms.AES(key), modes.GCM(nonce)).encryptor()
        ct = encryptor.update(plaintext.encode("utf-8")) + encryptor.finalize()
        # Wire format v2: salt(16) + nonce(12) + tag(16) + ciphertext
        return salt + nonce + encryptor.tag + ct

    def _aes_decrypt(self, data: bytes) -> str:
        """Decrypt v2-format ciphertext (salt(16) + nonce(12) + tag(16) + ct).

        Raises:
            ValueError: *data* is shorter than the 44-byte fixed header.
            InvalidTag: authentication failed (wrong key or tampered data).
            UnicodeDecodeError: the decrypted bytes are not valid UTF-8.
        """
        nonce_at = self._SALT_LEN
        tag_at = nonce_at + self._NONCE_LEN
        ct_at = tag_at + self._TAG_LEN
        if len(data) < ct_at:
            # Reject truncated input before paying for the PBKDF2 derivation;
            # the cipher layer would raise ValueError for the short tag anyway.
            raise ValueError(f"v2 ciphertext must be at least {ct_at} bytes, got {len(data)}")

        key = self._derive_key(data[:nonce_at])
        return self._gcm_decrypt(key, data[nonce_at:tag_at], data[tag_at:ct_at], data[ct_at:])

    def _aes_decrypt_v1(self, data: bytes) -> str:
        """Decrypt legacy v1-format ciphertext (nonce(12) + tag(16) + ct).

        Tries 600k iterations first (Rust-written v1), then 100k (early Python/TS).

        Raises:
            ConfigDecryptionError: *data* is truncated or neither iteration
                count yields an authentic, UTF-8 decodable plaintext.
        """
        tag_at = self._NONCE_LEN
        ct_at = tag_at + self._TAG_LEN
        if len(data) < ct_at:
            # Same outcome as letting both attempts fail, minus two key derivations.
            raise ConfigDecryptionError("v1 decryption failed") from ValueError(
                f"v1 ciphertext must be at least {ct_at} bytes, got {len(data)}"
            )

        nonce, tag, ct = data[:tag_at], data[tag_at:ct_at], data[ct_at:]
        material = self._v1_material()  # loop-invariant, compute once
        last_exc: Exception = ValueError("no iterations tried")
        for iterations in (600_000, 100_000):
            key = hashlib.pbkdf2_hmac("sha256", material, _V1_STATIC_SALT, iterations=iterations)
            try:
                return self._gcm_decrypt(key, nonce, tag, ct)
            except (InvalidTag, ValueError, UnicodeDecodeError) as exc:
                last_exc = exc
        raise ConfigDecryptionError("v1 decryption failed") from last_exc

    @staticmethod
    def _gcm_decrypt(key: bytes, nonce: bytes, tag: bytes, ct: bytes) -> str:
        """Authenticate and decrypt *ct* with AES-GCM, returning UTF-8 text."""
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce, tag)).decryptor()
        return (decryptor.update(ct) + decryptor.finalize()).decode("utf-8")

    def _v1_material(self) -> bytes:
        """Return the ``hostname:username`` key material as bytes.

        The username comes from ``USER``, then ``USERNAME``, then ``"unknown"``.
        """
        hostname = socket.gethostname()
        username = os.getenv("USER", os.getenv("USERNAME", "unknown"))
        return f"{hostname}:{username}".encode()