Coverage for frappe_manager / ssl_manager / ssl_certificate_manager.py: 9%

352 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:35 +0530

1""" 

2SSL Certificate Manager with multi-certificate support and dependency injection. 

3 

4This module orchestrates SSL certificate operations by coordinating between: 

5- SSL certificate services (Let's Encrypt, acme.sh, self-signed, etc.) 

6- Certificate link manager (for symlink operations) 

7- Nginx controller (for service restarts) 

8 

9The manager supports multiple certificates per bench, allowing different domains 

10to use different certificate types and validation methods. 

11""" 

12 

13import os 

14from collections.abc import Callable 

15from datetime import datetime, timedelta 

16from pathlib import Path 

17 

18from frappe_manager import SSL_RENEW_BEFORE_DAYS 

19from frappe_manager.logger.contextual import ContextualLogger 

20from frappe_manager.output_manager import OutputHandler 

21from frappe_manager.ssl_manager.certificate import SSLCertificate 

22from frappe_manager.ssl_manager.certificate_exceptions import ( 

23 SSLCertificateNotDueForRenewalError, 

24 SSLCertificateNotFoundError, 

25) 

26from frappe_manager.ssl_manager.certificate_link_manager import CertificateLinkManager 

27from frappe_manager.ssl_manager.nginx_controller import NginxController 

28from frappe_manager.ssl_manager.ssl_certificate_service import SSLCertificateService 

29from frappe_manager.ssl_manager.storage_config import SSLStorageConfig 

30from frappe_manager.ssl_manager.vhost_config_manager import VhostConfigManager 

31from frappe_manager.utils.helpers import get_certificate_expiry_date 

32 

33 

34class SSLCertificateManager: 

35 """ 

36 Manages multiple SSL certificates with dependency injection. 

37 

38 This manager coordinates SSL certificate operations for multiple domains/certificates 

39 without being tightly coupled to specific implementations. It supports multiple 

40 certificates per bench and uses a service factory to create appropriate services 

41 for each certificate type. 

42 

43 Attributes: 

44 certificates: List of SSL certificate configurations to manage 

45 service_factory: Factory function to create certificate services 

46 link_manager: Manages symlinks between cert files and nginx-proxy 

47 nginx_controller: Controls nginx service operations 

48 vhost_manager: Manages per-domain HTTPS redirect configuration 

49 storage_config: Storage configuration for SSL operations 

50 config_save_callback: Callback to persist config changes to bench_config.toml 

51 output_handler: Output handler for user-facing messages 

52 services: Dictionary mapping domain to certificate service instances 

53 """ 

54 

55 def __init__( 

56 self, 

57 logger: ContextualLogger, 

58 certificates: list[SSLCertificate], 

59 service_factory: Callable[[SSLCertificate, SSLStorageConfig, OutputHandler], SSLCertificateService], 

60 link_manager: CertificateLinkManager, 

61 nginx_controller: NginxController, 

62 storage_config: SSLStorageConfig, 

63 output_handler: OutputHandler, 

64 config_save_callback: Callable[[], None] | None = None, 

65 ): 

66 """ 

67 Initialize the SSL certificate manager. 

68 

69 Args: 

70 logger: Contextual logger for audit/debug logging 

71 certificates: List of SSL certificate configurations to manage 

72 service_factory: Factory function to create certificate services 

73 link_manager: Manager for certificate symlink operations 

74 nginx_controller: Controller for nginx service operations 

75 storage_config: Storage configuration for SSL operations 

76 output_handler: Output handler for user messages 

77 config_save_callback: Optional callback to save config after modifications 

78 

79 Raises: 

80 ValueError: If any required dependency is None or invalid 

81 """ 

82 if logger is None: 

83 raise ValueError("Logger is required") 

84 if certificates is None: 

85 raise ValueError("Certificate configuration is required") 

86 if service_factory is None: 

87 raise ValueError("Certificate service factory is required") 

