Coverage for frappe_manager / ssl_manager / dns_validator.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:35 +0530

1""" 

2DNS validation utilities for SSL certificate generation. 

3 

4Validates DNS configuration before attempting ACME challenges to provide 

5early feedback and better error messages to users. 

6""" 

7 

8import subprocess 

9import time 

10from dataclasses import dataclass 

11 

12from frappe_manager.output_manager import OutputHandler 

13 

14 

15@dataclass 

16class ValidationResult: 

17 """Result of a DNS validation check.""" 

18 

19 valid: bool 

20 actual_value: str | None 

21 expected_value: str | None 

22 message: str 

23 dns_query_output: str # Raw dig output for debugging 

24 

25 

26@dataclass 

27class PropagationStatus: 

28 """DNS propagation status across multiple checks.""" 

29 

30 propagated: bool 

31 checks_passed: int 

32 checks_total: int 

33 message: str 

34 

35 

36class DNSValidator: 

37 """ 

38 Validates DNS configuration for SSL certificate generation. 

39 

40 Provides pre-flight checks for: 

41 - CNAME records (for DNS-01 challenges with delegation) 

42 - A records (for HTTP-01 challenges) 

43 - DNS propagation status 

44 """ 

45 

46 def __init__(self, output_handler: OutputHandler | None = None): 

47 """ 

48 Initialize DNS validator. 

49 

50 Args: 

51 output_handler: Optional output handler for user messages 

52 """ 

53 self.output = output_handler 

54 

55 def validate_cname_for_acme(self, domain: str, challenge_alias: str, timeout: int = 10) -> ValidationResult: 

56 """ 

57 Validate CNAME for ACME DNS-01 challenge with challenge-alias. 

58 

59 When using acme.sh with --challenge-alias, the expected setup is: 

60 _acme-challenge.<domain> → _acme-challenge.<challenge_alias> 

61 

62 Example: 

63 domain = "gg.alok.rtmake.com" 

64 challenge_alias = "alok.rt.gw" 

65 

66 Expected CNAME: 

67 _acme-challenge.gg.alok.rtmake.com → _acme-challenge.alok.rt.gw 

68 

69 Args: 

70 domain: The domain to issue certificate for 

71 challenge_alias: The challenge alias domain (base domain without _acme-challenge prefix) 

72 timeout: DNS query timeout in seconds 

73 

74 Returns: 

75 ValidationResult with status and details 

76 """ 

77 challenge_domain = f"_acme-challenge.{domain}" 

78 expected_target = f"_acme-challenge.{challenge_alias}." # Note trailing dot for FQDN 

79 

80 if self.output: 

81 self.output.debug(f"Validating CNAME: {challenge_domain}{expected_target}") 

82 

83 try: 

84 # Query DNS using dig 

85 result = subprocess.run( 

86 ["dig", "+short", challenge_domain, "CNAME"], 

87 capture_output=True, 

88 text=True, 

89 timeout=timeout, 

90 ) 

91 

92 actual_value = result.stdout.strip() 

93 dns_output = result.stdout + (result.stderr if result.stderr else "") 

94 

95 if not actual_value: 

96 return ValidationResult( 

97 valid=False, 

98 actual_value=None, 

99 expected_value=expected_target, 

100 message=f"CNAME record not found for {challenge_domain}", 

101 dns_query_output=dns_output, 

102 ) 

103 

104 # Normalize (add trailing dot if missing for comparison) 

105 normalized_actual = actual_value if actual_value.endswith(".") else f"{actual_value}." 

106 

107 if normalized_actual == expected_target: 

108 return ValidationResult( 

109 valid=True, 

110 actual_value=actual_value, 

111 expected_value=expected_target, 

112 message="CNAME record is correctly configured", 

113 dns_query_output=dns_output, 

114 ) 

115 return ValidationResult( 

116 valid=False, 

117 actual_value=actual_value, 

118 expected_value=expected_target, 

119 message=f"CNAME mismatch: found {actual_value}, expected {expected_target}", 

120 dns_query_output=dns_output, 

121 ) 

122 

123 except subprocess.TimeoutExpired: 

124 return ValidationResult( 

125 valid=False, 

126 actual_value=None, 

127 expected_value=expected_target, 

128 message=f"DNS query timeout after {timeout} seconds", 

129 dns_query_output="", 

130 ) 

131 except FileNotFoundError: 

132 return ValidationResult( 

133 valid=False, 

134 actual_value=None, 

135 expected_value=expected_target, 

136 message="'dig' command not found. Install dnsutils/bind-tools package.", 

137 dns_query_output="", 

138 ) 

139 except Exception as e: 

140 return ValidationResult( 

141 valid=False, 

142 actual_value=None, 

143 expected_value=expected_target, 

144 message=f"DNS validation error: {e!s}", 

145 dns_query_output="", 

146 ) 

147 

148 def validate_a_record(self, domain: str, timeout: int = 10) -> ValidationResult: 

