Coverage for frappe_manager / ssl_manager / ssl_certificate_manager.py: 9%
352 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
"""
SSL Certificate Manager with multi-certificate support and dependency injection.

This module orchestrates SSL certificate operations by coordinating between:
- SSL certificate services (Let's Encrypt, acme.sh, self-signed, etc.)
- Certificate link manager (for symlink operations)
- Nginx controller (for service restarts)

The manager supports multiple certificates per bench, allowing different domains
to use different certificate types and validation methods.
"""

import os
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path

from frappe_manager import SSL_RENEW_BEFORE_DAYS
from frappe_manager.logger.contextual import ContextualLogger
from frappe_manager.output_manager import OutputHandler
from frappe_manager.ssl_manager.certificate import SSLCertificate
from frappe_manager.ssl_manager.certificate_exceptions import (
    SSLCertificateNotDueForRenewalError,
    SSLCertificateNotFoundError,
)
from frappe_manager.ssl_manager.certificate_link_manager import CertificateLinkManager
from frappe_manager.ssl_manager.nginx_controller import NginxController
from frappe_manager.ssl_manager.ssl_certificate_service import SSLCertificateService
from frappe_manager.ssl_manager.storage_config import SSLStorageConfig
from frappe_manager.ssl_manager.vhost_config_manager import VhostConfigManager
from frappe_manager.utils.helpers import get_certificate_expiry_date