88 if link_manager is None: 

89 raise ValueError("Certificate link manager is required") 

90 if nginx_controller is None: 

91 raise ValueError("Nginx controller is required") 

92 if storage_config is None: 

93 raise ValueError("Storage config is required") 

94 if output_handler is None: 

95 raise ValueError("Output handler is required") 

96 

97 self.logger = logger.child(component="ssl_manager") 

98 self.certificates: list[SSLCertificate] = certificates if isinstance(certificates, list) else [certificates] 

99 self.service_factory = service_factory 

100 self.storage_config = storage_config 

101 self.output_handler = output_handler 

102 self.link_manager = link_manager 

103 self.nginx_controller = nginx_controller 

104 self.config_save_callback = config_save_callback 

105 

106 self.vhost_manager = VhostConfigManager(storage_config.vhostd_dir) 

107 

108 self.services: dict[str, SSLCertificateService] = {} 

109 for cert in self.certificates: 

110 self.services[cert.domain] = service_factory(cert, storage_config, output_handler) 

111 

def get_primary_certificate(self) -> SSLCertificate | None:
    """
    Return the primary certificate, i.e. the first configured one.

    Returns:
        The first entry of ``self.certificates``, or None when the list is empty
    """
    if not self.certificates:
        return None
    return self.certificates[0]

120 

def add_certificate(self, certificate: SSLCertificate, dry_run: bool = False):
    """
    Generate a certificate for a new domain and register it with the manager.

    On a real run the generated files are symlinked, the HTTPS redirect is enabled,
    the certificate is appended to ``self.certificates``, nginx is restarted and the
    configuration is saved through ``config_save_callback`` (when one is set).

    On a dry run ``FM_LETSENCRYPT_STAGING`` is set to ``"1"`` for the duration of the
    call and none of those modifications are made.

    The service created for the domain is only kept in ``self.services`` when the
    certificate actually ends up in ``self.certificates``; after a dry run or a
    failure it is removed again so the manager state stays consistent.

    Args:
        certificate: Certificate configuration to add
        dry_run: If True, only validate issuance and skip system modifications

    Raises:
        ValueError: If a certificate for the same domain is already managed
        RuntimeError: If service_factory, storage_config or output_handler is missing
    """
    domain = certificate.domain
    self.logger.info("Adding certificate", extra_fields={"domain": domain, "dry_run": dry_run})

    if any(cert.domain == domain for cert in self.certificates):
        self.logger.warning("Certificate already exists", extra_fields={"domain": domain})
        raise ValueError(f"Certificate for {domain} already exists")

    if not (self.service_factory and self.storage_config and self.output_handler):
        self.logger.error("Cannot add certificate: missing dependencies")
        raise RuntimeError(
            "Cannot add certificate: service_factory, storage_config, and output_handler are required",
        )

    service = self.service_factory(certificate, self.storage_config, self.output_handler)
    self.services[domain] = service
    self.logger.debug("Created certificate service", extra_fields={"domain": domain})

    original_staging = None
    if dry_run:
        self.output_handler.print(
            "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
            emoji_code="🧪",
        )
        self.output_handler.print(
            "[dim]No system modifications will be made (no symlinks, nginx restart, or config save)[/dim]",
        )

        # Remember the previous value so it can be restored in the finally block.
        original_staging = os.environ.get("FM_LETSENCRYPT_STAGING")
        os.environ["FM_LETSENCRYPT_STAGING"] = "1"
        self.logger.debug("Enabled staging mode for dry run")

    registered = False
    try:
        self.logger.info("Generating certificate", extra_fields={"domain": domain})
        privkey_path, fullchain_path = service.generate_certificate(certificate, dry_run=dry_run)
        self.logger.info(
            "Certificate generated",
            extra_fields={
                "domain": domain,
                "privkey": str(privkey_path),
                "fullchain": str(fullchain_path),
            },
        )

        # The first directory below ssl_dir is used as the certificate type.
        try:
            relative_path = privkey_path.relative_to(self.storage_config.ssl_dir)
            actual_cert_type = relative_path.parts[0]
        except (ValueError, IndexError):
            actual_cert_type = getattr(certificate, "acme_client", "letsencrypt")

        if dry_run:
            self.output_handler.print(f"[green]Certificate validated successfully for {domain}[/green]")
            self.output_handler.print("[yellow]Skipped: Creating symlinks (dry run)[/yellow]", emoji_code="⏭️ ")
            self.output_handler.print(
                "[yellow]Skipped: Creating vhost.d redirect config (dry run)[/yellow]",
                emoji_code="⏭️ ",
            )
            self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ")
            self.output_handler.print("[yellow]Skipped: Saving configuration (dry run)[/yellow]", emoji_code="⏭️ ")
            self.logger.info("Dry run completed successfully", extra_fields={"domain": domain})
        else:
            self.logger.info(
                "Creating certificate symlinks",
                extra_fields={"domain": domain, "cert_type": actual_cert_type},
            )
            self.link_manager.link_certificate(
                cert_type=actual_cert_type,
                domain=domain,
                privkey_path=privkey_path,
                fullchain_path=fullchain_path,
                alias_domains=None,
            )

            self.vhost_manager.enable_https_redirect(domain)
            self.output_handler.print(f"Created vhost.d redirect config for {domain}")
            self.logger.debug("Enabled HTTPS redirect", extra_fields={"domain": domain})

            self.certificates.append(certificate)
            registered = True

            self.logger.info("Restarting nginx")
            self.nginx_controller.restart()
            if self.config_save_callback:
                self.config_save_callback()
                self.logger.debug("Saved configuration")

            self.logger.info("Certificate added successfully", extra_fields={"domain": domain})

    finally:
        # Dry runs and failures must not leave a service behind for a domain
        # that is not part of self.certificates.
        if not registered and self.services.get(domain) is service:
            del self.services[domain]

        if dry_run:
            if original_staging is not None:
                os.environ["FM_LETSENCRYPT_STAGING"] = original_staging
            else:
                os.environ.pop("FM_LETSENCRYPT_STAGING", None)

