Coverage for frappe_manager / commands / ssl / bench_helpers.py: 11%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1"""Helper functions for bench SSL certificate operations.""" 

2 

3import typer 

4from rich.table import Table 

5 

6from frappe_manager.logger.context import LoggerContext 

7from frappe_manager.output_manager import spinner, temporary_stop 

8from frappe_manager.site_manager.site import Bench 

9from frappe_manager.ssl_manager import LETSENCRYPT_PREFERRED_CHALLENGE, SUPPORTED_SSL_TYPES 

10from frappe_manager.ssl_manager.certificate_exceptions import SSLCertificateNotFoundError 

11from frappe_manager.ssl_manager.letsencrypt_certificate import CustomDomainCertificate, LetsencryptSSLCertificate 

12 

13from .helpers import get_output_handler 

14 

15 

16def _add_bench_certificate( 

17 ctx: typer.Context, 

18 benchname: str, 

19 domain: str, 

20 challenge: LETSENCRYPT_PREFERRED_CHALLENGE, 

21 cname: str | None, 

22 dry_run: bool, 

23): 

24 """Add SSL certificate for a bench domain (existing logic extracted).""" 

25 

26 services_manager = ctx.obj["services"] 

27 

28 context = LoggerContext(bench=benchname, operation="ssl-add") 

29 output = get_output_handler(ctx, context=context) 

30 logger = ctx.obj.get("logger") 

31 bench = Bench.get_object(benchname, services_manager, logger=logger, output_handler=output) 

32 

33 allowed_domains = bench.bench_config.get_all_domains() 

34 if domain not in allowed_domains: 

35 output.display_error( 

36 f"Domain '{domain}' is not configured for bench '{benchname}'.\n" 

37 f"Allowed domains: {', '.join(allowed_domains)}\n" 

38 f"To add an alias domain, use: fm update {benchname} --add-alias {domain}", 

39 ) 

40 raise typer.Exit(1) 

41 

42 if cname and challenge != LETSENCRYPT_PREFERRED_CHALLENGE.dns01: 

43 output.display_error("CNAME delegation (--cname) can only be used with DNS-01 challenge") 

44 raise typer.Exit(1) 

45 

46 output.change_head(f"Adding SSL certificate for {domain}") 

47 

48 try: 

49 if cname: 

50 cert = CustomDomainCertificate( 

51 domain=domain, 

52 ssl_type=SUPPORTED_SSL_TYPES.le, 

53 api_token=None, 

54 api_key=None, 

55 challenge_type=challenge, 

56 delegation_cname=cname, 

57 ) 

58 output.print(f"Using CNAME delegation: {cname}", emoji_code=":information:") 

59 else: 

60 cert = LetsencryptSSLCertificate( 

61 domain=domain, 

62 ssl_type=SUPPORTED_SSL_TYPES.le, 

63 api_token=None, 

64 api_key=None, 

65 challenge_type=challenge, 

66 ) 

67 

68 with spinner(output, f"Adding SSL certificate for {domain}"): 

69 bench.certificate_manager.add_certificate(cert, dry_run=dry_run) 

70 

71 if not dry_run: 

72 output.print(f"SSL certificate added for {domain}", emoji_code=":white_check_mark:") 

73 output.print("Certificate has been issued and configured.", emoji_code=":zap:") 

74 

75 except ValueError as e: 

76 output.display_error(f"Failed to add certificate: {e}") 

77 raise typer.Exit(1) 

78 except Exception as e: 

79 output.display_error(f"Failed to add certificate: {e}") 

80 output.display_error(f"Error details: {e!s}") 

81 raise typer.Exit(1) 

82 

83 

84def _remove_bench_certificate(ctx: typer.Context, benchname: str, domain: str, yes: bool): 

85 services_manager = ctx.obj["services"] 

86 

87 context = LoggerContext(bench=benchname, operation="ssl-remove") 

88 output = get_output_handler(ctx, context=context) 