class SSLCertificateManager:
    """
    Manages multiple SSL certificates with dependency injection.

    This manager coordinates SSL certificate operations for multiple domains/certificates
    without being tightly coupled to specific implementations. It supports multiple
    certificates per bench and uses a service factory to create appropriate services
    for each certificate type.

    Attributes:
        logger: Child logger tagged with component="ssl_manager"
        certificates: List of SSL certificate configurations to manage
        service_factory: Factory function to create certificate services
        link_manager: Manages symlinks between cert files and nginx-proxy
        nginx_controller: Controls nginx service operations
        vhost_manager: Manages per-domain HTTPS redirect configuration
        storage_config: Storage configuration for SSL operations
        config_save_callback: Callback to persist config changes to bench_config.toml
        output_handler: Output handler for user-facing messages
        services: Dictionary mapping domain to certificate service instances
    """

    # Environment variable forced to "1" for the duration of a dry run.
    _STAGING_ENV_VAR = "FM_LETSENCRYPT_STAGING"

    def __init__(
        self,
        logger: ContextualLogger,
        certificates: list[SSLCertificate],
        service_factory: Callable[[SSLCertificate, SSLStorageConfig, OutputHandler], SSLCertificateService],
        link_manager: CertificateLinkManager,
        nginx_controller: NginxController,
        storage_config: SSLStorageConfig,
        output_handler: OutputHandler,
        config_save_callback: Callable[[], None] | None = None,
    ):
        """
        Initialize the SSL certificate manager.

        Args:
            logger: Contextual logger for audit/debug logging
            certificates: List of SSL certificate configurations to manage
                (a single certificate is wrapped into a one-element list)
            service_factory: Factory function to create certificate services
            link_manager: Manager for certificate symlink operations
            nginx_controller: Controller for nginx service operations
            storage_config: Storage configuration for SSL operations
            output_handler: Output handler for user messages
            config_save_callback: Optional callback to save config after modifications

        Raises:
            ValueError: If any required dependency is None
        """
        required_dependencies = (
            (logger, "Logger is required"),
            (certificates, "Certificate configuration is required"),
            (service_factory, "Certificate service factory is required"),
            (link_manager, "Certificate link manager is required"),
            (nginx_controller, "Nginx controller is required"),
            (storage_config, "Storage config is required"),
            (output_handler, "Output handler is required"),
        )
        for dependency, message in required_dependencies:
            if dependency is None:
                raise ValueError(message)

        self.logger = logger.child(component="ssl_manager")
        self.certificates: list[SSLCertificate] = certificates if isinstance(certificates, list) else [certificates]
        self.service_factory = service_factory
        self.storage_config = storage_config
        self.output_handler = output_handler
        self.link_manager = link_manager
        self.nginx_controller = nginx_controller
        self.config_save_callback = config_save_callback

        self.vhost_manager = VhostConfigManager(storage_config.vhostd_dir)

        # One service instance per configured certificate, keyed by domain.
        self.services: dict[str, SSLCertificateService] = {
            cert.domain: service_factory(cert, storage_config, output_handler) for cert in self.certificates
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _find_certificate(self, domain: str) -> SSLCertificate | None:
        """Return the first configured certificate whose domain equals ``domain``, or None."""
        return next((cert for cert in self.certificates if cert.domain == domain), None)

    def _detect_cert_type(self, privkey_path: Path) -> str | None:
        """
        Return the first directory component of ``privkey_path`` below ``ssl_dir``.

        Returns None when the path is not located under ``ssl_dir`` (or is ``ssl_dir``
        itself), so the caller can apply its own fallback.
        """
        try:
            # Path structure is: ssl_dir / cert_type / domain / file
            return privkey_path.relative_to(self.storage_config.ssl_dir).parts[0]
        except (ValueError, IndexError):
            return None

    @staticmethod
    def _current_time_for(expiry_date: datetime) -> datetime:
        """
        Return "now" in a form that can be compared with ``expiry_date``.

        For a timezone-aware expiry the current instant is expressed in that timezone.
        Stamping local wall-clock time with the expiry's tzinfo would shift "now" by the
        local UTC offset. A naive expiry is compared against naive local time.
        """
        if expiry_date.tzinfo is not None:
            return datetime.now(tz=expiry_date.tzinfo)
        return datetime.now()

    @staticmethod
    def _is_due_for_renewal(expiry_date: datetime, now: datetime) -> bool:
        """Return True once ``now`` has reached ``expiry_date`` minus SSL_RENEW_BEFORE_DAYS."""
        return expiry_date - timedelta(days=SSL_RENEW_BEFORE_DAYS) <= now

    @contextmanager
    def _staging_environment(self, dry_run: bool) -> Iterator[None]:
        """
        Set the staging environment variable to "1" for the duration of a dry run.

        The previous value (or absence) of the variable is restored on exit, even when
        the wrapped operation raises. Does nothing when ``dry_run`` is False.
        """
        if not dry_run:
            yield
            return

        original_staging = os.environ.get(self._STAGING_ENV_VAR)
        os.environ[self._STAGING_ENV_VAR] = "1"
        self.logger.debug("Enabled staging mode for dry run")
        try:
            yield
        finally:
            if original_staging is not None:
                os.environ[self._STAGING_ENV_VAR] = original_staging
            else:
                os.environ.pop(self._STAGING_ENV_VAR, None)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_primary_certificate(self) -> SSLCertificate | None:
        """
        Get the primary (first) certificate.

        Returns:
            The first certificate in the list, or None if no certificates exist
        """
        return self.certificates[0] if self.certificates else None

    def add_certificate(self, certificate: SSLCertificate, dry_run: bool = False):
        """
        Generate a new certificate and register it with this manager.

        On a real run the certificate is generated, symlinked, given an HTTPS redirect,
        added to ``certificates``/``services``, nginx is restarted and the config is saved.
        On a dry run only generation is performed (with the staging variable set) and no
        manager state is modified.

        Args:
            certificate: Certificate configuration to add
            dry_run: If True, validate only; skip symlinks, redirect, nginx restart and config save

        Raises:
            ValueError: If a certificate for the same domain is already configured
            RuntimeError: If service_factory, storage_config or output_handler is missing
        """
        domain = certificate.domain
        self.logger.info("Adding certificate", extra_fields={"domain": domain, "dry_run": dry_run})

        if self._find_certificate(domain) is not None:
            self.logger.warning("Certificate already exists", extra_fields={"domain": domain})
            raise ValueError(f"Certificate for {domain} already exists")

        if not (self.service_factory and self.storage_config and self.output_handler):
            self.logger.error("Cannot add certificate: missing dependencies")
            raise RuntimeError(
                "Cannot add certificate: service_factory, storage_config, and output_handler are required",
            )

        # The service is only registered in self.services once the certificate has been
        # issued and linked, so a dry run or a failed issue leaves no orphaned entry.
        service = self.service_factory(certificate, self.storage_config, self.output_handler)
        self.logger.debug("Created certificate service", extra_fields={"domain": domain})

        if dry_run:
            self.output_handler.print(
                "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
                emoji_code="🧪",
            )
            self.output_handler.print(
                "[dim]No system modifications will be made (no symlinks, nginx restart, or config save)[/dim]",
            )

        with self._staging_environment(dry_run):
            self.logger.info("Generating certificate", extra_fields={"domain": domain})
            privkey_path, fullchain_path = service.generate_certificate(certificate, dry_run=dry_run)
            self.logger.info(
                "Certificate generated",
                extra_fields={
                    "domain": domain,
                    "privkey": str(privkey_path),
                    "fullchain": str(fullchain_path),
                },
            )

            actual_cert_type = self._detect_cert_type(privkey_path)
            if actual_cert_type is None:
                actual_cert_type = getattr(certificate, "acme_client", "letsencrypt")

            if dry_run:
                self.output_handler.print(f"[green]Certificate validated successfully for {domain}[/green]")
                self.output_handler.print("[yellow]Skipped: Creating symlinks (dry run)[/yellow]", emoji_code="⏭️ ")
                self.output_handler.print(
                    "[yellow]Skipped: Creating vhost.d redirect config (dry run)[/yellow]",
                    emoji_code="⏭️ ",
                )
                self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ")
                self.output_handler.print("[yellow]Skipped: Saving configuration (dry run)[/yellow]", emoji_code="⏭️ ")
                self.logger.info("Dry run completed successfully", extra_fields={"domain": domain})
                return

            self.logger.info(
                "Creating certificate symlinks",
                extra_fields={"domain": domain, "cert_type": actual_cert_type},
            )
            self.link_manager.link_certificate(
                cert_type=actual_cert_type,
                domain=domain,
                privkey_path=privkey_path,
                fullchain_path=fullchain_path,
                alias_domains=None,
            )

            self.vhost_manager.enable_https_redirect(domain)
            self.output_handler.print(f"Created vhost.d redirect config for {domain}")
            self.logger.debug("Enabled HTTPS redirect", extra_fields={"domain": domain})

            self.services[domain] = service
            self.certificates.append(certificate)
            self.logger.info("Restarting nginx")
            self.nginx_controller.restart()
            if self.config_save_callback:
                self.config_save_callback()
                self.logger.debug("Saved configuration")

            self.logger.info("Certificate added successfully", extra_fields={"domain": domain})

    def remove_certificate_by_domain(self, domain: str):
        """
        Remove a configured certificate and forget it.

        Unlinks the symlinks, disables the HTTPS redirect, removes the certificate files,
        drops the entry from ``certificates``/``services``, restarts nginx and saves the
        config when a save callback is configured.

        Args:
            domain: Domain of the certificate to remove

        Raises:
            SSLCertificateNotFoundError: If no certificate is configured for the domain
            RuntimeError: If no service is registered for the domain
        """
        self.logger.info("Removing certificate", extra_fields={"domain": domain})

        cert_to_remove = self._find_certificate(domain)
        if not cert_to_remove:
            self.logger.warning("Certificate not found", extra_fields={"domain": domain})
            raise SSLCertificateNotFoundError(domain)

        service = self.services.get(domain)
        if not service:
            self.logger.error("No service found for domain", extra_fields={"domain": domain})
            raise RuntimeError(f"No service found for domain {domain}")

        self.logger.debug("Unlinking certificate symlinks", extra_fields={"domain": domain})
        self.link_manager.unlink_certificate(domain, alias_domains=None)

        self.vhost_manager.disable_https_redirect(domain)
        self.logger.debug("Disabled HTTPS redirect", extra_fields={"domain": domain})

        self.logger.debug("Removing certificate files", extra_fields={"domain": domain})
        service.remove_certificate(cert_to_remove)

        self.certificates.remove(cert_to_remove)
        del self.services[domain]

        self.logger.info("Restarting nginx")
        self.nginx_controller.restart()

        if self.config_save_callback:
            self.config_save_callback()
            self.logger.debug("Saved configuration")

        self.logger.info("Certificate removed successfully", extra_fields={"domain": domain})

    def list_certificates(self) -> list[dict]:
        """
        List all managed certificates with their status.

        Returns:
            List of dictionaries with certificate information:
            - domain: Certificate domain
            - ssl_type: Certificate type (letsencrypt, etc.)
            - challenge_type: Challenge type (http01 or dns01)
            - exists: Whether certificate files exist
            - expiry_date: Certificate expiry date (if exists)
            - needs_renewal: Whether certificate needs renewal
            - days_until_expiry: Days until certificate expires
        """
        result = []
        for cert in self.certificates:
            info = {
                "domain": cert.domain,
                "ssl_type": cert.ssl_type.value,
                "challenge_type": cert.challenge_type.value,
                "exists": False,
                "expiry_date": None,
                "needs_renewal": False,
                "days_until_expiry": None,
            }

            try:
                # Resolving the paths succeeds only when the certificate exists
                _, fullchain_path = self.link_manager.get_certificate_paths(cert.domain)
                info["exists"] = True

                expiry_date = get_certificate_expiry_date(fullchain_path)
                info["expiry_date"] = expiry_date

                now = self._current_time_for(expiry_date)
                info["needs_renewal"] = self._is_due_for_renewal(expiry_date, now)
                info["days_until_expiry"] = (expiry_date - now).days
            except (FileNotFoundError, SSLCertificateNotFoundError):
                # Missing certificate: report the defaults filled in so far
                pass

            result.append(info)

        return result

    def has_certificate(self, domain: str | None = None) -> bool:
        """
        Check if a certificate exists for a domain.

        Args:
            domain: Domain to check. If None, checks the primary certificate.

        Returns:
            True if the link manager can resolve the certificate paths, False otherwise
        """
        if domain is None:
            primary = self.get_primary_certificate()
            if not primary:
                return False
            domain = primary.domain

        try:
            self.link_manager.get_certificate_paths(domain)
        except (FileNotFoundError, SSLCertificateNotFoundError):
            return False
        return True

    def get_certificate_paths(self, domain: str | None = None) -> tuple[Path, Path]:
        """
        Get paths to the certificate files for a domain.

        Args:
            domain: Domain to get paths for. If None, uses primary certificate.

        Returns:
            Tuple of (privkey_path, fullchain_path)

        Raises:
            SSLCertificateNotFoundError: If certificate doesn't exist
        """
        if domain is None:
            primary = self.get_primary_certificate()
            if not primary:
                raise SSLCertificateNotFoundError("No primary certificate configured")
            domain = primary.domain

        try:
            return self.link_manager.get_certificate_paths(domain)
        except FileNotFoundError as err:
            # Keep the underlying cause attached for debugging
            raise SSLCertificateNotFoundError(domain) from err

    def get_certificate_expiry(self, domain: str | None = None) -> datetime:
        """
        Get the expiry date of a certificate.

        Args:
            domain: Domain to check. If None, uses primary certificate.

        Returns:
            Datetime object representing when the certificate expires

        Raises:
            SSLCertificateNotFoundError: If certificate doesn't exist
        """
        _, fullchain_path = self.get_certificate_paths(domain)
        return get_certificate_expiry_date(fullchain_path)

    def needs_renewal(self, domain: str | None = None) -> bool:
        """
        Check if a certificate needs renewal.

        A certificate needs renewal if it will expire within SSL_RENEW_BEFORE_DAYS.

        Args:
            domain: Domain to check. If None, uses primary certificate.

        Returns:
            True if certificate should be renewed, False otherwise

        Raises:
            SSLCertificateNotFoundError: If certificate doesn't exist
        """
        expiry_date = self.get_certificate_expiry(domain)
        return self._is_due_for_renewal(expiry_date, self._current_time_for(expiry_date))

    def generate_all_certificates(self):
        """
        Generate individual SSL certificates for ALL configured certificates.

        This method generates a separate certificate for each entry in the
        certificates list, without combining domains into SAN certificates.
        Each certificate is generated, symlinked and given an HTTPS redirect
        independently; nginx is restarted once at the end.

        Raises:
            ValueError: If no certificates are configured
            RuntimeError: If a configured certificate has no registered service

        Errors raised by the underlying service, link manager or nginx controller
        propagate unchanged.
        """
        if not self.certificates:
            raise ValueError("No certificates configured")

        self.output_handler.change_head("Generating individual certificates for all domains")

        for certificate in self.certificates:
            service = self.services.get(certificate.domain)
            if not service:
                raise RuntimeError(f"No service found for domain {certificate.domain}")

            # Generate certificate for this domain ONLY (individual cert)
            self.output_handler.print(f"Generating certificate for {certificate.domain}")
            privkey_path, fullchain_path = service.generate_certificate(certificate)

            # Determine actual cert_type from the returned path; fall back to the
            # certificate's ssl_type if path detection fails
            actual_cert_type = self._detect_cert_type(privkey_path)
            if actual_cert_type is None:
                actual_cert_type = certificate.ssl_type.value

            # Create symlinks for nginx-proxy (no alias_domains)
            self.link_manager.link_certificate(
                cert_type=actual_cert_type,
                domain=certificate.domain,
                privkey_path=privkey_path,
                fullchain_path=fullchain_path,
                alias_domains=None,
            )

            self.vhost_manager.enable_https_redirect(certificate.domain)
            self.output_handler.print(f"Created vhost.d redirect config for {certificate.domain}")

        # Restart nginx once after all certificates are generated
        self.nginx_controller.restart()
        self.output_handler.print("All individual certificates generated successfully")

    def _renew_single_certificate(
        self,
        certificate: SSLCertificate,
        dry_run: bool,
        force: bool,
        skip_nginx_restart: bool = False,
    ):
        """
        Core renewal logic for a single certificate.

        Asks the service to renew; when the service reports failure the certificate is
        re-issued instead. On a real run the symlinks are refreshed (best effort) and
        nginx is restarted unless ``skip_nginx_restart`` is set.

        Args:
            certificate: The certificate to renew
            dry_run: If True, skips symlink updates and the nginx restart
            force: If True, forces renewal even if certificate is not due for renewal
            skip_nginx_restart: If True, skips nginx restart (for batch renewals)

        Raises:
            SSLCertificateNotDueForRenewalError: If certificate doesn't need renewal yet
            RuntimeError: If no service found for the domain
        """
        domain = certificate.domain

        if not force and not self.needs_renewal(domain):
            raise SSLCertificateNotDueForRenewalError(domain, self.get_certificate_expiry(domain))

        service = self.services.get(domain)
        if not service:
            raise RuntimeError(f"No service found for domain {domain}")

        self.output_handler.print(f"Renewing certificate for {domain}", emoji_code="🔄")
        renewal_success = service.renew_certificate(certificate, dry_run=dry_run)

        reissued_paths = None
        if not renewal_success:
            self.output_handler.print(
                "[yellow]Certificate not found in acme.sh, re-issuing...[/yellow]",
                emoji_code="⚠️",
            )
            reissued_paths = service.generate_certificate(certificate, dry_run=dry_run)

            if not dry_run:
                self.output_handler.print(f"[green]Certificate re-issued successfully for {domain}[/green]")

        if dry_run:
            self.output_handler.print(
                f"[green]Certificate renewal validated successfully for {domain}[/green]",
            )
            self.output_handler.print("[yellow]Skipped: Updating symlinks (dry run)[/yellow]", emoji_code="⏭️ ")
            if not skip_nginx_restart:
                self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ")
            return

        # Refreshing the symlinks is best effort: the renewal itself already succeeded,
        # so a failure here is reported instead of aborting the whole operation.
        try:
            if reissued_paths:
                privkey_path, fullchain_path = reissued_paths
            else:
                privkey_path, fullchain_path = self.get_certificate_paths(domain)

            actual_cert_type = self._detect_cert_type(privkey_path)
            if actual_cert_type is None:
                actual_cert_type = getattr(certificate, "acme_client", "letsencrypt")

            self.link_manager.link_certificate(
                cert_type=actual_cert_type,
                domain=domain,
                privkey_path=privkey_path,
                fullchain_path=fullchain_path,
                alias_domains=None,
            )
        except Exception as exc:
            self.logger.warning(
                "Failed to update certificate symlinks after renewal",
                extra_fields={"domain": domain, "error": str(exc)},
            )
            self.output_handler.warning(f"Failed to update symlinks for {domain}: {exc}")

        if not skip_nginx_restart:
            self.nginx_controller.restart()

        self.output_handler.print(f"Successfully renewed {domain}")

    def renew_certificate(self, domain: str | None = None, dry_run: bool = False, force: bool = False):
        """
        Renew a single certificate.

        Args:
            domain: Domain to renew. If None, uses primary certificate.
            dry_run: If True, sets the staging variable and skips system modifications
            force: If True, renews even when the certificate is not yet due

        Raises:
            ValueError: If ``domain`` is None and no certificate is configured
            SSLCertificateNotFoundError: If no certificate is configured for ``domain``
            SSLCertificateNotDueForRenewalError: If not forced and renewal is not yet due
            RuntimeError: If no service is registered for the domain
        """
        self.logger.info(
            "Renewing certificate",
            extra_fields={"domain": domain or "primary", "dry_run": dry_run, "force": force},
        )

        if domain is None:
            certificate = self.get_primary_certificate()
            if not certificate:
                self.logger.error("No primary certificate configured")
                raise ValueError("No primary certificate configured")
        else:
            certificate = self._find_certificate(domain)
            if not certificate:
                self.logger.warning("Certificate not found for renewal", extra_fields={"domain": domain})
                raise SSLCertificateNotFoundError(domain)

        if dry_run:
            self.output_handler.print(
                "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
                emoji_code="🧪 ",
            )
            self.output_handler.print("[dim]No system modifications will be made (no symlinks or nginx restart)[/dim]")

        with self._staging_environment(dry_run):
            self._renew_single_certificate(
                certificate=certificate,
                dry_run=dry_run,
                force=force,
                skip_nginx_restart=False,
            )
            self.logger.info("Certificate renewed successfully", extra_fields={"domain": certificate.domain})

    def renew_all_certificates(self, dry_run: bool = False, force: bool = False):
        """
        Renew all SSL certificates that are due for renewal.

        This method iterates through all configured certificates and renews
        those that are due for renewal. Certificates not due for renewal are
        skipped with a message; any other per-certificate error is reported and
        counted as a failure without stopping the batch. If at least one
        certificate was renewed, nginx is restarted once at the end.

        Args:
            dry_run: If True, sets the staging variable and skips system modifications
            force: If True, forces renewal for all certificates regardless of expiry

        Raises:
            ValueError: If no certificates are configured
        """
        if not self.certificates:
            raise ValueError("No certificates configured")

        self.output_handler.change_head("Renewing certificates for all domains")

        if dry_run:
            self.output_handler.print(
                "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
                emoji_code="🧪",
            )
            self.output_handler.print("[dim]No system modifications will be made (no symlinks or nginx restart)[/dim]")

        renewed_count = 0
        skipped_count = 0
        failed_count = 0

        with self._staging_environment(dry_run):
            for certificate in self.certificates:
                try:
                    self._renew_single_certificate(
                        certificate=certificate,
                        dry_run=dry_run,
                        force=force,
                        skip_nginx_restart=True,
                    )
                    renewed_count += 1
                except SSLCertificateNotDueForRenewalError as e:
                    self.output_handler.print(f"{e}", emoji_code="⏭️ ")
                    skipped_count += 1
                except Exception as e:
                    # Keep going: one broken certificate must not block the others
                    failed_count += 1
                    self.logger.warning(
                        "Certificate renewal failed",
                        extra_fields={"domain": certificate.domain, "error": str(e)},
                    )
                    self.output_handler.print(f"Failed to renew {certificate.domain}: {e}", emoji_code="❌")

            if renewed_count > 0:
                if dry_run:
                    self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ")
                else:
                    self.nginx_controller.restart()

            summary = f"Renewal complete: {renewed_count} renewed, {skipped_count} skipped"
            if failed_count:
                summary += f", {failed_count} failed"
            self.output_handler.print(summary)

    def remove_certificate(self, domain: str | None = None):
        """
        Remove an SSL certificate and its symlinks.

        This removes the symlinks, the vhost.d redirect config and the certificate
        files, then restarts nginx. Unlike ``remove_certificate_by_domain`` it leaves
        ``certificates``/``services`` untouched and does not invoke the config save
        callback.

        Args:
            domain: Domain to remove. If None, uses primary certificate.

        Raises:
            ValueError: If ``domain`` is None and no certificate is configured
            SSLCertificateNotFoundError: If no certificate is configured for ``domain``
            RuntimeError: If no service is registered for the domain
        """
        if domain is None:
            certificate = self.get_primary_certificate()
            if not certificate:
                raise ValueError("No primary certificate configured")
        else:
            certificate = self._find_certificate(domain)
            if not certificate:
                raise SSLCertificateNotFoundError(domain)

        service = self.services.get(certificate.domain)
        if not service:
            raise RuntimeError(f"No service found for domain {certificate.domain}")

        # Remove symlinks first
        self.link_manager.unlink_certificate(certificate.domain, None)

        # Disable HTTPS redirect for this domain (remove vhost.d config)
        self.vhost_manager.disable_https_redirect(certificate.domain)

        # Remove actual certificate files
        service.remove_certificate(certificate)

        # Restart nginx to apply changes
        self.nginx_controller.restart()

    def remove_all_certificates(self):
        """
        Remove ALL SSL certificates managed by this manager.

        For every configured certificate this removes the symlinks, the vhost.d
        redirect config and the certificate files via its service. Each step is best
        effort: a failure is reported as a warning and processing continues. The
        ``certificates``/``services`` collections themselves are left unchanged.

        The nginx service is restarted once after all certificates are processed.
        """
        if not self.certificates:
            self.output_handler.print("No certificates to remove")
            return

        self.output_handler.change_head(f"Removing all SSL certificates ({len(self.certificates)} total)")

        removed_count = 0
        failed_domains = []

        for certificate in list(self.certificates):  # Iterate over a snapshot
            domain = certificate.domain
            try:
                self.output_handler.print(f"Removing certificate for {domain}")

                service = self.services.get(domain)
                if not service:
                    self.output_handler.warning(f"No service found for domain {domain}, skipping")
                    continue

                # Remove symlinks
                try:
                    self.link_manager.unlink_certificate(domain, None)
                except Exception as e:
                    self.output_handler.warning(f"Failed to remove symlinks for {domain}: {e}")

                # Disable HTTPS redirect for this domain (remove vhost.d config)
                try:
                    self.vhost_manager.disable_https_redirect(domain)
                except Exception as e:
                    self.output_handler.warning(f"Failed to remove vhost config for {domain}: {e}")

                # Remove actual certificate files; only this step decides success/failure
                try:
                    service.remove_certificate(certificate)
                    removed_count += 1
                except Exception as e:
                    self.output_handler.warning(f"Failed to remove certificate files for {domain}: {e}")
                    failed_domains.append(domain)

            except Exception as e:
                self.output_handler.warning(f"Error removing certificate for {domain}: {e}")
                failed_domains.append(domain)

        # Restart nginx once after all removals
        try:
            self.nginx_controller.restart()
        except Exception as e:
            self.output_handler.warning(f"Failed to restart nginx: {e}")

        # Report results
        if removed_count > 0:
            self.output_handler.print(f"Successfully removed {removed_count} certificate(s)")
        if failed_domains:
            self.output_handler.warning(f"Failed to remove certificates for: {', '.join(failed_domains)}")