214 

def remove_certificate_by_domain(self, domain: str):
    """
    Remove the managed certificate for ``domain`` and forget it.

    Unlinks the symlinks, disables the HTTPS redirect, asks the service to remove
    the certificate, drops the entry from ``self.certificates`` and
    ``self.services``, restarts nginx and finally calls ``config_save_callback``
    when one is configured.

    Args:
        domain: Domain whose certificate should be removed

    Raises:
        SSLCertificateNotFoundError: If no managed certificate matches the domain
        RuntimeError: If no service is registered for the domain
    """
    domain_fields = {"domain": domain}
    self.logger.info("Removing certificate", extra_fields={"domain": domain})

    target = next((cert for cert in self.certificates if cert.domain == domain), None)
    if not target:
        self.logger.warning("Certificate not found", extra_fields=domain_fields)
        raise SSLCertificateNotFoundError(domain)

    service = self.services.get(domain)
    if not service:
        self.logger.error("No service found for domain", extra_fields=domain_fields)
        raise RuntimeError(f"No service found for domain {domain}")

    self.logger.debug("Unlinking certificate symlinks", extra_fields=domain_fields)
    self.link_manager.unlink_certificate(domain, alias_domains=None)

    self.vhost_manager.disable_https_redirect(domain)
    self.logger.debug("Disabled HTTPS redirect", extra_fields=domain_fields)

    self.logger.debug("Removing certificate files", extra_fields=domain_fields)
    service.remove_certificate(target)

    # Forget the certificate only after the removal steps above succeeded.
    self.certificates.remove(target)
    self.services.pop(domain)

    self.logger.info("Restarting nginx")
    self.nginx_controller.restart()

    if self.config_save_callback:
        self.config_save_callback()
        self.logger.debug("Saved configuration")

    self.logger.info("Certificate removed successfully", extra_fields=domain_fields)

