Coverage for frappe_manager / ssl_manager / acmesh_certificate_service.py: 11%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:35 +0530

1""" 

acme.sh-based SSL certificate service implementation.

Provides certificate issuance, renewal, and management using the acme.sh client,
following a dependency injection pattern for testability and maintainability.

6""" 

7 

import os
import shlex
import shutil
import subprocess
from pathlib import Path

from frappe_manager.logger.contextual import ContextualLogger
from frappe_manager.output_manager import OutputHandler
from frappe_manager.output_manager.rich_output import RichOutputHandler
from frappe_manager.ssl_manager.certificate import SSLCertificate
from frappe_manager.ssl_manager.certificate_exceptions import (
    SSLCertificateGenerateFailed,
    SSLCertificateNotFoundError,
)
from frappe_manager.ssl_manager.letsencrypt_certificate import CustomDomainCertificate
from frappe_manager.utils.subprocess import stream_command_output

23 

# ACME directory URL passed to acme.sh via --server for regular issuance and renewal.
LETSENCRYPT_PRODUCTION_SERVER = "https://acme-v02.api.letsencrypt.org/directory"
# ACME directory URL used instead when FM_LETSENCRYPT_STAGING is set to
# "1", "true" or "yes" (case-insensitive); it issues test certificates.
LETSENCRYPT_STAGING_SERVER = "https://acme-staging-v02.api.letsencrypt.org/directory"

26 

27 

