Coverage for frappe_manager / ssl_manager / dns_validator.py: 0%
84 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:35 +0530
1"""
2DNS validation utilities for SSL certificate generation.
4Validates DNS configuration before attempting ACME challenges to provide
5early feedback and better error messages to users.
6"""
8import subprocess
9import time
10from dataclasses import dataclass
12from frappe_manager.output_manager import OutputHandler
15@dataclass
16class ValidationResult:
17 """Result of a DNS validation check."""
19 valid: bool
20 actual_value: str | None
21 expected_value: str | None
22 message: str
23 dns_query_output: str # Raw dig output for debugging
@dataclass
class PropagationStatus:
    """
    Summary of a DNS propagation wait made up of repeated checks.

    Fields are positional in the order declared below.
    """

    # True when propagation was observed, False otherwise.
    propagated: bool
    # Check counter reported by the code that ran the wait.
    checks_passed: int
    # Total number of checks that were scheduled.
    checks_total: int
    # Human-readable summary of the outcome.
    message: str
class DNSValidator:
    """
    Validates DNS configuration for SSL certificate generation.

    Provides pre-flight checks for:
    - CNAME records (for DNS-01 challenges with delegation)
    - A records (for HTTP-01 challenges)
    - DNS propagation status

    Every lookup shells out to ``dig +short``. Lookup failures (timeout,
    missing ``dig`` binary, any other error) are reported through the
    returned result object rather than raised.
    """

    def __init__(self, output_handler: OutputHandler | None = None):
        """
        Initialize DNS validator.

        Args:
            output_handler: Optional output handler for user messages.
                When None, the validator runs silently.
        """
        self.output = output_handler

    @staticmethod
    def _answer_lines(stdout: str) -> list[str]:
        """Return the record lines of ``dig +short`` output, skipping blanks and ';' diagnostics."""
        # dig prints diagnostics (e.g. ";; connection timed out ...") as lines
        # starting with ';'. They are not record data and must not be parsed
        # as a CNAME target or an IP address.
        return [line for raw in stdout.splitlines() if (line := raw.strip()) and not line.startswith(";")]

    @staticmethod
    def _canonical_name(name: str) -> str:
        """Return a DNS name in comparable form: no trailing dot, lower-case."""
        # DNS names compare case-insensitively, and "example.com" and
        # "example.com." denote the same fully-qualified name here.
        return name.strip().rstrip(".").lower()

    @staticmethod
    def _is_ipv4(text: str) -> bool:
        """Return True when ``text`` is a syntactically valid dotted-quad IPv4 address."""
        # Local import keeps this block self-contained (no new file-level import).
        import ipaddress

        try:
            ipaddress.IPv4Address(text)
        except ValueError:
            return False
        return True

    def _dig_short(self, name: str, record_type: str, timeout: int) -> tuple[list[str], str]:
        """
        Run ``dig +short <name> <record_type>``.

        Returns:
            ``(answers, raw_output)`` where ``answers`` are the record lines
            and ``raw_output`` is stdout followed by stderr, for debugging.

        Raises:
            subprocess.TimeoutExpired: dig did not finish within ``timeout`` seconds.
            FileNotFoundError: the ``dig`` executable is not installed.
        """
        completed = subprocess.run(
            ["dig", "+short", name, record_type],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        raw_output = completed.stdout + (completed.stderr or "")
        return self._answer_lines(completed.stdout), raw_output

    @staticmethod
    def _query_failure(error: Exception, expected_value: str, timeout: int) -> ValidationResult:
        """Translate an exception raised while running dig into a failed ValidationResult."""
        if isinstance(error, subprocess.TimeoutExpired):
            message = f"DNS query timeout after {timeout} seconds"
        elif isinstance(error, FileNotFoundError):
            message = "'dig' command not found. Install dnsutils/bind-tools package."
        else:
            message = f"DNS validation error: {error!s}"
        return ValidationResult(
            valid=False,
            actual_value=None,
            expected_value=expected_value,
            message=message,
            dns_query_output="",
        )

    def validate_cname_for_acme(self, domain: str, challenge_alias: str, timeout: int = 10) -> ValidationResult:
        """
        Validate CNAME for ACME DNS-01 challenge with challenge-alias.

        When using acme.sh with --challenge-alias, the expected setup is:
            _acme-challenge.<domain> → _acme-challenge.<challenge_alias>

        Example:
            domain = "gg.alok.rtmake.com"
            challenge_alias = "alok.rt.gw"

            Expected CNAME:
            _acme-challenge.gg.alok.rtmake.com → _acme-challenge.alok.rt.gw

        The comparison ignores letter case and a trailing dot on either side.

        Args:
            domain: The domain to issue certificate for
            challenge_alias: The challenge alias domain (base domain without _acme-challenge prefix)
            timeout: DNS query timeout in seconds

        Returns:
            ValidationResult with status and details
        """
        challenge_domain = f"_acme-challenge.{domain}"
        # Strip a caller-supplied trailing dot so the FQDN form never ends in "..".
        expected_target = f"_acme-challenge.{challenge_alias.rstrip('.')}."

        if self.output:
            self.output.debug(f"Validating CNAME: {challenge_domain} → {expected_target}")

        try:
            answers, dns_output = self._dig_short(challenge_domain, "CNAME", timeout)
        except Exception as e:  # best-effort check: every failure becomes a result
            return self._query_failure(e, expected_target, timeout)

        if not answers:
            return ValidationResult(
                valid=False,
                actual_value=None,
                expected_value=expected_target,
                message=f"CNAME record not found for {challenge_domain}",
                dns_query_output=dns_output,
            )

        # Normally a single line; several lines are kept together so an
        # unexpected multi-record answer is reported as a mismatch.
        actual_value = "\n".join(answers)

        if self._canonical_name(actual_value) == self._canonical_name(expected_target):
            return ValidationResult(
                valid=True,
                actual_value=actual_value,
                expected_value=expected_target,
                message="CNAME record is correctly configured",
                dns_query_output=dns_output,
            )
        return ValidationResult(
            valid=False,
            actual_value=actual_value,
            expected_value=expected_target,
            message=f"CNAME mismatch: found {actual_value}, expected {expected_target}",
            dns_query_output=dns_output,
        )

    def validate_a_record(self, domain: str, timeout: int = 10) -> ValidationResult:
        """
        Validate that domain has an A record.

        Used for HTTP-01 challenge validation to ensure domain resolves.
        Does not check if it points to the correct IP (since that varies by setup).

        When the name is an alias, ``dig +short`` lists the CNAME target(s)
        before the addresses, so every answer line is inspected and the first
        valid IPv4 address is reported.

        Args:
            domain: The domain to check
            timeout: DNS query timeout in seconds

        Returns:
            ValidationResult with status and IP address if found
        """
        expected = "An IP address"

        if self.output:
            self.output.debug(f"Validating A record for: {domain}")

        try:
            answers, dns_output = self._dig_short(domain, "A", timeout)
        except Exception as e:  # best-effort check: every failure becomes a result
            return self._query_failure(e, expected, timeout)

        if not answers:
            return ValidationResult(
                valid=False,
                actual_value=None,
                expected_value=expected,
                message=f"No A record found for {domain}",
                dns_query_output=dns_output,
            )

        first_ip = next((answer for answer in answers if self._is_ipv4(answer)), None)
        if first_ip is not None:
            return ValidationResult(
                valid=True,
                actual_value=first_ip,
                expected_value=None,
                message=f"Domain resolves to {first_ip}",
                dns_query_output=dns_output,
            )

        actual_value = "\n".join(answers)
        return ValidationResult(
            valid=False,
            actual_value=actual_value,
            expected_value=expected,
            message=f"Invalid A record response: {actual_value}",
            dns_query_output=dns_output,
        )

    def wait_for_cname_propagation(
        self,
        domain: str,
        challenge_alias: str,
        timeout: int = 300,
        check_interval: int = 30,
    ) -> PropagationStatus:
        """
        Wait for CNAME record to propagate.

        Polls DNS every check_interval seconds until CNAME is valid or timeout reached.
        At least one check is always performed, even when ``check_interval``
        is larger than ``timeout``. A ``check_interval`` below 1 is treated as 1.

        Args:
            domain: The domain to issue certificate for
            challenge_alias: The challenge alias domain
            timeout: Maximum time to wait in seconds (default: 5 minutes)
            check_interval: Time between checks in seconds (default: 30 seconds)

        Returns:
            PropagationStatus indicating if propagation completed. On success
            ``checks_passed`` is the number of checks performed, including the
            successful one; on failure it is 0.
        """
        # Clamp so the division below cannot raise and the loop runs at least once.
        interval = max(1, check_interval)
        max_checks = max(1, timeout // interval)

        if self.output:
            self.output.print(
                f"Waiting for DNS propagation (checking every {interval}s, max {timeout}s)...",
                emoji_code=":hourglass:",
            )

        # Monotonic clock: elapsed time is unaffected by system clock changes.
        started = time.monotonic()

        for attempt in range(1, max_checks + 1):
            if self.output:
                elapsed = int(time.monotonic() - started)
                self.output.print(f"Check {attempt}/{max_checks} (elapsed: {elapsed}s)...", emoji_code="")

            validation = self.validate_cname_for_acme(domain, challenge_alias)

            if validation.valid:
                # Measured after the query so the reported time includes it.
                elapsed = int(time.monotonic() - started)
                return PropagationStatus(
                    propagated=True,
                    checks_passed=attempt,
                    checks_total=max_checks,
                    message=f"CNAME propagated after {elapsed} seconds",
                )

            # No point sleeping after the final attempt.
            if attempt < max_checks:
                time.sleep(interval)

        return PropagationStatus(
            propagated=False,
            checks_passed=0,
            checks_total=max_checks,
            message=f"CNAME did not propagate within {timeout} seconds",
        )

    def get_nameservers(self, domain: str, timeout: int = 10) -> list[str]:
        """
        Get authoritative nameservers for a domain.

        Useful for checking propagation across multiple nameservers.
        This is a best-effort lookup: any failure yields an empty list, and
        the reason is sent to the output handler's debug channel when one is
        configured.

        Args:
            domain: Domain to query
            timeout: DNS query timeout in seconds

        Returns:
            List of nameserver hostnames (empty when none found or the lookup failed)
        """
        try:
            nameservers, _ = self._dig_short(domain, "NS", timeout)
        except Exception as e:  # deliberate best-effort: never raise to the caller
            if self.output:
                self.output.debug(f"Nameserver lookup failed for {domain}: {e!s}")
            return []
        return nameservers