253 

def list_certificates(self) -> list[dict]:
    """
    List all managed certificates with their status.

    The current time is taken in the same timezone as the certificate's expiry
    date (``datetime.now(tz=...)``), so timezone-aware expiry dates are compared
    against the real current instant rather than against local wall-clock time
    relabelled with another timezone.

    Returns:
        List of dictionaries with certificate information:
            - domain: Certificate domain
            - ssl_type: Certificate type (letsencrypt, etc.)
            - challenge_type: Challenge type (http01 or dns01)
            - exists: Whether certificate files exist
            - expiry_date: Certificate expiry date (if exists)
            - needs_renewal: Whether certificate needs renewal
            - days_until_expiry: Days until certificate expires
    """
    result = []
    for cert in self.certificates:
        info = {
            "domain": cert.domain,
            "ssl_type": cert.ssl_type.value,
            "challenge_type": cert.challenge_type.value,
            "exists": False,
            "expiry_date": None,
            "needs_renewal": False,
            "days_until_expiry": None,
        }

        try:
            # Raises when the certificate files are missing; the defaults above then stand.
            _privkey_path, fullchain_path = self.link_manager.get_certificate_paths(cert.domain)
            info["exists"] = True

            expiry_date = get_certificate_expiry_date(fullchain_path)
            info["expiry_date"] = expiry_date

            # tz=None yields a naive local "now", matching a naive expiry date.
            # NOTE(review): whether a naive expiry date is local time or UTC depends on
            # get_certificate_expiry_date - confirm.
            now = datetime.now(tz=expiry_date.tzinfo)

            renew_from = expiry_date - timedelta(days=SSL_RENEW_BEFORE_DAYS)
            info["needs_renewal"] = now >= renew_from
            info["days_until_expiry"] = (expiry_date - now).days

        except (FileNotFoundError, SSLCertificateNotFoundError):
            pass

        result.append(info)

    return result

307 

def has_certificate(self, domain: str | None = None) -> bool:
    """
    Tell whether certificate files can be located for a domain.

    Args:
        domain: Domain to check. If None, the primary certificate's domain is used.

    Returns:
        True if the link manager returns paths for the domain; False if it raises
        FileNotFoundError or SSLCertificateNotFoundError, or if no domain was given
        and there is no primary certificate
    """
    lookup_domain = domain
    if lookup_domain is None:
        primary_cert = self.get_primary_certificate()
        if not primary_cert:
            return False
        lookup_domain = primary_cert.domain

    try:
        self.link_manager.get_certificate_paths(lookup_domain)
    except (FileNotFoundError, SSLCertificateNotFoundError):
        return False
    else:
        return True

329 

def get_certificate_paths(self, domain: str | None = None) -> tuple[Path, Path]:
    """
    Get paths to the certificate files for a domain.

    Args:
        domain: Domain to get paths for. If None, uses primary certificate.

    Returns:
        Tuple of (privkey_path, fullchain_path) as returned by the link manager

    Raises:
        SSLCertificateNotFoundError: If no domain was given and no primary certificate
            is configured, or if the link manager raises FileNotFoundError (the
            original error is attached as the cause)
    """
    if domain is None:
        primary = self.get_primary_certificate()
        if not primary:
            raise SSLCertificateNotFoundError("No primary certificate configured")
        domain = primary.domain

    try:
        return self.link_manager.get_certificate_paths(domain)
    except FileNotFoundError as err:
        # Chain explicitly so the missing path stays visible in the traceback.
        raise SSLCertificateNotFoundError(domain) from err

353 