89 logger = ctx.obj.get("logger") 

90 bench = Bench.get_object(benchname, services_manager, logger=logger, output_handler=output) 

91 

92 domains = bench.bench_config.get_all_domains() 

93 if domain not in domains: 

94 output.display_error(f"Domain '{domain}' is not configured for bench '{benchname}'") 

95 raise typer.Exit(1) 

96 

97 output.change_head(f"Removing SSL certificate for {domain}") 

98 

99 if not yes: 

100 with temporary_stop(output): 

101 choice = output.prompt_ask( 

102 prompt=f"Remove SSL certificate for {domain}?", 

103 choices=["yes", "no"], 

104 default="no", 

105 required_flag="--yes or -y", 

106 ) 

107 if choice != "yes": 

108 output.print("Cancelled.", emoji_code=":x:") 

109 raise typer.Exit(0) 

110 

111 output.change_head(f"Removing SSL certificate for {domain}") 

112 

113 try: 

114 with spinner(output, f"Removing SSL certificate for {domain}"): 

115 bench.certificate_manager.remove_certificate_by_domain(domain) 

116 

117 output.print(f"SSL certificate removed for {domain}", emoji_code=":white_check_mark:") 

118 

119 except SSLCertificateNotFoundError as e: 

120 output.display_error(f"Certificate not found: {e}") 

121 raise typer.Exit(1) 

122 except Exception as e: 

123 output.display_error(f"Failed to remove certificate: {e}") 

124 output.display_error(f"Error details: {e!s}") 

125 raise typer.Exit(1) 

126 

127 

128def _list_bench_certificates(ctx: typer.Context, benchname: str): 

129 """List all SSL certificates for a bench (existing logic extracted).""" 

130 

131 services_manager = ctx.obj["services"] 

132 

133 context = LoggerContext(bench=benchname, operation="ssl-list") 

134 output = get_output_handler(ctx, context=context) 

135 logger = ctx.obj.get("logger") 

136 bench = Bench.get_object(benchname, services_manager, logger=logger, output_handler=output) 

137 

138 all_domains = bench.bench_config.get_all_domains() 

139 

140 certs = bench.certificate_manager.list_certificates() 

141 

142 cert_map = {cert["domain"]: cert for cert in certs} 

143 

144 table = Table(show_header=True, header_style="bold magenta") 

145 table.add_column("Domain", style="cyan") 

146 table.add_column("Type", style="yellow") 

147 table.add_column("Challenge", style="magenta") 

148 table.add_column("Status", style="green") 

149 table.add_column("Expiry", style="blue") 

150 table.add_column("Days Left", justify="right") 

151 table.add_column("Renewal", style="red") 

152 

153 # Show all domains, whether they have certificates or not 

154 for domain in all_domains: 

155 if domain in cert_map: 

156 # Domain has a certificate configured 

157 cert = cert_map[domain] 

158 ssl_type = cert["ssl_type"] 

159 challenge_type = cert.get("challenge_type", "N/A") 

160 status = "✅ Issued" if cert["exists"] else "❌ Not Issued" 

161 

162 if cert["exists"] and cert["expiry_date"]: 

163 expiry = cert["expiry_date"].strftime("%Y-%m-%d %H:%M") 

164 days_left = str(cert["days_until_expiry"]) 

165 renewal = "⚠️ DUE" if cert["needs_renewal"] else "✓ OK" 

166 else: 

167 expiry = "N/A" 

168 days_left = "N/A" 

169 renewal = "N/A" 

170 else: 

171 # Domain has no certificate configured 

172 ssl_type = "none" 

173 challenge_type = "N/A" 

174 status = "⚪ No SSL" 

175 expiry = "N/A" 

176 days_left = "N/A" 

177 renewal = "N/A" 

178 

179 table.add_row(domain, ssl_type, challenge_type, status, expiry, days_left, renewal) 

180 

181 output.stop() 

182 output.print_data(table)