Coverage for frappe_manager / ssl_manager / acmesh_certificate_service.py: 11%
216 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
1"""
2acme.sh-based SSL certificate service implementation.
4Provides certificate issuance, renewal, and management using acme.sh client,
5following dependency injection pattern for testability and maintainability.
6"""
8import os
9import shutil
10import subprocess
11from pathlib import Path
13from frappe_manager.logger.contextual import ContextualLogger
14from frappe_manager.output_manager import OutputHandler
15from frappe_manager.output_manager.rich_output import RichOutputHandler
16from frappe_manager.ssl_manager.certificate import SSLCertificate
17from frappe_manager.ssl_manager.certificate_exceptions import (
18 SSLCertificateGenerateFailed,
19 SSLCertificateNotFoundError,
20)
21from frappe_manager.ssl_manager.letsencrypt_certificate import CustomDomainCertificate
22from frappe_manager.utils.subprocess import stream_command_output
24LETSENCRYPT_PRODUCTION_SERVER = "https://acme-v02.api.letsencrypt.org/directory"
25LETSENCRYPT_STAGING_SERVER = "https://acme-staging-v02.api.letsencrypt.org/directory"
28class AcmeShCertificateService:
29 """
30 SSL certificate service using acme.sh client.
32 Supports:
33 - HTTP-01 challenge (webroot validation)
34 - DNS-01 challenge (Cloudflare API)
35 - CNAME delegation for custom domains
37 Follows dependency injection pattern with OutputHandler for display operations.
38 """
40 # Class variable to cache acme.sh installation status
41 _acmesh_installed = False
43 def __init__(
44 self,
45 logger: ContextualLogger,
46 ssl_service_dir: Path,
47 webroot_dir: Path,
48 acmesh_home: Path | None = None,
49 output_handler: OutputHandler | None = None,
50 ):
51 self.logger = logger.child(component="acmesh")
52 self.webroot_dir = webroot_dir
53 self.root_dir = ssl_service_dir / "acmesh"
54 self.acmesh_home = acmesh_home or (ssl_service_dir / "acmesh" / ".acme.sh")
55 self.acmesh_bin = self.acmesh_home / "acme.sh"
56 self.output = output_handler or RichOutputHandler()
58 self.root_dir.mkdir(parents=True, exist_ok=True)
60 self._ensure_acmesh_installed()
62 def _ensure_acmesh_installed(self):
63 """
64 Install acme.sh if not present.
66 Uses class-level caching to avoid repeated installation checks.
67 """
68 if AcmeShCertificateService._acmesh_installed and self.acmesh_bin.exists():
69 return
71 if self.acmesh_bin.exists():
72 self.output.debug(f"acme.sh found at {self.acmesh_bin}")
73 AcmeShCertificateService._acmesh_installed = True
74 return
76 self.output.change_head("Installing acme.sh")
77 try:
78 # Install acme.sh with default noreply email
79 # Email notifications discontinued by Let's Encrypt (June 2025)
80 install_cmd = f"curl -s https://get.acme.sh | sh -s email=noreply@acme.sh --home {self.acmesh_home}"
81 result = subprocess.run(install_cmd, shell=True, check=True, capture_output=True, text=True)
83 # Enable logging in account.conf (commented out by default)
84 # This ensures all acme.sh operations are logged to acme.sh.log for debugging
85 account_conf = self.acmesh_home / "account.conf"
86 if account_conf.exists():
87 content = account_conf.read_text()
88 # Uncomment LOG_FILE and LOG_LEVEL if they are commented
89 content = content.replace("#LOG_FILE=", "LOG_FILE=")
90 content = content.replace("#LOG_LEVEL=", "LOG_LEVEL=")
91 account_conf.write_text(content)
92 self.output.debug(f"Enabled logging in {account_conf}")
94 self.output.print("acme.sh installed successfully")
95 AcmeShCertificateService._acmesh_installed = True
96 except subprocess.CalledProcessError as e:
97 error_msg = f"Failed to install acme.sh: {e.stderr}"
98 self.output.display_error(error_msg)
99 raise Exception(error_msg)
101 def _run_acmesh_command(self, args: list, env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
102 """
103 Run acme.sh command with proper environment.
105 Args:
106 args: Command arguments (without acme.sh binary path)
107 env: Additional environment variables
109 Returns:
110 CompletedProcess result
111 """
112 cmd = [str(self.acmesh_bin)] + args
113 self.output.debug(f"Running acme.sh: {' '.join(args)}")
115 command_env = os.environ.copy()
116 command_env["LE_WORKING_DIR"] = str(self.acmesh_home)
118 if env:
119 command_env.update(env)
121 result = subprocess.run(cmd, capture_output=True, text=True, env=command_env)
123 if result.returncode != 0:
124 self.output.debug(f"acme.sh failed: {result.stderr}")
125 # Also show stdout for debug mode
126 if result.stdout:
127 self.output.debug(f"acme.sh stdout: {result.stdout}")
128 else:
129 self.output.debug("acme.sh succeeded")
130 # Show output in debug mode
131 if result.stdout:
132 self.output.debug(f"acme.sh output: {result.stdout}")
134 return result
136 def _clear_cached_dns_credentials(self):
137 """
138 Clear cached DNS credentials from acme.sh account.conf.
140 When users update their Cloudflare API credentials in FM config, acme.sh may
141 continue using old cached credentials from account.conf, causing authentication
142 failures. This method removes cached DNS provider credentials while preserving
143 all other acme.sh settings (ACME account configuration, CA preferences, etc.).
145 Credentials cleared:
146 - SAVED_CF_Token: Cloudflare API Token
147 - SAVED_CF_Account_ID: Cloudflare Account ID
148 - SAVED_CF_Key: Cloudflare Global API Key (legacy)
149 - SAVED_CF_Email: Email for Global API Key auth
150 - SAVED_CF_Zone_ID: Zone ID (optional optimization)
152 This ensures acme.sh always uses fresh credentials from FM configuration
153 instead of potentially stale/revoked cached values.
155 Note: This does NOT affect the ACME account key stored in ca/ directory,
156 which is required for certificate management and renewals.
157 """
158 account_conf = self.acmesh_home / "account.conf"
159 if not account_conf.exists():
160 self.output.debug("account.conf not found, skipping credential cache clear")
161 return
163 try:
164 # Read current configuration
165 content = account_conf.read_text()
166 lines = content.split("\n")
168 # Credential prefixes to remove (acme.sh uses SAVED_ prefix for mutable account config)
169 stale_cred_prefixes = (
170 "SAVED_CF_Token=",
171 "SAVED_CF_Account_ID=",
172 "SAVED_CF_Key=",
173 "SAVED_CF_Email=",
174 "SAVED_CF_Zone_ID=",
175 )
177 # Filter out credential lines, preserve everything else
178 original_count = len(lines)
179 filtered_lines = [line for line in lines if not line.startswith(stale_cred_prefixes)]
180 removed_count = original_count - len(filtered_lines)
182 # Write back filtered configuration
183 account_conf.write_text("\n".join(filtered_lines))
185 if removed_count > 0:
186 self.output.debug(f"Cleared {removed_count} cached DNS credential line(s) from account.conf")
187 else:
188 self.output.debug("No cached DNS credentials found in account.conf")
190 except Exception as e:
191 # Non-fatal: log warning but don't block certificate operations
192 # Environment variables will still override cache if present
193 self.output.warning(f"Failed to clear cached credentials: {e}")
195 def _stream_acmesh_command(
196 self,
197 args: list,
198 env: dict[str, str] | None = None,
199 show_live_output: bool = True,
200 ) -> int:
201 """
202 Run acme.sh command with live output streaming from stdout/stderr.
204 This method streams stdout/stderr in real-time to the user while the command runs,
205 providing feedback during long-running operations (certificate generation, DNS propagation).
206 Uses a rolling window display (shows last 5 lines) that updates as new output arrives.
207 acme.sh outputs debug info to stderr when --debug flag is used.
209 Args:
210 args: Command arguments (without acme.sh binary path)
211 env: Additional environment variables
212 show_live_output: Whether to display live output (default True)
214 Returns:
215 Exit code of the command
216 """
217 cmd = [str(self.acmesh_bin)] + args
218 self.output.debug(f"Running acme.sh with streaming: {' '.join(args)}")
220 # Build environment
221 command_env = os.environ.copy()
222 command_env["LE_WORKING_DIR"] = str(self.acmesh_home)
223 if env:
224 command_env.update(env)
226 if show_live_output:
227 # Use closure to track exit code while streaming
228 exit_code_holder = [0]
230 def stream_with_exit_tracking():
231 """Generator that tracks exit code while yielding output."""
232 for source, line in stream_command_output(cmd, env=command_env, cwd=None):
233 if source == "exit_code":
234 exit_code_holder[0] = int(line.decode())
235 yield source, line
237 # Display with rolling window (last 5 lines)
238 self.output.live_lines(stream_with_exit_tracking(), padding=(0, 0, 0, 2), lines=5)
239 return exit_code_holder[0]
240 # No output display - just run and return exit code
241 exit_code = 0
242 for source, line in stream_command_output(cmd, env=command_env, cwd=None):
243 if source == "exit_code":
244 exit_code = int(line.decode())
245 return exit_code
247 def generate_certificate(self, certificate: SSLCertificate, dry_run: bool = False) -> tuple[Path, Path]:
248 """
249 Issue individual certificate using acme.sh.
251 Each certificate is issued for a single domain only (no SANs).
253 Args:
254 certificate: Certificate configuration
255 dry_run: If True, skips copying cert files to permanent location (staging only)
257 Returns:
258 Tuple of (privkey_path, fullchain_path)
260 Raises:
261 SSLCertificateGenerateFailed: If certificate generation fails
262 SSLCertificateNotFoundError: If generated certificate files not found
263 """
264 self.output.change_head(f"Generating SSL certificate for {certificate.domain}")
266 use_staging = os.getenv("FM_LETSENCRYPT_STAGING", "").lower() in ("1", "true", "yes")
267 if use_staging:
268 self.output.print(
269 "[yellow]Using Let's Encrypt STAGING server (test certificates)[/yellow]",
270 emoji_code=":warning:",
271 )
273 ca_server = LETSENCRYPT_STAGING_SERVER if use_staging else LETSENCRYPT_PRODUCTION_SERVER
275 args = [
276 "--home",
277 str(self.acmesh_home),
278 "--issue",
279 "-d",
280 certificate.domain,
281 "--server",
282 ca_server,
283 "--force",
284 "--debug",
285 "2",
286 ]
288 env = {}
290 if certificate.challenge_type.value == "http01":
291 self.output.info(f"Using HTTP-01 challenge with webroot: {self.webroot_dir}")
292 args.extend(["-w", str(self.webroot_dir)])
294 elif certificate.challenge_type.value == "dns01":
295 self.output.info("Using DNS-01 challenge with Cloudflare")
297 self._clear_cached_dns_credentials()
299 from frappe_manager.ssl_manager.ssl_utils import get_dns_credentials_for_certificate
301 try:
302 dns_creds = get_dns_credentials_for_certificate(certificate)
303 if dns_creds:
304 env.update(dns_creds)
305 except Exception as e:
306 self.output.display_error(f"Failed to get DNS credentials: {e}")
307 raise
309 if isinstance(certificate, CustomDomainCertificate) and certificate.delegation_cname:
310 self.output.info(f"Using challenge alias: {certificate.delegation_cname}")
311 args.extend(["--dns", "dns_cf", "--challenge-alias", certificate.delegation_cname])
312 else:
313 args.extend(["--dns", "dns_cf"])
315 exit_code = self._stream_acmesh_command(args, env=env, show_live_output=True)
317 if exit_code != 0:
318 error_msg = f"Certificate generation failed for {certificate.domain}"
319 self.output.display_error(error_msg)
320 raise SSLCertificateGenerateFailed(certificate.domain)
322 cert_dir = self.acmesh_home / certificate.domain
323 if not cert_dir.exists():
324 cert_dir_ecc = self.acmesh_home / f"{certificate.domain}_ecc"
325 if cert_dir_ecc.exists():
326 cert_dir = cert_dir_ecc
328 key_path = cert_dir / f"{certificate.domain}.key"
329 fullchain_path = cert_dir / "fullchain.cer"
331 if not fullchain_path.exists() or not key_path.exists():
332 error_msg = f"Certificate files not found after generation in {cert_dir}"
333 self.output.display_error(error_msg)
334 raise SSLCertificateNotFoundError(certificate.domain)
336 if dry_run:
337 self.output.print(f"[green]Certificate generated successfully (staging) for {certificate.domain}[/green]")
338 self.output.print(
339 "[yellow]Skipped: Copying certificate files (dry run)[/yellow]",
340 emoji_code=":fast_forward:",
341 )
342 return (key_path, fullchain_path)
344 dest_dir = self.root_dir / certificate.domain
345 dest_dir.mkdir(parents=True, exist_ok=True)
347 dest_key = dest_dir / "key.pem"
348 dest_fullchain = dest_dir / "fullchain.pem"
350 self.output.debug(f"Copying certificates to {dest_dir}")
351 shutil.copy(fullchain_path, dest_fullchain)
352 shutil.copy(key_path, dest_key)
354 self.output.print(f"Certificate generated successfully for {certificate.domain}")
355 return (dest_key, dest_fullchain)
357 def renew_certificate(self, certificate: SSLCertificate, dry_run: bool = False) -> bool:
358 """
359 Renew certificate using acme.sh.
361 Args:
362 certificate: Certificate to renew
363 dry_run: If True, skips copying cert files to permanent location (staging only)
365 Returns:
366 True if renewal succeeded, False otherwise
367 """
368 self.output.change_head(f"Renewing SSL certificate for {certificate.domain}")
370 if certificate.challenge_type.value == "dns01":
371 self._clear_cached_dns_credentials()
373 use_staging = os.getenv("FM_LETSENCRYPT_STAGING", "").lower() in ("1", "true", "yes")
374 if use_staging:
375 self.output.print(
376 "[yellow]Using Let's Encrypt STAGING server for renewal (test certificates)[/yellow]",
377 emoji_code=":warning:",
378 )
380 ca_server = LETSENCRYPT_STAGING_SERVER if use_staging else LETSENCRYPT_PRODUCTION_SERVER
382 args = [
383 "--home",
384 str(self.acmesh_home),
385 "--renew",
386 "-d",
387 certificate.domain,
388 "--server",
389 ca_server,
390 "--force",
391 "--debug",
392 ]
394 exit_code = self._stream_acmesh_command(args, show_live_output=True)
396 if exit_code != 0:
397 self.output.display_error(f"Certificate renewal failed for {certificate.domain}")
398 return False
400 cert_dir = self.acmesh_home / certificate.domain
401 if not cert_dir.exists():
402 cert_dir_ecc = self.acmesh_home / f"{certificate.domain}_ecc"
403 if cert_dir_ecc.exists():
404 cert_dir = cert_dir_ecc
406 fullchain_path = cert_dir / "fullchain.cer"
407 key_path = cert_dir / f"{certificate.domain}.key"
409 if fullchain_path.exists() and key_path.exists():
410 if dry_run:
411 self.output.print(f"[green]Certificate renewed successfully (staging) for {certificate.domain}[/green]")
412 self.output.print(
413 "[yellow]Skipped: Copying certificate files (dry run)[/yellow]",
414 emoji_code=":fast_forward:",
415 )
416 return True
418 dest_dir = self.root_dir / certificate.domain
419 dest_dir.mkdir(parents=True, exist_ok=True)
421 dest_key = dest_dir / "key.pem"
422 dest_fullchain = dest_dir / "fullchain.pem"
424 shutil.copy(fullchain_path, dest_fullchain)
425 shutil.copy(key_path, dest_key)
427 self.output.print(f"Certificate renewed successfully for {certificate.domain}")
428 return True
430 self.output.display_error(f"Renewed certificate files not found for {certificate.domain}")
431 return False
433 def remove_certificate(self, certificate: SSLCertificate) -> bool:
434 """
435 Remove certificate from acme.sh and service directory.
437 Args:
438 certificate: Certificate to remove
440 Returns:
441 True if removal succeeded, False otherwise
442 """
443 self.output.change_head(f"Removing SSL certificate for {certificate.domain}")
445 args = ["--remove", "-d", certificate.domain]
447 result = self._run_acmesh_command(args)
449 # Remove from service directory
450 cert_dir = self.root_dir / certificate.domain
451 if cert_dir.exists():
452 try:
453 shutil.rmtree(cert_dir)
454 self.output.debug(f"Removed certificate directory: {cert_dir}")
455 except Exception as e:
456 self.output.display_error(f"Failed to remove directory {cert_dir}: {e}")
458 # Also remove acme.sh internal directory (.acme.sh/<domain>_ecc)
459 # acme.sh --remove only renames the config to .conf.removed, it doesn't delete the directory
460 acmesh_internal_dir = self.acmesh_home / f"{certificate.domain}_ecc"
461 if acmesh_internal_dir.exists():
462 try:
463 shutil.rmtree(acmesh_internal_dir)
464 self.output.debug(f"Removed acme.sh internal directory: {acmesh_internal_dir}")
465 except Exception as e:
466 self.output.warning(f"Failed to remove acme.sh directory {acmesh_internal_dir}: {e}")
468 success = result.returncode == 0
469 if success:
470 self.output.print(f"Certificate removed successfully for {certificate.domain}")
471 else:
472 self.output.display_error(f"Failed to remove certificate for {certificate.domain}")
474 return success