def get_certificate_expiry(self, domain: str | None = None) -> datetime:
    """
    Look up when a certificate expires.

    Args:
        domain: Domain to check. If None, uses primary certificate.

    Returns:
        The value ``get_certificate_expiry_date`` reports for the fullchain file

    Raises:
        SSLCertificateNotFoundError: If certificate doesn't exist
    """
    # Only the fullchain file is needed to read the expiry date.
    _privkey, fullchain = self.get_certificate_paths(domain)
    expiry = get_certificate_expiry_date(fullchain)
    return expiry

369 

def needs_renewal(self, domain: str | None = None) -> bool:
    """
    Check if a certificate needs renewal.

    A certificate needs renewal if it will expire within SSL_RENEW_BEFORE_DAYS.
    The current time is obtained in the expiry date's own timezone, so an aware
    expiry date is compared against the real current instant.

    Args:
        domain: Domain to check. If None, uses primary certificate.

    Returns:
        True if certificate should be renewed, False otherwise
    """
    expiry_date = self.get_certificate_expiry(domain)
    renew_from = expiry_date - timedelta(days=SSL_RENEW_BEFORE_DAYS)

    # datetime.now(tz=None) is naive local time, which keeps naive expiry dates comparable.
    # NOTE(review): whether a naive expiry date is local time or UTC depends on
    # get_certificate_expiry_date - confirm.
    now = datetime.now(tz=renew_from.tzinfo)

    return now >= renew_from

390 

def generate_all_certificates(self):
    """
    Issue one certificate per configured entry and publish each of them.

    Every entry in ``self.certificates`` is generated on its own through its
    service; domains are not combined. After each generation the files are
    symlinked (without alias domains) and the HTTPS redirect is enabled. nginx is
    restarted a single time once the loop has finished.

    Raises:
        ValueError: If no certificates are configured
        RuntimeError: If a configured domain has no service
    """
    if not self.certificates:
        raise ValueError("No certificates configured")

    self.output_handler.change_head("Generating individual certificates for all domains")

    for cert in self.certificates:
        cert_service = self.services.get(cert.domain)
        if not cert_service:
            raise RuntimeError(f"No service found for domain {cert.domain}")

        self.output_handler.print(f"Generating certificate for {cert.domain}")
        key_file, chain_file = cert_service.generate_certificate(cert)

        # Expected layout is ssl_dir / cert_type / domain / file, so the first path
        # component below ssl_dir names the certificate type.
        try:
            cert_type = key_file.relative_to(self.storage_config.ssl_dir).parts[0]
        except (ValueError, IndexError):
            # Path is outside ssl_dir (or equal to it): use the configured type instead.
            cert_type = cert.ssl_type.value

        self.link_manager.link_certificate(
            cert_type=cert_type,
            domain=cert.domain,
            privkey_path=key_file,
            fullchain_path=chain_file,
            alias_domains=None,
        )

        self.vhost_manager.enable_https_redirect(cert.domain)
        self.output_handler.print(f"Created vhost.d redirect config for {cert.domain}")

    # One restart covers every certificate handled above.
    self.nginx_controller.restart()
    self.output_handler.print("All individual certificates generated successfully")

447 

448 def _renew_single_certificate( 

449 self, 

450 certificate: SSLCertificate, 

451 dry_run: bool, 

452 force: bool, 

453 skip_nginx_restart: bool = False, 

454 ): 

455 """ 

456 Core renewal logic for a single certificate. 

457 

458 This method handles the actual renewal process including fallback to re-issue 

459 if the certificate is not found in acme.sh. 

460 

461 Args: 

462 certificate: The certificate to renew 

463 dry_run: If True, uses Let's Encrypt staging server and skips system modifications 

464 force: If True, forces renewal even if certificate is not due for renewal 

465 skip_nginx_restart: If True, skips nginx restart (for batch renewals) 

466 

467 Raises: 

468 SSLCertificateNotDueForRenewalError: If certificate doesn't need renewal yet 

469 RuntimeError: If no service found for the domain 

470 """ 

471 if not force and not self.needs_renewal(certificate.domain): 