class AcmeShCertificateService:
    """
    SSL certificate service using acme.sh client.

    Supports:
    - HTTP-01 challenge (webroot validation)
    - DNS-01 challenge (Cloudflare API)
    - CNAME delegation for custom domains

    Follows dependency injection pattern with OutputHandler for display operations.
    """

    # Class variable to cache acme.sh installation status
    _acmesh_installed = False

    def __init__(
        self,
        logger: ContextualLogger,
        ssl_service_dir: Path,
        webroot_dir: Path,
        acmesh_home: Path | None = None,
        output_handler: OutputHandler | None = None,
    ):
        """
        Set up paths and make sure acme.sh is available.

        Args:
            logger: Parent logger; a child logger tagged component="acmesh" is derived from it
            ssl_service_dir: Base directory; certificates are stored under <ssl_service_dir>/acmesh
            webroot_dir: Webroot handed to acme.sh for HTTP-01 validation
            acmesh_home: acme.sh home directory (defaults to <ssl_service_dir>/acmesh/.acme.sh)
            output_handler: Display handler (defaults to RichOutputHandler)
        """
        self.logger = logger.child(component="acmesh")
        self.webroot_dir = webroot_dir
        self.root_dir = ssl_service_dir / "acmesh"
        self.acmesh_home = acmesh_home or (ssl_service_dir / "acmesh" / ".acme.sh")
        self.acmesh_bin = self.acmesh_home / "acme.sh"
        self.output = output_handler or RichOutputHandler()

        self.root_dir.mkdir(parents=True, exist_ok=True)

        self._ensure_acmesh_installed()

    def _ensure_acmesh_installed(self):
        """
        Install acme.sh if not present.

        Uses class-level caching to avoid repeating the "found" debug message.

        Raises:
            Exception: If the installer fails or the acme.sh binary is still
                missing after the installer ran
        """
        if self.acmesh_bin.exists():
            if not AcmeShCertificateService._acmesh_installed:
                self.output.debug(f"acme.sh found at {self.acmesh_bin}")
                AcmeShCertificateService._acmesh_installed = True
            return

        self.output.change_head("Installing acme.sh")
        try:
            # Install acme.sh with default noreply email
            # Email notifications discontinued by Let's Encrypt (June 2025)
            # The home path is shell-quoted because the command runs through a shell.
            install_cmd = (
                "curl -s https://get.acme.sh | sh -s email=noreply@acme.sh "
                f"--home {shlex.quote(str(self.acmesh_home))}"
            )
            subprocess.run(install_cmd, shell=True, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            error_msg = f"Failed to install acme.sh: {e.stderr}"
            self.output.display_error(error_msg)
            raise Exception(error_msg) from e

        # The pipeline's exit status is that of `sh`, so a failed download can
        # still exit 0. Confirm the binary actually landed before reporting success.
        if not self.acmesh_bin.exists():
            error_msg = f"Failed to install acme.sh: {self.acmesh_bin} not found after installation"
            self.output.display_error(error_msg)
            raise Exception(error_msg)

        # Enable logging in account.conf (commented out by default)
        # This ensures all acme.sh operations are logged to acme.sh.log for debugging
        account_conf = self.acmesh_home / "account.conf"
        if account_conf.exists():
            content = account_conf.read_text()
            # Uncomment LOG_FILE and LOG_LEVEL if they are commented
            content = content.replace("#LOG_FILE=", "LOG_FILE=")
            content = content.replace("#LOG_LEVEL=", "LOG_LEVEL=")
            account_conf.write_text(content)
            self.output.debug(f"Enabled logging in {account_conf}")

        self.output.print("acme.sh installed successfully")
        AcmeShCertificateService._acmesh_installed = True

    def _build_command_env(self, env: dict[str, str] | None = None) -> dict[str, str]:
        """Return a copy of the process environment with LE_WORKING_DIR set and *env* overlaid."""
        command_env = os.environ.copy()
        command_env["LE_WORKING_DIR"] = str(self.acmesh_home)
        if env:
            command_env.update(env)
        return command_env

    def _run_acmesh_command(self, args: list, env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
        """
        Run acme.sh command with proper environment.

        Args:
            args: Command arguments (without acme.sh binary path)
            env: Additional environment variables

        Returns:
            CompletedProcess result (a non-zero exit status is reported, not raised)
        """
        cmd = [str(self.acmesh_bin)] + args
        self.output.debug(f"Running acme.sh: {' '.join(args)}")

        result = subprocess.run(cmd, capture_output=True, text=True, env=self._build_command_env(env))

        if result.returncode != 0:
            self.output.debug(f"acme.sh failed: {result.stderr}")
            # Also show stdout for debug mode
            if result.stdout:
                self.output.debug(f"acme.sh stdout: {result.stdout}")
        else:
            self.output.debug("acme.sh succeeded")
            # Show output in debug mode
            if result.stdout:
                self.output.debug(f"acme.sh output: {result.stdout}")

        return result

    def _clear_cached_dns_credentials(self):
        """
        Clear cached DNS credentials from acme.sh account.conf.

        When users update their Cloudflare API credentials in FM config, acme.sh may
        continue using old cached credentials from account.conf, causing authentication
        failures. This method removes cached DNS provider credentials while preserving
        all other lines of account.conf.

        Credentials cleared:
        - SAVED_CF_Token: Cloudflare API Token
        - SAVED_CF_Account_ID: Cloudflare Account ID
        - SAVED_CF_Key: Cloudflare Global API Key (legacy)
        - SAVED_CF_Email: Email for Global API Key auth
        - SAVED_CF_Zone_ID: Zone ID (optional optimization)

        The file is only rewritten when at least one line was removed. Any failure
        is reported as a warning and never raised.
        """
        account_conf = self.acmesh_home / "account.conf"
        if not account_conf.exists():
            self.output.debug("account.conf not found, skipping credential cache clear")
            return

        # Credential prefixes to remove (acme.sh uses SAVED_ prefix for mutable account config)
        stale_cred_prefixes = (
            "SAVED_CF_Token=",
            "SAVED_CF_Account_ID=",
            "SAVED_CF_Key=",
            "SAVED_CF_Email=",
            "SAVED_CF_Zone_ID=",
        )

        try:
            lines = account_conf.read_text().split("\n")

            # Filter out credential lines, preserve everything else
            filtered_lines = [line for line in lines if not line.startswith(stale_cred_prefixes)]
            removed_count = len(lines) - len(filtered_lines)

            if removed_count > 0:
                account_conf.write_text("\n".join(filtered_lines))
                self.output.debug(f"Cleared {removed_count} cached DNS credential line(s) from account.conf")
            else:
                # Nothing to strip: leave the file untouched
                self.output.debug("No cached DNS credentials found in account.conf")

        except Exception as e:
            # Non-fatal: log warning but don't block certificate operations
            # Environment variables will still override cache if present
            self.output.warning(f"Failed to clear cached credentials: {e}")

    def _stream_acmesh_command(
        self,
        args: list,
        env: dict[str, str] | None = None,
        show_live_output: bool = True,
    ) -> int:
        """
        Run acme.sh command with live output streaming from stdout/stderr.

        Output events come from stream_command_output as (source, line) pairs; the
        event whose source is "exit_code" carries the exit status as encoded digits.
        With show_live_output the events are forwarded to the output handler's
        rolling display (last 5 lines); otherwise they are consumed silently.

        Args:
            args: Command arguments (without acme.sh binary path)
            env: Additional environment variables
            show_live_output: Whether to display live output (default True)

        Returns:
            Exit code of the command (0 if no "exit_code" event was seen)
        """
        cmd = [str(self.acmesh_bin)] + args
        self.output.debug(f"Running acme.sh with streaming: {' '.join(args)}")

        command_env = self._build_command_env(env)

        if show_live_output:
            # Use closure to track exit code while streaming
            exit_code_holder = [0]

            def stream_with_exit_tracking():
                """Generator that tracks exit code while yielding output."""
                for source, line in stream_command_output(cmd, env=command_env, cwd=None):
                    if source == "exit_code":
                        exit_code_holder[0] = int(line.decode())
                    yield source, line

            # Display with rolling window (last 5 lines)
            self.output.live_lines(stream_with_exit_tracking(), padding=(0, 0, 0, 2), lines=5)
            return exit_code_holder[0]

        # No output display - just run and return exit code
        exit_code = 0
        for source, line in stream_command_output(cmd, env=command_env, cwd=None):
            if source == "exit_code":
                exit_code = int(line.decode())
        return exit_code

    @staticmethod
    def _use_staging_server() -> bool:
        """Return True when FM_LETSENCRYPT_STAGING is "1", "true" or "yes" (case-insensitive)."""
        return os.getenv("FM_LETSENCRYPT_STAGING", "").lower() in ("1", "true", "yes")

    def _find_acmesh_cert_dir(self, domain: str) -> Path:
        """Return acme.sh's directory for *domain*: <domain> if present, else <domain>_ecc if present, else <domain>."""
        cert_dir = self.acmesh_home / domain
        if not cert_dir.exists():
            cert_dir_ecc = self.acmesh_home / f"{domain}_ecc"
            if cert_dir_ecc.exists():
                return cert_dir_ecc
        return cert_dir

    def _publish_certificate_files(self, domain: str, key_path: Path, fullchain_path: Path) -> tuple[Path, Path]:
        """Copy key and full chain into <root_dir>/<domain> as key.pem / fullchain.pem and return both paths."""
        dest_dir = self.root_dir / domain
        dest_dir.mkdir(parents=True, exist_ok=True)

        dest_key = dest_dir / "key.pem"
        dest_fullchain = dest_dir / "fullchain.pem"

        self.output.debug(f"Copying certificates to {dest_dir}")
        shutil.copy(fullchain_path, dest_fullchain)
        shutil.copy(key_path, dest_key)
        return (dest_key, dest_fullchain)

    def generate_certificate(self, certificate: SSLCertificate, dry_run: bool = False) -> tuple[Path, Path]:
        """
        Issue individual certificate using acme.sh.

        Each certificate is issued for a single domain only (no SANs).

        Args:
            certificate: Certificate configuration
            dry_run: If True, skips copying cert files to permanent location (staging only)

        Returns:
            Tuple of (privkey_path, fullchain_path); with dry_run these point inside
            the acme.sh home, otherwise at the copies under the service directory

        Raises:
            SSLCertificateGenerateFailed: If certificate generation fails
            SSLCertificateNotFoundError: If generated certificate files not found
        """
        self.output.change_head(f"Generating SSL certificate for {certificate.domain}")

        use_staging = self._use_staging_server()
        if use_staging:
            self.output.print(
                "[yellow]Using Let's Encrypt STAGING server (test certificates)[/yellow]",
                emoji_code=":warning:",
            )

        ca_server = LETSENCRYPT_STAGING_SERVER if use_staging else LETSENCRYPT_PRODUCTION_SERVER

        args = [
            "--home",
            str(self.acmesh_home),
            "--issue",
            "-d",
            certificate.domain,
            "--server",
            ca_server,
            "--force",
            "--debug",
            "2",
        ]

        env: dict[str, str] = {}

        if certificate.challenge_type.value == "http01":
            self.output.info(f"Using HTTP-01 challenge with webroot: {self.webroot_dir}")
            args.extend(["-w", str(self.webroot_dir)])

        elif certificate.challenge_type.value == "dns01":
            self.output.info("Using DNS-01 challenge with Cloudflare")

            # Drop stale cached credentials so the ones passed via env are used
            self._clear_cached_dns_credentials()

            from frappe_manager.ssl_manager.ssl_utils import get_dns_credentials_for_certificate

            try:
                dns_creds = get_dns_credentials_for_certificate(certificate)
                if dns_creds:
                    env.update(dns_creds)
            except Exception as e:
                self.output.display_error(f"Failed to get DNS credentials: {e}")
                raise

            if isinstance(certificate, CustomDomainCertificate) and certificate.delegation_cname:
                self.output.info(f"Using challenge alias: {certificate.delegation_cname}")
                args.extend(["--dns", "dns_cf", "--challenge-alias", certificate.delegation_cname])
            else:
                args.extend(["--dns", "dns_cf"])

        exit_code = self._stream_acmesh_command(args, env=env, show_live_output=True)

        if exit_code != 0:
            error_msg = f"Certificate generation failed for {certificate.domain}"
            self.output.display_error(error_msg)
            raise SSLCertificateGenerateFailed(certificate.domain)

        cert_dir = self._find_acmesh_cert_dir(certificate.domain)
        key_path = cert_dir / f"{certificate.domain}.key"
        fullchain_path = cert_dir / "fullchain.cer"

        if not fullchain_path.exists() or not key_path.exists():
            error_msg = f"Certificate files not found after generation in {cert_dir}"
            self.output.display_error(error_msg)
            raise SSLCertificateNotFoundError(certificate.domain)

        if dry_run:
            self.output.print(f"[green]Certificate generated successfully (staging) for {certificate.domain}[/green]")
            self.output.print(
                "[yellow]Skipped: Copying certificate files (dry run)[/yellow]",
                emoji_code=":fast_forward:",
            )
            return (key_path, fullchain_path)

        dest_key, dest_fullchain = self._publish_certificate_files(certificate.domain, key_path, fullchain_path)

        self.output.print(f"Certificate generated successfully for {certificate.domain}")
        return (dest_key, dest_fullchain)

    def renew_certificate(self, certificate: SSLCertificate, dry_run: bool = False) -> bool:
        """
        Renew certificate using acme.sh.

        For DNS-01 certificates the cached Cloudflare credentials are cleared and
        fresh ones are passed to acme.sh through the environment, mirroring
        generate_certificate. If the credential lookup fails a warning is shown and
        the renewal is still attempted with whatever the process environment provides.

        Args:
            certificate: Certificate to renew
            dry_run: If True, skips copying cert files to permanent location (staging only)

        Returns:
            True if renewal succeeded, False otherwise
        """
        self.output.change_head(f"Renewing SSL certificate for {certificate.domain}")

        env: dict[str, str] = {}

        if certificate.challenge_type.value == "dns01":
            self._clear_cached_dns_credentials()

            from frappe_manager.ssl_manager.ssl_utils import get_dns_credentials_for_certificate

            # The cache was just cleared, so acme.sh needs the credentials via env.
            try:
                dns_creds = get_dns_credentials_for_certificate(certificate)
                if dns_creds:
                    env.update(dns_creds)
            except Exception as e:
                self.output.warning(f"Failed to get DNS credentials: {e}")

        use_staging = self._use_staging_server()
        if use_staging:
            self.output.print(
                "[yellow]Using Let's Encrypt STAGING server for renewal (test certificates)[/yellow]",
                emoji_code=":warning:",
            )

        ca_server = LETSENCRYPT_STAGING_SERVER if use_staging else LETSENCRYPT_PRODUCTION_SERVER

        args = [
            "--home",
            str(self.acmesh_home),
            "--renew",
            "-d",
            certificate.domain,
            "--server",
            ca_server,
            "--force",
            "--debug",
        ]

        exit_code = self._stream_acmesh_command(args, env=env, show_live_output=True)

        if exit_code != 0:
            self.output.display_error(f"Certificate renewal failed for {certificate.domain}")
            return False

        cert_dir = self._find_acmesh_cert_dir(certificate.domain)
        fullchain_path = cert_dir / "fullchain.cer"
        key_path = cert_dir / f"{certificate.domain}.key"

        if not (fullchain_path.exists() and key_path.exists()):
            self.output.display_error(f"Renewed certificate files not found for {certificate.domain}")
            return False

        if dry_run:
            self.output.print(f"[green]Certificate renewed successfully (staging) for {certificate.domain}[/green]")
            self.output.print(
                "[yellow]Skipped: Copying certificate files (dry run)[/yellow]",
                emoji_code=":fast_forward:",
            )
            return True

        self._publish_certificate_files(certificate.domain, key_path, fullchain_path)

        self.output.print(f"Certificate renewed successfully for {certificate.domain}")
        return True

    def remove_certificate(self, certificate: SSLCertificate) -> bool:
        """
        Remove certificate from acme.sh and service directory.

        Runs `acme.sh --remove`, then deletes the service copy under
        <root_dir>/<domain> and acme.sh's own <domain> / <domain>_ecc directories.
        Directory deletion failures are reported but do not affect the result.

        Args:
            certificate: Certificate to remove

        Returns:
            True if `acme.sh --remove` exited with 0, False otherwise (including
            when the certificate has no domain)
        """
        if not certificate.domain:
            # An empty domain would resolve to the base directories themselves
            # and rmtree would wipe every certificate.
            self.output.display_error("Cannot remove certificate: domain is empty")
            return False

        self.output.change_head(f"Removing SSL certificate for {certificate.domain}")

        args = ["--remove", "-d", certificate.domain]

        result = self._run_acmesh_command(args)

        # Remove from service directory
        cert_dir = self.root_dir / certificate.domain
        if cert_dir.exists():
            try:
                shutil.rmtree(cert_dir)
                self.output.debug(f"Removed certificate directory: {cert_dir}")
            except Exception as e:
                self.output.display_error(f"Failed to remove directory {cert_dir}: {e}")

        # Also remove acme.sh internal directories (.acme.sh/<domain> and .acme.sh/<domain>_ecc)
        # acme.sh --remove only renames the config to .conf.removed, it doesn't delete the directory
        for acmesh_internal_dir in (
            self.acmesh_home / certificate.domain,
            self.acmesh_home / f"{certificate.domain}_ecc",
        ):
            if not acmesh_internal_dir.exists():
                continue
            try:
                shutil.rmtree(acmesh_internal_dir)
                self.output.debug(f"Removed acme.sh internal directory: {acmesh_internal_dir}")
            except Exception as e:
                self.output.warning(f"Failed to remove acme.sh directory {acmesh_internal_dir}: {e}")

        success = result.returncode == 0
        if success:
            self.output.print(f"Certificate removed successfully for {certificate.domain}")
        else:
            self.output.display_error(f"Failed to remove certificate for {certificate.domain}")

        return success

472 self.output.display_error(f"Failed to remove certificate for {certificate.domain}") 

473 

474 return success