149 """ 

150 Validate that domain has an A record. 

151 

152 Used for HTTP-01 challenge validation to ensure domain resolves. 

153 Does not check if it points to the correct IP (since that varies by setup). 

154 

155 Args: 

156 domain: The domain to check 

157 timeout: DNS query timeout in seconds 

158 

159 Returns: 

160 ValidationResult with status and IP address if found 

161 """ 

162 if self.output: 

163 self.output.debug(f"Validating A record for: {domain}") 

164 

165 try: 

166 result = subprocess.run(["dig", "+short", domain, "A"], capture_output=True, text=True, timeout=timeout) 

167 

168 actual_value = result.stdout.strip() 

169 dns_output = result.stdout + (result.stderr if result.stderr else "") 

170 

171 if not actual_value: 

172 return ValidationResult( 

173 valid=False, 

174 actual_value=None, 

175 expected_value="An IP address", 

176 message=f"No A record found for {domain}", 

177 dns_query_output=dns_output, 

178 ) 

179 

180 # Check if it looks like an IP address (basic validation) 

181 ip_parts = actual_value.split("\n")[0].split(".") # Take first line if multiple IPs 

182 if len(ip_parts) == 4 and all(part.isdigit() for part in ip_parts): 

183 return ValidationResult( 

184 valid=True, 

185 actual_value=actual_value.split("\n")[0], # Return first IP 

186 expected_value=None, 

187 message=f"Domain resolves to {actual_value.split()[0]}", 

188 dns_query_output=dns_output, 

189 ) 

190 return ValidationResult( 

191 valid=False, 

192 actual_value=actual_value, 

193 expected_value="An IP address", 

194 message=f"Invalid A record response: {actual_value}", 

195 dns_query_output=dns_output, 

196 ) 

197 

198 except subprocess.TimeoutExpired: 

199 return ValidationResult( 

200 valid=False, 

201 actual_value=None, 

202 expected_value="An IP address", 

203 message=f"DNS query timeout after {timeout} seconds", 

204 dns_query_output="", 

205 ) 

206 except FileNotFoundError: 

207 return ValidationResult( 

208 valid=False, 

209 actual_value=None, 

210 expected_value="An IP address", 

211 message="'dig' command not found. Install dnsutils/bind-tools package.", 

212 dns_query_output="", 

213 ) 

214 except Exception as e: 

215 return ValidationResult( 

216 valid=False, 

217 actual_value=None, 

218 expected_value="An IP address", 

219 message=f"DNS validation error: {e!s}", 

220 dns_query_output="", 

221 ) 

222 

223 def wait_for_cname_propagation( 

224 self, 

225 domain: str, 

226 challenge_alias: str, 

227 timeout: int = 300, 

228 check_interval: int = 30, 

229 ) -> PropagationStatus: 

230 """ 

231 Wait for CNAME record to propagate. 

232 

233 Polls DNS every check_interval seconds until CNAME is valid or timeout reached. 

234 

235 Args: 

236 domain: The domain to issue certificate for 

237 challenge_alias: The challenge alias domain 

238 timeout: Maximum time to wait in seconds (default: 5 minutes) 

239 check_interval: Time between checks in seconds (default: 30 seconds) 

240 

241 Returns: 

242 PropagationStatus indicating if propagation completed 

243 """ 

244 max_checks = timeout // check_interval 

245 checks_done = 0 

246 

247 if self.output: 

248 self.output.print( 

249 f"Waiting for DNS propagation (checking every {check_interval}s, max {timeout}s)...", 

250 emoji_code=":hourglass:", 

251 ) 

252 

253 start_time = time.time() 

254 

255 while checks_done < max_checks: 

256 checks_done += 1 

257 elapsed = int(time.time() - start_time) 

258 

259 if self.output: 

260 self.output.print(f"Check {checks_done}/{max_checks} (elapsed: {elapsed}s)...", emoji_code="") 

261 

262 validation = self.validate_cname_for_acme(domain, challenge_alias) 

263 

264 if validation.valid: 

265 return PropagationStatus( 

266 propagated=True, 

267 checks_passed=checks_done, 

268 checks_total=max_checks, 

269 message=f"CNAME propagated after {elapsed} seconds", 

270 ) 

271 

272 if checks_done < max_checks: 

273 time.sleep(check_interval) 

274 

275 return PropagationStatus( 

276 propagated=False, 

277 checks_passed=0, 

278 checks_total=max_checks, 

279 message=f"CNAME did not propagate within {timeout} seconds", 

280 ) 

281 

282 def get_nameservers(self, domain: str) -> list[str]: 

283 """ 

284 Get authoritative nameservers for a domain. 

285 

286 Useful for checking propagation across multiple nameservers. 

287 

288 Args: 

289 domain: Domain to query 

290 

291 Returns: 

292 List of nameserver hostnames 

293 """ 

294 try: 

295 result = subprocess.run(["dig", "+short", domain, "NS"], capture_output=True, text=True, timeout=10) 

296 

297 nameservers = [ns.strip() for ns in result.stdout.strip().split("\n") if ns.strip()] 

298 return nameservers 

299 except Exception: 

300 return []