472 raise SSLCertificateNotDueForRenewalError( 

473 certificate.domain, 

474 self.get_certificate_expiry(certificate.domain), 

475 ) 

476 

477 # Get service for this certificate 

478 service = self.services.get(certificate.domain) 

479 if not service: 

480 raise RuntimeError(f"No service found for domain {certificate.domain}") 

481 

482 # Renew the certificate 

483 self.output_handler.print(f"Renewing certificate for {certificate.domain}", emoji_code="🔄") 

484 renewal_success = service.renew_certificate(certificate, dry_run=dry_run) 

485 

486 reissued_paths = None 

487 if not renewal_success: 

488 self.output_handler.print( 

489 "[yellow]Certificate not found in acme.sh, re-issuing...[/yellow]", 

490 emoji_code="⚠️", 

491 ) 

492 key_path, fullchain_path = service.generate_certificate(certificate, dry_run=dry_run) 

493 reissued_paths = (key_path, fullchain_path) 

494 

495 if not dry_run: 

496 self.output_handler.print(f"[green]Certificate re-issued successfully for {certificate.domain}[/green]") 

497 

498 if dry_run: 

499 self.output_handler.print( 

500 f"[green]Certificate renewal validated successfully for {certificate.domain}[/green]", 

501 ) 

502 self.output_handler.print("[yellow]️Skipped: Updating symlinks (dry run)[/yellow]", emoji_code="⏭ ") 

503 if not skip_nginx_restart: 

504 self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ") 

505 else: 

506 try: 

507 if reissued_paths: 

508 privkey_path, fullchain_path = reissued_paths 

509 else: 

510 privkey_path, fullchain_path = self.get_certificate_paths(certificate.domain) 

511 

512 ssl_dir = self.storage_config.ssl_dir 

513 try: 

514 relative_path = privkey_path.relative_to(ssl_dir) 

515 actual_cert_type = relative_path.parts[0] 

516 except (ValueError, IndexError): 

517 actual_cert_type = getattr(certificate, "acme_client", "letsencrypt") 

518 

519 self.link_manager.link_certificate( 

520 cert_type=actual_cert_type, 

521 domain=certificate.domain, 

522 privkey_path=privkey_path, 

523 fullchain_path=fullchain_path, 

524 alias_domains=None, 

525 ) 

526 except Exception: 

527 pass 

528 

529 if not skip_nginx_restart: 

530 self.nginx_controller.restart() 

531 

532 self.output_handler.print(f"Successfully renewed {certificate.domain}") 

533 

def renew_certificate(self, domain: str | None = None, dry_run: bool = False, force: bool = False):
    """
    Renew one certificate, selected by domain or defaulting to the primary one.

    During a dry run ``FM_LETSENCRYPT_STAGING`` is set to ``"1"`` while the renewal
    runs; its previous value (or absence) is restored afterwards, also on error.

    Args:
        domain: Domain to renew. If None, the primary certificate is renewed.
        dry_run: If True, run in staging mode; passed on to the renewal step
        force: If True, renew even when the certificate is not yet due

    Raises:
        ValueError: If no domain was given and no primary certificate is configured
        SSLCertificateNotFoundError: If no managed certificate matches the domain
    """
    self.logger.info(
        "Renewing certificate",
        extra_fields={"domain": domain or "primary", "dry_run": dry_run, "force": force},
    )

    if domain is not None:
        certificate = next((cert for cert in self.certificates if cert.domain == domain), None)
        if not certificate:
            self.logger.warning("Certificate not found for renewal", extra_fields={"domain": domain})
            raise SSLCertificateNotFoundError(domain)
    else:
        certificate = self.get_primary_certificate()
        if not certificate:
            self.logger.error("No primary certificate configured")
            raise ValueError("No primary certificate configured")

    previous_staging = None
    if dry_run:
        self.output_handler.print(
            "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
            emoji_code="🧪 ",
        )
        self.output_handler.print("[dim]No system modifications will be made (no symlinks or nginx restart)[/dim]")

        # Keep the old value around so the finally block can put it back.
        previous_staging = os.environ.get("FM_LETSENCRYPT_STAGING")
        os.environ["FM_LETSENCRYPT_STAGING"] = "1"
        self.logger.debug("Enabled staging mode for dry run")

    try:
        self._renew_single_certificate(
            certificate=certificate,
            dry_run=dry_run,
            force=force,
            skip_nginx_restart=False,
        )
        self.logger.info("Certificate renewed successfully", extra_fields={"domain": certificate.domain})
    finally:
        if dry_run:
            if previous_staging is None:
                os.environ.pop("FM_LETSENCRYPT_STAGING", None)
            else:
                os.environ["FM_LETSENCRYPT_STAGING"] = previous_staging

583 

def renew_all_certificates(self, dry_run: bool = False, force: bool = False):
    """
    Renew all SSL certificates that are due for renewal.

    Each configured certificate is renewed independently. Certificates that are
    not due are counted as skipped; any other error is reported, logged and
    counted as failed without stopping the remaining renewals. nginx is restarted
    once at the end when at least one certificate was renewed (never on a dry run).

    During a dry run ``FM_LETSENCRYPT_STAGING`` is set to ``"1"`` and restored
    afterwards.

    Args:
        dry_run: If True, run in staging mode and skip system modifications
        force: If True, forces renewal for all certificates regardless of expiry

    Raises:
        ValueError: If no certificates are configured
    """
    if not self.certificates:
        raise ValueError("No certificates configured")

    self.logger.info(
        "Renewing all certificates",
        extra_fields={"count": len(self.certificates), "dry_run": dry_run, "force": force},
    )
    self.output_handler.change_head("Renewing certificates for all domains")

    original_staging = None
    if dry_run:
        self.output_handler.print(
            "[bold yellow]DRY RUN MODE: Using Let's Encrypt staging server[/bold yellow]",
            emoji_code="🧪",
        )
        self.output_handler.print("[dim]No system modifications will be made (no symlinks or nginx restart)[/dim]")

        original_staging = os.environ.get("FM_LETSENCRYPT_STAGING")
        os.environ["FM_LETSENCRYPT_STAGING"] = "1"

    renewed_count = 0
    skipped_count = 0
    failed_domains: list[str] = []

    try:
        for certificate in self.certificates:
            try:
                # The restart is deferred so it happens once for the whole batch.
                self._renew_single_certificate(
                    certificate=certificate,
                    dry_run=dry_run,
                    force=force,
                    skip_nginx_restart=True,
                )
            except SSLCertificateNotDueForRenewalError as e:
                self.output_handler.print(f"{e}", emoji_code="⏭️ ")
                skipped_count += 1
            except Exception as e:
                # One broken certificate must not block the others, but it should
                # show up in the log and in the summary.
                self.output_handler.print(f"Failed to renew {certificate.domain}: {e}", emoji_code="❌")
                self.logger.error(
                    "Certificate renewal failed",
                    extra_fields={"domain": certificate.domain, "error": str(e)},
                )
                failed_domains.append(certificate.domain)
            else:
                renewed_count += 1

        if renewed_count > 0:
            if dry_run:
                self.output_handler.print("[yellow]Skipped: Restarting nginx (dry run)[/yellow]", emoji_code="⏭️ ")
            else:
                self.nginx_controller.restart()

        summary = f"Renewal complete: {renewed_count} renewed, {skipped_count} skipped"
        if failed_domains:
            summary += f", {len(failed_domains)} failed"
        self.output_handler.print(summary)

    finally:
        if dry_run:
            if original_staging is not None:
                os.environ["FM_LETSENCRYPT_STAGING"] = original_staging
            else:
                os.environ.pop("FM_LETSENCRYPT_STAGING", None)

648 

def remove_certificate(self, domain: str | None = None):
    """
    Remove a certificate's symlinks, redirect config and files, then restart nginx.

    NOTE(review): unlike ``remove_certificate_by_domain`` this method leaves
    ``self.certificates`` and ``self.services`` untouched and does not call
    ``config_save_callback`` - confirm that callers rely on this.

    Args:
        domain: Domain to remove. If None, uses primary certificate.

    Raises:
        ValueError: If no domain was given and no primary certificate is configured
        SSLCertificateNotFoundError: If no managed certificate matches the domain
        RuntimeError: If no service is registered for the certificate's domain
    """
    if domain is not None:
        certificate = next((cert for cert in self.certificates if cert.domain == domain), None)
        if not certificate:
            raise SSLCertificateNotFoundError(domain)
    else:
        certificate = self.get_primary_certificate()
        if not certificate:
            raise ValueError("No primary certificate configured")

    cert_service = self.services.get(certificate.domain)
    if not cert_service:
        raise RuntimeError(f"No service found for domain {certificate.domain}")

    # Order: symlinks, then the vhost.d redirect config, then the certificate files.
    self.link_manager.unlink_certificate(certificate.domain, None)
    self.vhost_manager.disable_https_redirect(certificate.domain)
    cert_service.remove_certificate(certificate)

    # Restart nginx to apply the changes.
    self.nginx_controller.restart()

689 

def remove_all_certificates(self):
    """
    Remove every certificate this manager knows about, on a best-effort basis.

    For each certificate the symlinks, the vhost.d redirect config and the
    certificate itself (via its service) are removed. Each step has its own error
    handling: a failure is reported as a warning and the remaining steps still
    run. Only a failure of the service's ``remove_certificate`` (or an unexpected
    error) marks the domain as failed. nginx is restarted once at the end, and a
    failing restart is only warned about.

    ``self.certificates`` and ``self.services`` are not modified by this method.
    """
    if not self.certificates:
        self.output_handler.print("No certificates to remove")
        return

    self.output_handler.change_head(f"Removing all SSL certificates ({len(self.certificates)} total)")

    removed_count = 0
    failed_domains = []

    # Work on a snapshot of the list.
    for certificate in list(self.certificates):
        try:
            self.output_handler.print(f"Removing certificate for {certificate.domain}")

            cert_service = self.services.get(certificate.domain)
            if not cert_service:
                self.output_handler.warning(f"No service found for domain {certificate.domain}, skipping")
                continue

            try:
                self.link_manager.unlink_certificate(certificate.domain, None)
            except Exception as unlink_error:
                self.output_handler.warning(f"Failed to remove symlinks for {certificate.domain}: {unlink_error}")

            try:
                self.vhost_manager.disable_https_redirect(certificate.domain)
            except Exception as vhost_error:
                self.output_handler.warning(
                    f"Failed to remove vhost config for {certificate.domain}: {vhost_error}"
                )

            try:
                cert_service.remove_certificate(certificate)
            except Exception as removal_error:
                self.output_handler.warning(
                    f"Failed to remove certificate files for {certificate.domain}: {removal_error}"
                )
                failed_domains.append(certificate.domain)
            else:
                removed_count += 1

        except Exception as unexpected_error:
            self.output_handler.warning(f"Error removing certificate for {certificate.domain}: {unexpected_error}")
            failed_domains.append(certificate.domain)

    # A single restart after all removals.
    try:
        self.nginx_controller.restart()
    except Exception as restart_error:
        self.output_handler.warning(f"Failed to restart nginx: {restart_error}")

    if removed_count > 0:
        self.output_handler.print(f"Successfully removed {removed_count} certificate(s)")
    if failed_domains:
        self.output_handler.warning(f"Failed to remove certificates for: {', '.join(failed_domains)}")

751 self.output_handler.print(f"Successfully removed {removed_count} certificate(s)") 

752 if failed_domains: 

753 self.output_handler.warning(f"Failed to remove certificates for: {', '.join(failed_domains)}")