Coverage for frappe_manager / site_manager / bench_config.py: 39%

573 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import json 

2import os 

3import re 

4import subprocess 

5from concurrent.futures import ThreadPoolExecutor, as_completed 

6from dataclasses import dataclass 

7from enum import Enum 

8from pathlib import Path 

9from typing import Any 

10 

11import tomlkit 

12from pydantic import BaseModel, EmailStr, Field, field_validator 

13from tomlkit.items import Array as TOMLArray 

14 

15from frappe_manager import CLI_DEFAULT_DELIMETER 

16from frappe_manager.metadata_manager import FMConfigManager 

17from frappe_manager.services_manager.database_service_manager import DatabaseServerServiceInfo 

18from frappe_manager.ssl_manager import LETSENCRYPT_PREFERRED_CHALLENGE, SUPPORTED_SSL_TYPES 

19from frappe_manager.ssl_manager.certificate import SSLCertificate 

20from frappe_manager.ssl_manager.letsencrypt_certificate import LetsencryptSSLCertificate 

21from frappe_manager.utils.helpers import get_container_name_prefix, get_bench_connection_config 

22 

23 

24def extract_app_python_module_name(app_path: Path) -> str: 

25 """ 

26 Extract Python module name from pyproject.toml or hooks.py. 

27 

28 This is critical for subdirectory apps where the directory name may not match 

29 the Python module name. For example: 

30 - Directory: frappe-consent-management (with dashes) 

31 - Python module: frappe_consent_management (with underscores) 

32 

33 Priority order: 

34 1. pyproject.toml [project] name field 

35 2. hooks.py app_name variable 

36 3. Fallback to directory name 

37 

38 Args: 

39 app_path: Path to the app directory 

40 

41 Returns: 

42 Python module name (e.g., "frappe_consent_management") 

43 """ 

44 # Try pyproject.toml first (most reliable) 

45 pyproject = app_path / "pyproject.toml" 

46 if pyproject.exists(): 

47 try: 

48 data = tomlkit.parse(pyproject.read_text()) 

49 if "project" in data and "name" in data["project"]: 

50 return data["project"]["name"] 

51 except Exception: 

52 pass # Fall through to next method 

53 

54 # Try hooks.py (Frappe convention) 

55 # Search for hooks.py in immediate subdirectories (frappe convention: app_name/hooks.py) 

56 hooks_files = list(app_path.glob("*/hooks.py")) 

57 

58 # Filter to only top-level hooks.py (not nested deeper) 

59 top_level_hooks = [f for f in hooks_files if len(f.relative_to(app_path).parts) == 2] 

60 

61 if top_level_hooks: 

62 try: 

63 hooks_py = top_level_hooks[0] 

64 content = hooks_py.read_text() 

65 # Match: app_name = "module_name" 

66 match = re.search(r'app_name\s*=\s*["\']([^"\']+)["\']', content) 

67 if match: 

68 return match.group(1) 

69 except Exception: 

70 pass # Fall through to fallback 

71 

72 # Fallback: use directory name 

73 return app_path.name 

74 

75 

76def extract_python_version_requirement(frappe_app_path: Path) -> str | None: 

77 """ 

78 Extract Python version requirement from frappe app's pyproject.toml. 

79 

80 Reads the [project] requires-python field or [tool.poetry.dependencies] python field. 

81 

82 Args: 

83 frappe_app_path: Path to the frappe app directory 

84 

85 Returns: 

86 Python version requirement string (e.g., ">=3.10,<3.14") or None if not found 

87 """ 

88 pyproject = frappe_app_path / "pyproject.toml" 

89 if not pyproject.exists(): 

90 return None 

91 

92 try: 

93 data = tomlkit.parse(pyproject.read_text()) 

94 

95 # Check [project] requires-python (PEP 621 standard) 

96 if "project" in data and "requires-python" in data["project"]: 

97 return str(data["project"]["requires-python"]) 

98 

99 # Check [tool.poetry.dependencies] python (Poetry format) 

100 if "tool" in data and "poetry" in data["tool"]: 

101 if "dependencies" in data["tool"]["poetry"]: 

102 if "python" in data["tool"]["poetry"]["dependencies"]: 

103 python_dep = data["tool"]["poetry"]["dependencies"]["python"] 

104 # Poetry format can be string or dict 

105 if isinstance(python_dep, str): 

106 return python_dep 

107 if isinstance(python_dep, dict) and "version" in python_dep: 

108 return str(python_dep["version"]) 

109 

110 return None 

111 except Exception: 

112 return None 

113 

114 

115def extract_node_version_requirement(frappe_app_path: Path) -> str | None: 

116 """ 

117 Extract Node version requirement from frappe app's package.json. 

118 

119 Reads the engines.node field. 

120 

121 Args: 

122 frappe_app_path: Path to the frappe app directory 

123 

124 Returns: 

125 Node version requirement string (e.g., ">=18") or None if not found 

126 """ 

127 package_json = frappe_app_path / "package.json" 

128 if not package_json.exists(): 

129 return None 

130 

131 try: 

132 import json 

133 

134 data = json.loads(package_json.read_text()) 

135 

136 # Check engines.node 

137 if "engines" in data and "node" in data["engines"]: 

138 return str(data["engines"]["node"]) 

139 

140 return None 

141 except Exception: 

142 return None 

143 

144 

145def parse_python_version_for_runtime(version_requirement: str | None) -> str | None: 

146 """ 

147 Parse Python version requirement string to extract a usable Python version. 

148 

149 Handles various formats: 

150 - ">=3.10,<3.14" -> "3.10" 

151 - ">=3.14,<3.15" -> "3.14" 

152 - "^3.11" -> "3.11" 

153 - "3.11" -> "3.11" 

154 - "3.10.5" -> "3.10" 

155 

156 Strategy: Extract the minimum compatible version for maximum compatibility. 

157 

158 Args: 

159 version_requirement: Version requirement string from pyproject.toml 

160 

161 Returns: 

162 Python version string suitable for UV (e.g., "3.10", "3.14") 

163 Returns None if parsing fails 

164 """ 

165 if not version_requirement: 

166 return None 

167 

168 try: 

169 import re 

170 

171 # Remove whitespace 

172 version_str = version_requirement.strip() 

173 

174 # Handle poetry caret (^3.11 -> 3.11) 

175 if version_str.startswith("^"): 

176 version_str = version_str[1:] 

177 

178 # Extract version numbers using regex 

179 # Match patterns like: >=3.10, 3.10.5, 3.10 

180 match = re.search(r"(\d+)\.(\d+)(?:\.\d+)?", version_str) 

181 if match: 

182 major = match.group(1) 

183 minor = match.group(2) 

184 return f"{major}.{minor}" 

185 

186 return None 

187 except Exception: 

188 return None 

189 

190 

191def parse_node_version_for_runtime(version_requirement: str | None) -> str | None: 

192 """ 

193 Parse Node version requirement string to extract a usable Node version. 

194 

195 Handles various formats: 

196 - ">=18" -> "18" 

197 - ">=24" -> "24" 

198 - "^18.0.0" -> "18" 

199 - "18.x" -> "18" 

200 - "18.12.0" -> "18" 

201 

202 Strategy: Extract the major version for fnm compatibility. 

203 

204 Args: 

205 version_requirement: Version requirement string from package.json 

206 

207 Returns: 

208 Node major version string suitable for fnm (e.g., "18", "24") 

209 Returns None if parsing fails 

210 """ 

211 if not version_requirement: 

212 return None 

213 

214 try: 

215 import re 

216 

217 # Remove whitespace 

218 version_str = version_requirement.strip() 

219 

220 # Handle poetry/npm caret (^18.0.0 -> 18.0.0) 

221 if version_str.startswith("^"): 

222 version_str = version_str[1:] 

223 

224 # Extract major version number 

225 # Match patterns like: >=18, 18.12.0, 18.x, 18 

226 match = re.search(r"(\d+)", version_str) 

227 if match: 

228 return match.group(1) 

229 

230 return None 

231 except Exception: 

232 return None 

233 

234 

235def validate_python_version_compatibility(user_version: str, frappe_requirement: str) -> tuple[bool, str]: 

236 """ 

237 Validate if user-provided Python version is compatible with Frappe requirement. 

238 

239 Args: 

240 user_version: User-provided version (e.g., "3.11", ">=3.10,<3.14") 

241 frappe_requirement: Frappe's requirement (e.g., ">=3.14,<3.15") 

242 

243 Returns: 

244 Tuple of (is_compatible, error_message) 

245 """ 

246 import re 

247 

248 def parse_version_range(version_str: str) -> tuple[tuple[int, int] | None, tuple[int, int] | None]: 

249 min_ver = None 

250 max_ver = None 

251 

252 match_min = re.search(r">=(\d+)\.(\d+)", version_str) 

253 if match_min: 

254 min_ver = (int(match_min.group(1)), int(match_min.group(2))) 

255 

256 match_max = re.search(r"<(\d+)\.(\d+)", version_str) 

257 if match_max: 

258 max_ver = (int(match_max.group(1)), int(match_max.group(2))) 

259 

260 exact_match = re.match(r"^(\d+)\.(\d+)$", version_str.strip()) 

261 if exact_match: 

262 ver = (int(exact_match.group(1)), int(exact_match.group(2))) 

263 min_ver = ver 

264 max_ver = (ver[0], ver[1] + 1) 

265 

266 return min_ver, max_ver 

267 

268 user_min, user_max = parse_version_range(user_version) 

269 frappe_min, frappe_max = parse_version_range(frappe_requirement) 

270 

271 if not user_min: 

272 return False, f"Could not parse user version: {user_version}" 

273 

274 if not frappe_min: 

275 return True, "" 

276 

277 if frappe_max: 

278 if user_min < frappe_min or (user_max and user_max > frappe_max): 

279 return False, f"Python {user_version} is incompatible with Frappe requirement {frappe_requirement}" 

280 elif user_min < frappe_min: 

281 return False, f"Python {user_version} is incompatible with Frappe requirement {frappe_requirement}" 

282 

283 return True, "" 

284 

285 

286def validate_node_version_compatibility(user_version: str, frappe_requirement: str) -> tuple[bool, str]: 

287 """ 

288 Validate if user-provided Node version is compatible with Frappe requirement. 

289 

290 Args: 

291 user_version: User-provided version (e.g., "18", ">=20") 

292 frappe_requirement: Frappe's requirement (e.g., ">=24") 

293 

294 Returns: 

295 Tuple of (is_compatible, error_message) 

296 """ 

297 import re 

298 

299 def extract_min_version(version_str: str) -> int | None: 

300 match = re.search(r">=?(\d+)", version_str) 

301 if match: 

302 return int(match.group(1)) 

303 

304 exact_match = re.match(r"^(\d+)$", version_str.strip()) 

305 if exact_match: 

306 return int(exact_match.group(1)) 

307 

308 return None 

309 

310 user_min = extract_min_version(user_version) 

311 frappe_min = extract_min_version(frappe_requirement) 

312 

313 if user_min is None: 

314 return False, f"Could not parse user version: {user_version}" 

315 

316 if frappe_min is None: 

317 return True, "" 

318 

319 if user_min < frappe_min: 

320 return False, f"Node {user_version} is incompatible with Frappe requirement {frappe_requirement}" 

321 

322 return True, "" 

323 

324 

325class FMBenchEnvType(str, Enum): 

326 prod = "prod" 

327 dev = "dev" 

328 

329 

330class RestartPolicyEnum(str, Enum): 

331 """ 

332 Docker Compose restart policy options. 

333 

334 - no: Never restart (default, current behavior) 

335 - always: Always restart, even after manual stop 

336 - on_failure: Restart only on non-zero exit code 

337 - unless_stopped: Restart unless manually stopped (recommended for production) 

338 """ 

339 

340 no = "no" 

341 always = "always" 

342 on_failure = "on-failure" 

343 unless_stopped = "unless-stopped" 

344 

345 

346@dataclass 

347class AppValidationResult: 

348 """Result of validating a single app repository.""" 

349 

350 app_name: str 

351 repo: str 

352 ref: str | None 

353 success: bool 

354 auth_method: str | None 

355 validated_url: str | None 

356 error_message: str | None = None 

357 

358 @property 

359 def display_message(self) -> str: 

360 """Get user-friendly display message.""" 

361 if self.success: 

362 ref_info = ( 

363 f"commit {self.ref[:8]}..." 

364 if self.ref and len(self.ref) == 40 

365 else f"branch '{self.ref}'" 

366 if self.ref 

367 else "default branch" 

368 ) 

369 auth = self.auth_method.lower().replace("github token", "token") 

370 return f"{self.app_name}{self.repo}:{ref_info} accessible via {auth}" 

371 return self.error_message or f"{self.app_name} ({self.repo}): Validation failed" 

372 

373 

374@dataclass 

375class AppBatchValidationResult: 

376 """Result of validating multiple app repositories.""" 

377 

378 results: list[AppValidationResult] 

379 

380 @property 

381 def all_valid(self) -> bool: 

382 """Check if all apps validated successfully.""" 

383 return all(r.success for r in self.results) 

384 

385 @property 

386 def success_count(self) -> int: 

387 """Count of successfully validated apps.""" 

388 return sum(1 for r in self.results if r.success) 

389 

390 @property 

391 def failure_count(self) -> int: 

392 """Count of failed validations.""" 

393 return sum(1 for r in self.results if not r.success) 

394 

395 @property 

396 def messages(self) -> list[str]: 

397 """Get all display messages (success + error).""" 

398 return [r.display_message for r in self.results] 

399 

400 

401class AppConfig(BaseModel): 

402 """ 

403 Configuration for a single Frappe app. 

404 

405 Supports multiple input formats: 

406 - Simple: "erpnext:version-15" 

407 - Repo: "frappe/erpnext:version-15" 

408 - Full URL: "https://github.com/frappe/erpnext:version-15" 

409 - Subdirectory: "frappe/frappe:version-15#apps/frappe" 

410 """ 

411 

412 name: str = Field(..., description="App name (e.g., 'erpnext')") 

413 repo: str = Field(..., description="GitHub repo (e.g., 'frappe/erpnext')") 

414 ref: str | None = Field(None, description="Branch, tag, or commit SHA") 

415 repo_url: str | None = Field(None, description="Full repo URL (auto-generated)") 

416 shallow_clone: bool = Field(True, description="Use shallow clone (depth=1)") 

417 subdir_path: str | None = Field(None, description="Subdirectory path for monorepo apps") 

418 symlink: bool = Field(False, description="Use symlink for subdirectory apps") 

419 

420 @property 

421 def is_commit(self) -> bool: 

422 """Check if ref is a commit SHA (40 hex characters).""" 

423 if self.ref is None: 

424 return False 

425 return len(self.ref) == 40 and all(c in "0123456789abcdef" for c in self.ref.lower()) 

426 

427 def _is_github_repo(self) -> bool: 

428 from urllib.parse import urlparse 

429 

430 if self.repo.startswith(("http://", "https://")): 

431 parsed = urlparse(self.repo) 

432 return parsed.netloc == "github.com" 

433 

434 if self.repo.startswith("git@"): 

435 return self.repo.startswith("git@github.com:") 

436 

437 if "/" in self.repo and not self.repo.startswith(("http://", "https://", "git@")): 

438 return True 

439 

440 return False 

441 

442 @classmethod 

443 def from_string(cls, app_string: str) -> "AppConfig": 

444 """ 

445 Parse app string into AppConfig. 

446 

447 Formats: 

448 - "erpnext" → frappe/erpnext:default-branch 

449 - "erpnext:version-15" → frappe/erpnext:version-15 

450 - "frappe/erpnext:version-15" → frappe/erpnext:version-15 

451 - "mycompany/custom-app:main" → mycompany/custom-app:main 

452 - "frappe/frappe:version-15#apps/frappe" → subdirectory app 

453 

454 Args: 

455 app_string: String describing the app to install 

456 

457 Returns: 

458 AppConfig instance 

459 """ 

460 # Split on '#' for subdirectory 

461 if "#" in app_string: 

462 app_part, subdir_path = app_string.split("#", 1) 

463 else: 

464 app_part = app_string 

465 subdir_path = None 

466 

467 # Check if this is a full URL (starts with protocol or git@) 

468 is_full_url = app_part.startswith(("http://", "https://")) 

469 is_ssh_url = app_part.startswith("git@") 

470 

471 # Split on ':' for branch/ref 

472 # For URLs: skip protocol colon, find the LAST colon (if any) for ref 

473 # For SSH: format is git@host:org/repo:ref, so find LAST colon after @ 

474 # For short form: any colon is ref separator 

475 ref = None 

476 if is_full_url: 

477 # For http(s):// URLs, look for colon AFTER the protocol and host 

478 # e.g., "https://github.com/frappe/frappe:version-15" 

479 # split at ":" -> ["https", "//github.com/frappe/frappe", "version-15"] 

480 parts = app_part.split(":") 

481 if len(parts) > 2: # Has protocol + potential ref 

482 # Rejoin protocol + host/path, last part is ref 

483 repo_part = ":".join(parts[:-1]) 

484 ref = parts[-1] 

485 else: 

486 repo_part = app_part 

487 elif is_ssh_url: 

488 # For git@ URLs: git@github.com:frappe/frappe:version-15 

489 # Find last colon after @ for ref 

490 at_index = app_part.index("@") 

491 remainder = app_part[at_index + 1 :] 

492 if remainder.count(":") > 1: 

493 # Multiple colons: last one is ref separator 

494 last_colon_idx = remainder.rfind(":") 

495 repo_part = app_part[: at_index + 1 + last_colon_idx] 

496 ref = remainder[last_colon_idx + 1 :] 

497 else: 

498 repo_part = app_part 

499 # Short form: org/repo:ref or app:ref 

500 elif ":" in app_part: 

501 repo_part, ref = app_part.split(":", 1) 

502 else: 

503 repo_part = app_part 

504 

505 # Parse repo (e.g., "frappe/erpnext" or just "erpnext" or full URL) 

506 if is_full_url or is_ssh_url: 

507 # Full URL: keep as-is, extract name from path 

508 repo = repo_part 

509 # Extract name from URL path (last segment before .git) 

510 path_part = repo_part.split("/")[-1] 

511 name = path_part.replace(".git", "") 

512 elif "/" in repo_part: 

513 repo = repo_part 

514 name = repo_part.split("/")[-1] 

515 else: 

516 name = repo_part 

517 repo = f"frappe/{name}" # Default to frappe org 

518 

519 # Override name if subdirectory specified 

520 if subdir_path: 

521 name = subdir_path.split("/")[-1] 

522 

523 return cls( 

524 name=name, 

525 repo=repo, 

526 ref=ref, 

527 repo_url=None, 

528 shallow_clone=True, 

529 subdir_path=subdir_path, 

530 ) 

531 

532 @classmethod 

533 def from_dict(cls, app_dict: dict[str, str | None], github_token: str | None = None) -> "AppConfig": 

534 """ 

535 Convert from simple dict format to AppConfig. 

536 

537 Args: 

538 app_dict: {"app": "erpnext", "branch": "version-15"} 

539 github_token: Optional GitHub token (unused, kept for backward compatibility) 

540 

541 Returns: 

542 AppConfig instance 

543 """ 

544 app_name = app_dict.get("app") 

545 if not app_name: 

546 raise ValueError("app_dict must contain 'app' key with non-empty value") 

547 

548 branch = app_dict.get("branch") 

549 

550 if branch: 

551 app_string: str = f"{app_name}:{branch}" 

552 else: 

553 app_string: str = app_name 

554 

555 return cls.from_string(app_string) 

556 

557 @staticmethod 

558 def validate_github_token(github_token: str) -> tuple[bool, str | None]: 

559 """ 

560 Validate GitHub token by making an API call to GitHub. 

561 

562 Args: 

563 github_token: GitHub personal access token 

564 

565 Returns: 

566 Tuple of (is_valid, error_message) 

567 """ 

568 import subprocess 

569 

570 from frappe_manager.logger import log 

571 

572 logger = log.get_logger() 

573 

574 try: 

575 cmd = ["git", "ls-remote", f"https://{github_token}@github.com/octocat/hello-world.git", "HEAD"] 

576 env = os.environ.copy() 

577 env["GIT_TERMINAL_PROMPT"] = "0" 

578 env["GIT_ASKPASS"] = "echo" 

579 

580 result = subprocess.run(cmd, capture_output=True, text=True, timeout=10, env=env) 

581 

582 if result.returncode == 0: 

583 logger.debug("GitHub token validated successfully") 

584 return (True, None) 

585 error = result.stderr.strip() 

586 if "bad credentials" in error.lower() or "authentication failed" in error.lower(): 

587 return (False, "Invalid or expired GitHub token") 

588 if "could not resolve host" in error.lower(): 

589 return (False, "Network error: Cannot reach GitHub") 

590 return (False, f"Token validation failed: {error}") 

591 

592 except subprocess.TimeoutExpired: 

593 return (False, "Timeout while validating token (network issue?)") 

594 except FileNotFoundError: 

595 logger.error("Git is not installed or not in PATH") 

596 return (False, "Git is not installed") 

597 except Exception as e: 

598 logger.debug(f"Exception during token validation: {e}") 

599 return (False, f"Unexpected error: {e!s}") 

600 

601 def get_auth_methods(self, github_token: str | None = None) -> list[tuple[str, str]]: 

602 """ 

603 Get ordered list of authentication methods to try for this app. 

604 

605 Returns list of (method_name, repo_url) tuples to try in fallback order. 

606 If self.repo_url is already set (from previous validation), returns it first, 

607 but ALWAYS includes fallback methods for resilience. 

608 

609 Args: 

610 github_token: Optional GitHub token for private repos 

611 

612 Returns: 

613 List of (method_name, url) tuples ordered by priority 

614 """ 

615 methods = [] 

616 

617 if self.repo_url: 

618 methods.append(("Validated URL", self.repo_url)) 

619 

620 if self.repo.startswith(("http://", "https://", "git@")): 

621 if self.repo not in [m[1] for m in methods]: 

622 methods.append(("Full URL", self.repo)) 

623 else: 

624 if github_token: 

625 token_url = f"https://{github_token}@github.com/{self.repo}.git" 

626 if token_url != self.repo_url: 

627 methods.append(("GitHub Token", token_url)) 

628 

629 https_url = f"https://github.com/{self.repo}.git" 

630 if https_url != self.repo_url: 

631 methods.append(("HTTPS", https_url)) 

632 

633 ssh_url = f"git@github.com:{self.repo}.git" 

634 if ssh_url != self.repo_url: 

635 methods.append(("SSH", ssh_url)) 

636 

637 return methods 

638 

639 def validate_repo_exists(self, github_token: str | None = None) -> AppValidationResult: 

640 """ 

641 Validate that this app's repository exists and is accessible. 

642 

643 Uses git ls-remote to check repository without cloning. 

644 Tries authentication methods in priority order: 

645 - With token: Token → HTTPS → SSH 

646 - Without token: HTTPS → SSH 

647 Validates GitHub token before attempting to use it. 

648 

649 Args: 

650 github_token: Optional GitHub token for private repos 

651 

652 Returns: 

653 AppValidationResult with success status, auth method used, and validated URL 

654 """ 

655 from frappe_manager.logger import log 

656 

657 logger = log.get_logger() 

658 

659 if github_token and self._is_github_repo(): 

660 is_valid, error_msg = AppConfig.validate_github_token(github_token) 

661 if not is_valid: 

662 logger.warning(f"GitHub token validation failed: {error_msg}") 

663 logger.warning(f"Will attempt to validate {self.name} without token authentication") 

664 

665 auth_methods = self.get_auth_methods(github_token) 

666 last_error = None 

667 last_method = None 

668 

669 for method_name, url in auth_methods: 

670 try: 

671 if self.is_commit: 

672 cmd = ["git", "ls-remote", url, "HEAD"] 

673 else: 

674 cmd = ["git", "ls-remote", "--heads", "--tags", url] 

675 if self.ref: 

676 cmd.extend([self.ref]) 

677 

678 env = os.environ.copy() 

679 env["GIT_TERMINAL_PROMPT"] = "0" 

680 env["GIT_ASKPASS"] = "echo" 

681 

682 result = subprocess.run(cmd, capture_output=True, text=True, timeout=10, env=env) 

683 

684 if result.returncode == 0: 

685 if self.is_commit: 

686 if result.stdout: 

687 self.repo_url = url 

688 logger.info(f"Validated {self.name} ({self.repo}) using {method_name}") 

689 return AppValidationResult( 

690 app_name=self.name, 

691 repo=self.repo, 

692 ref=self.ref, 

693 success=True, 

694 auth_method=method_name, 

695 validated_url=url, 

696 ) 

697 last_error = "Repository exists but appears empty or inaccessible" 

698 last_method = method_name 

699 elif self.ref and not result.stdout: 

700 last_error = f"Branch/tag '{self.ref}' not found" 

701 last_method = method_name 

702 else: 

703 self.repo_url = url 

704 logger.info(f"Validated {self.name} ({self.repo}) using {method_name}") 

705 return AppValidationResult( 

706 app_name=self.name, 

707 repo=self.repo, 

708 ref=self.ref, 

709 success=True, 

710 auth_method=method_name, 

711 validated_url=url, 

712 ) 

713 else: 

714 last_error = result.stderr.strip() 

715 last_method = method_name 

716 logger.debug(f"Validation failed for {self.name} using {method_name}: {last_error}") 

717 

718 except subprocess.TimeoutExpired: 

719 last_error = "Timeout while checking repository (network issue?)" 

720 last_method = method_name 

721 logger.debug(f"Timeout validating {self.name} using {method_name}") 

722 except FileNotFoundError: 

723 logger.error("Git is not installed or not in PATH") 

724 return AppValidationResult( 

725 app_name=self.name, 

726 repo=self.repo, 

727 ref=self.ref, 

728 success=False, 

729 auth_method=None, 

730 validated_url=None, 

731 error_message="Git is not installed or not in PATH", 

732 ) 

733 except Exception as e: 

734 last_error = str(e) 

735 last_method = method_name 

736 logger.debug(f"Exception validating {self.name} using {method_name}: {e}") 

737 

738 ref_info = f" (ref: {self.ref})" if self.ref else "" 

739 

740 if last_error and "not found" in last_error.lower(): 

741 error_msg = f"{self.name} ({self.repo}{ref_info}): Repository not found on GitHub" 

742 elif last_error and self.ref and "branch/tag" in last_error.lower(): 

743 error_msg = ( 

744 f"{self.name} ({self.repo}): Branch/tag '{self.ref}' not found\n" 

745 f" Check branch name or use a valid tag/commit" 

746 ) 

747 elif github_token: 

748 logger.warning(f"GitHub token provided but failed for {self.name}") 

749 error_msg = ( 

750 f"{self.name} ({self.repo}{ref_info}): Could not access repository\n" 

751 f" • HTTPS: Failed\n" 

752 f" • GitHub Token: Failed (token may be invalid/expired/insufficient permissions)\n" 

753 f" • SSH: Failed\n" 

754 f" Last error ({last_method}): {last_error}" 

755 ) 

756 else: 

757 error_msg = ( 

758 f"{self.name} ({self.repo}{ref_info}): Could not access repository\n" 

759 f" • HTTPS: Failed (repo may be private)\n" 

760 f" • SSH: Failed\n" 

761 f" 💡 If private repo: use --github-token or configure SSH keys\n" 

762 f" Last error ({last_method}): {last_error}" 

763 ) 

764 

765 logger.error(f"Failed to validate {self.name}: {last_error}") 

766 return AppValidationResult( 

767 app_name=self.name, 

768 repo=self.repo, 

769 ref=self.ref, 

770 success=False, 

771 auth_method=None, 

772 validated_url=None, 

773 error_message=error_msg, 

774 ) 

775 

776 @classmethod 

777 def validate_repos_batch( 

778 cls, 

779 apps: list["AppConfig"], 

780 github_token: str | None = None, 

781 max_workers: int = 10, 

782 ) -> AppBatchValidationResult: 

783 """ 

784 Validate multiple app repositories in parallel. 

785 

786 Args: 

787 apps: List of AppConfig objects to validate 

788 github_token: Optional GitHub token for private repos 

789 max_workers: Maximum parallel workers (default: 10) 

790 

791 Returns: 

792 AppBatchValidationResult with all individual results 

793 """ 

794 results = [] 

795 

796 with ThreadPoolExecutor(max_workers=min(max_workers, len(apps))) as executor: 

797 futures = [executor.submit(app.validate_repo_exists, github_token) for app in apps] 

798 for future in as_completed(futures): 

799 result = future.result() 

800 results.append(result) 

801 if not result.success and "Git is not installed" in result.error_message: 

802 break 

803 

804 return AppBatchValidationResult(results=results) 

805 

806 

807class DNSProviderConfig(BaseModel): 

808 """DNS provider credentials for DNS-01 challenge at bench level.""" 

809 

810 email: EmailStr | None = Field(None, description="DNS provider account email (if required).") 

811 api_token: str | None = Field(None, description="DNS provider API Token.") 

812 api_key: str | None = Field(None, description="DNS provider API Key.") 

813 

814 @property 

815 def exists(self) -> bool: 

816 """Check if any DNS credentials are configured.""" 

817 return bool(self.api_token or self.api_key) 

818 

819 def get_toml_doc(self): 

820 """Convert to TOML document.""" 

821 model_dict = self.model_dump(exclude_none=True) 

822 toml_doc = tomlkit.document() 

823 

824 for key, value in model_dict.items(): 

825 toml_doc[key] = value 

826 return toml_doc 

827 

828 @classmethod 

829 def import_from_toml_doc(cls, toml_doc): 

830 """Import from TOML document.""" 

831 return cls(**toml_doc) 

832 

833 

834def ssl_certificate_from_toml_data(ssl_data: dict, domain: str) -> SSLCertificate: 

835 """Parse a single certificate from TOML data.""" 

836 ssl_type = ssl_data.get("ssl_type", SUPPORTED_SSL_TYPES.none) 

837 

838 if ssl_type == SUPPORTED_SSL_TYPES.le: 

839 # Email field removed - Let's Encrypt discontinued notifications (June 2025) 

840 # Remove email from TOML data if present (backward compatibility) 

841 ssl_data.pop("email", None) 

842 

843 fm_config_manager = FMConfigManager.import_from_toml() 

844 

845 # Read challenge_type (new field) or preferred_challenge (backward compat) 

846 challenge_type = ssl_data.get("challenge_type") 

847 if not challenge_type: 

848 # Fall back to preferred_challenge for backward compatibility 

849 challenge_type = ssl_data.get("preferred_challenge") 

850 

851 api_token = ssl_data.get("api_token") 

852 if not api_token: 

853 api_token = fm_config_manager.cloudflare.api_token 

854 

855 api_key = ssl_data.get("api_key") 

856 if not api_key: 

857 api_key = fm_config_manager.cloudflare.api_key 

858 

859 # If no challenge type specified, infer from available credentials 

860 if not challenge_type: 

861 if fm_config_manager.cloudflare.exists: 

862 challenge_type = LETSENCRYPT_PREFERRED_CHALLENGE.dns01 

863 else: 

864 challenge_type = LETSENCRYPT_PREFERRED_CHALLENGE.http01 

865 

866 # Read acme_client field (defaults to "acme.sh" if not specified) 

867 acme_client = ssl_data.get("acme_client", "acme.sh") 

868 

869 return LetsencryptSSLCertificate( 

870 domain=domain, 

871 ssl_type=ssl_type, 

872 # Email removed - credentials loaded from FM config 

873 challenge_type=challenge_type, 

874 api_key=api_key, 

875 api_token=api_token, 

876 acme_client=acme_client, 

877 ) 

878 return SSLCertificate(domain=domain, ssl_type=SUPPORTED_SSL_TYPES.none) 

879 

880 

881def ssl_certificate_to_toml_doc(cert: SSLCertificate) -> tomlkit.TOMLDocument | None: 

882 """Convert a single certificate to TOML document.""" 

883 if cert.ssl_type == SUPPORTED_SSL_TYPES.none: 

884 return None 

885 

886 # Explicitly exclude only the computed field, but INCLUDE domain 

887 model_dict = cert.model_dump(exclude={"toml_exclude"}, exclude_none=True) 

888 toml_doc = tomlkit.document() 

889 

890 for key, value in model_dict.items(): 

891 if isinstance(value, Path): 

892 toml_doc[key] = str(value.absolute()) 

893 else: 

894 toml_doc[key] = value 

895 return toml_doc 

896 

897 

898def ssl_certificates_to_toml_array(certs: list[SSLCertificate]) -> TOMLArray: 

899 """Convert a list of certificates to TOML array-of-tables.""" 

900 # Use aot() for array-of-tables format [[ssl_certificates]] 

901 toml_aot = tomlkit.aot() 

902 for cert in certs: 

903 if cert.ssl_type != SUPPORTED_SSL_TYPES.none: 

904 cert_doc = ssl_certificate_to_toml_doc(cert) 

905 if cert_doc: 

906 toml_aot.append(cert_doc) 

907 return toml_aot 

908 

909 

910class MigrationState(BaseModel): 

911 migrated_to: str | None = Field(None, description="Version bench is migrated to (e.g., '0.19.0')") 

912 last_migration_date: str | None = Field(None, description="ISO timestamp of last migration") 

913 

914 

915class BenchConfig(BaseModel): 

916 name: str = Field(..., description="The name of the bench") 

917 developer_mode: bool = Field(..., description="Whether developer mode is enabled") 

918 admin_tools: bool = Field(..., description="Whether admin tools are enabled") 

919 environment_type: FMBenchEnvType = Field(..., description="The type of environment") 

920 

921 # Multi-certificate support 

922 ssl_certificates: list[SSLCertificate] = Field(default=[], description="List of SSL certificates for this bench") 

923 

924 # DNS provider credentials for DNS-01 challenge (optional, bench-specific override) 

925 dns_providers: dict[str, DNSProviderConfig] | None = Field( 

926 default=None, 

927 description="DNS provider credentials for DNS-01 challenge (e.g., {'cloudflare': {...}})", 

928 ) 

929 

930 alias_domains: list[str] = Field(default=[], description="List of alias domains for the bench") 

931 

932 upload_limit: str = Field(default="50M", description="Maximum upload size (e.g., '50M', '100M', '500M', '1G')") 

933 

934 admin_pass: str = Field("admin", description="The admin password") 

935 root_path: Path = Field(..., description="The root path") 

936 apps_list: list["AppConfig"] = Field(default=[], description="List of apps") 

937 userid: int = Field(default_factory=os.getuid, description="The user ID of the current process") 

938 usergroup: int = Field(default_factory=os.getgid, description="The group ID of the current process") 

939 admin_tools_username: str | None = Field(None, description="Username for admin tools basic auth") 

940 admin_tools_password: str | None = Field(None, description="Password for admin tools basic auth") 

941 

942 # NEW: GitHub token for private repositories 

943 github_token: str | None = Field(None, description="GitHub personal access token for private repositories") 

944 

945 # NEW: UV installation preference (always True, with fallback) 

946 use_uv: bool = Field( 

947 True, 

948 description="Use UV for faster Python package installation (with automatic fallback to pip)", 

949 ) 

950 

951 # NEW: Auto-detected Python and Node version requirements from frappe 

952 python_version: str | None = Field( 

953 None, 

954 description="Python version requirement from frappe app (e.g., '>=3.10,<3.14')", 

955 ) 

956 node_version: str | None = Field(None, description="Node version requirement from frappe app (e.g., '>=18')") 

957 

958 # Database name (randomized on creation to avoid conflicts) 

959 db_name: str | None = Field(None, description="Database name for this bench (auto-generated random string)") 

960 

961 restart_policy: RestartPolicyEnum | None = Field( 

962 default=None, 

963 description="Docker Compose restart policy for all services in this bench", 

964 ) 

965 

966 migration_state: MigrationState | None = Field( 

967 None, 

968 description="Migration state tracking (managed by migration system)", 

969 ) 

970 

971 # NewRelic APM 

972 newrelic_enabled: bool = Field(False, description="Enable NewRelic APM monitoring for the web process") 

973 newrelic_license_key: str | None = Field(None, description="NewRelic license key (ingest key)") 

974 

975 @field_validator("restart_policy", mode="before") 

976 @classmethod 

977 def set_default_restart_policy(cls, v, info): 

978 if v is None and "environment_type" in info.data: 

979 env_type = info.data["environment_type"] 

980 if env_type == FMBenchEnvType.prod: 

981 return RestartPolicyEnum.unless_stopped 

982 return RestartPolicyEnum.no 

983 return v if v is not None else RestartPolicyEnum.no 

984 

985 def get_apps_config(self) -> list[AppConfig]: 

986 """ 

987 Convert apps_list to AppConfig objects. 

988 

989 Handles conversion from simple format to detailed AppConfig. 

990 

991 Returns: 

992 List of AppConfig objects 

993 """ 

994 return self.apps_list 

995 

996 def get_primary_certificate(self) -> SSLCertificate: 

997 """ 

998 Get the primary SSL certificate (certificate for the bench's primary domain). 

999 

1000 Returns the first certificate in ssl_certificates list, which should be 

1001 the certificate for the bench's primary domain, or creates a default 

1002 disabled certificate if the list is empty. 

1003 

1004 Note: With individual certificates, each domain has its own entry in 

1005 ssl_certificates. The first entry is conventionally the primary domain. 

1006 """ 

1007 if self.ssl_certificates: 

1008 return self.ssl_certificates[0] 

1009 

1010 # Return default disabled certificate 

1011 return SSLCertificate(domain=self.name, ssl_type=SUPPORTED_SSL_TYPES.none) 

1012 

1013 def set_primary_certificate(self, certificate: SSLCertificate): 

1014 """ 

1015 Set the primary SSL certificate (certificate for the bench's primary domain). 

1016 

1017 Updates the first certificate in the list, or creates a new list with this 

1018 certificate. This is typically used when enabling/disabling SSL for the 

1019 primary domain. 

1020 

1021 Note: For managing individual domain certificates, use create_individual_certificates() 

1022 or directly manipulate the ssl_certificates list. 

1023 """ 

1024 if self.ssl_certificates: 

1025 self.ssl_certificates[0] = certificate 

1026 else: 

1027 self.ssl_certificates = [certificate] 

1028 

1029 def create_individual_certificates(self, template_certificate: SSLCertificate) -> None: 

1030 """ 

1031 Create individual certificate entries for primary domain and all alias domains. 

1032 

1033 This replaces any existing certificates with individual certificate entries 

1034 for each domain (primary + aliases), using the template certificate as a base. 

1035 

1036 Args: 

1037 template_certificate: Certificate configuration to use as template 

1038 (email, acme_client, ssl_type, etc.) 

1039 """ 

1040 from copy import deepcopy 

1041 

1042 new_certificates = [] 

1043 

1044 # Create certificate for primary domain 

1045 primary_cert = deepcopy(template_certificate) 

1046 primary_cert.domain = self.name 

1047 new_certificates.append(primary_cert) 

1048 

1049 # Create individual certificates for each alias domain 

1050 for alias_domain in self.alias_domains: 

1051 alias_cert = deepcopy(template_certificate) 

1052 alias_cert.domain = alias_domain 

1053 new_certificates.append(alias_cert) 

1054 

1055 # Replace all certificates with individual ones 

1056 self.ssl_certificates = new_certificates 

1057 

1058 @property 

1059 def container_name_prefix(self): 

1060 return get_container_name_prefix(self.name) 

1061 

1062 def get_all_domains(self) -> list[str]: 

1063 """ 

1064 Get all domains configured for this bench (primary + aliases). 

1065 

1066 Returns: 

1067 List of all domains that can have SSL certificates. 

1068 """ 

1069 all_domains = [self.name] 

1070 if self.alias_domains: 

1071 all_domains.extend(self.alias_domains) 

1072 return all_domains 

1073 

1074 def export_to_toml(self, path: Path) -> bool: 

1075 """ 

1076 Export bench configuration to TOML file. 

1077 """ 

1078 exclude = { 

1079 "root_path", 

1080 "mariadb_root_pass", 

1081 "userid", 

1082 "mariadb_host", 

1083 "usergroup", 

1084 "apps_list", 

1085 "frappe_branch", 

1086 "admin_pass", 

1087 } 

1088 

1089 # Convert the BenchConfig instance to a dictionary 

1090 bench_dict = self.model_dump(exclude=exclude, exclude_none=True) 

1091 

1092 # Handle SSL certificates 

1093 if self.ssl_certificates: 

1094 # Export as array 

1095 certs_array = ssl_certificates_to_toml_array(self.ssl_certificates) 

1096 if len(certs_array) > 0: 

1097 bench_dict["ssl_certificates"] = certs_array 

1098 else: 

1099 # No active certificates, remove the key 

1100 bench_dict.pop("ssl_certificates", None) 

1101 else: 

1102 # No certificates at all 

1103 bench_dict.pop("ssl_certificates", None) 

1104 

1105 # Handle DNS providers (convert to nested tables) 

1106 if self.dns_providers: 

1107 dns_providers_toml = tomlkit.table() 

1108 for provider_name, provider_config in self.dns_providers.items(): 

1109 if provider_config.exists: 

1110 dns_providers_toml[provider_name] = provider_config.get_toml_doc() 

1111 if len(dns_providers_toml) > 0: 

1112 bench_dict["dns_providers"] = dns_providers_toml 

1113 else: 

1114 bench_dict.pop("dns_providers", None) 

1115 else: 

1116 bench_dict.pop("dns_providers", None) 

1117 

1118 # Serialize the dictionary to a TOML string 

1119 toml_doc = tomlkit.document() 

1120 

1121 for key, value in bench_dict.items(): 

1122 if isinstance(value, Path): 

1123 toml_doc[key] = str(value.absolute()) 

1124 else: 

1125 toml_doc[key] = value 

1126 

1127 try: 

1128 with open(path, "w") as f: 

1129 f.write(tomlkit.dumps(toml_doc)) 

1130 return True 

1131 except Exception as e: 

1132 return False 

1133 

1134 @classmethod 

1135 def import_from_toml(cls, path: Path) -> "BenchConfig": 

1136 """ 

1137 Import bench configuration from TOML file. 

1138 

1139 Uses the multi-certificate format (ssl_certificates array). 

1140 """ 

1141 data = tomlkit.parse(path.read_text()) 

1142 data["root_path"] = str(path) 

1143 

1144 domain: str = data.get("name", "") 

1145 ssl_certificates_list: list[SSLCertificate] = [] 

1146 

1147 # Parse multi-certificate format 

1148 ssl_certificates_data = data.get("ssl_certificates", None) 

1149 if ssl_certificates_data and isinstance(ssl_certificates_data, list): 

1150 for cert_data in ssl_certificates_data: 

1151 cert_domain = cert_data.get("domain", domain) 

1152 ssl_cert = ssl_certificate_from_toml_data(cert_data, cert_domain) 

1153 ssl_certificates_list.append(ssl_cert) 

1154 

1155 # If no certificates found, start with empty list 

1156 # (default cert will be created via get_primary_certificate() when needed) 

1157 

1158 # Read alias_domains from root level only 

1159 alias_domains_list = data.get("alias_domains", []) 

1160 

1161 # Parse DNS providers (nested tables) 

1162 dns_providers_dict = {} 

1163 dns_providers_data = data.get("dns_providers", None) 

1164 if dns_providers_data and isinstance(dns_providers_data, dict): 

1165 for provider_name, provider_data in dns_providers_data.items(): 

1166 if isinstance(provider_data, dict): 

1167 dns_providers_dict[provider_name] = DNSProviderConfig.import_from_toml_doc(provider_data) 

1168 

1169 migration_state_data = data.get("migration_state", None) 

1170 migration_state_obj = None 

1171 if migration_state_data and isinstance(migration_state_data, dict): 

1172 migration_state_obj = MigrationState(**migration_state_data) 

1173 

1174 input_data = { 

1175 "name": data.get("name", None), 

1176 "developer_mode": data.get("developer_mode", None), 

1177 "admin_tools": data.get("admin_tools", False), 

1178 "environment_type": data.get("environment_type", None), 

1179 "root_path": data.get("root_path", None), 

1180 "ssl_certificates": ssl_certificates_list, 

1181 "dns_providers": dns_providers_dict if dns_providers_dict else None, 

1182 "alias_domains": alias_domains_list, 

1183 "upload_limit": data.get("upload_limit", "50M"), 

1184 "admin_tools_username": data.get("admin_tools_username", None), 

1185 "admin_tools_password": data.get("admin_tools_password", None), 

1186 "admin_pass": data.get("admin_pass", "admin"), 

1187 "apps_list": data.get("apps_list", []), 

1188 "github_token": data.get("github_token", None), 

1189 "use_uv": data.get("use_uv", True), 

1190 "python_version": data.get("python_version", None), 

1191 "node_version": data.get("node_version", None), 

1192 "db_name": data.get("db_name"), 

1193 "restart_policy": data.get("restart_policy", None), 

1194 "migration_state": migration_state_obj, 

1195 "newrelic_enabled": data.get("newrelic_enabled", False), 

1196 "newrelic_license_key": data.get("newrelic_license_key", None), 

1197 } 

1198 

1199 bench_config_instance = cls(**input_data) 

1200 return bench_config_instance 

1201 

1202 def get_commmon_site_config_data(self, db_server_info: DatabaseServerServiceInfo) -> dict[str, Any]: 

1203 common_site_config_data = get_bench_connection_config( 

1204 self.container_name_prefix, db_server_info.host, db_server_info.port 

1205 ) 

1206 common_site_config_data.update( 

1207 { 

1208 "install_apps": [], 

1209 "webserver_port": 80, 

1210 "socketio_port": 80, 

1211 "restart_supervisor_on_update": 0, 

1212 "developer_mode": self.developer_mode, 

1213 } 

1214 ) 

1215 

1216 return common_site_config_data 

1217 

1218 def get_site_mappings(self) -> dict[str, str]: 

1219 mappings = {} 

1220 mappings[self.name] = self.name 

1221 if self.alias_domains: 

1222 for alias in self.alias_domains: 

1223 mappings[alias] = self.name 

1224 return mappings 

1225 

1226 def export_to_compose_inputs(self): 

1227 all_domains = [self.name] 

1228 if self.alias_domains: 

1229 all_domains.extend(self.alias_domains) 

1230 domains_string = ",".join(all_domains) 

1231 

1232 environment = { 

1233 "frappe": { 

1234 "USERID": self.userid, 

1235 "USERGROUP": self.usergroup, 

1236 "SERVICE_NAME": "frappe", 

1237 **( 

1238 {"NEWRELIC_ENABLED": "true", "NEWRELIC_LICENSE_KEY": self.newrelic_license_key} 

1239 if self.newrelic_enabled and self.newrelic_license_key 

1240 else {} 

1241 ), 

1242 }, 

1243 "nginx": { 

1244 "SITE_MAPPINGS": json.dumps(self.get_site_mappings()), 

1245 "VIRTUAL_HOST": domains_string, 

1246 "VIRTUAL_PORT": 80, 

1247 "HTTPS_METHOD": "noredirect", 

1248 "HSTS": self.get_primary_certificate().hsts, 

1249 "CLIENT_MAX_BODY_SIZE": self.upload_limit.lower(), 

1250 }, 

1251 "worker": { 

1252 "USERID": self.userid, 

1253 "USERGROUP": self.usergroup, 

1254 }, 

1255 "schedule": { 

1256 "USERID": self.userid, 

1257 "USERGROUP": self.usergroup, 

1258 "SERVICE_NAME": "schedule", 

1259 }, 

1260 "socketio": { 

1261 "USERID": self.userid, 

1262 "USERGROUP": self.usergroup, 

1263 "SERVICE_NAME": "socketio", 

1264 }, 

1265 } 

1266 

1267 users: dict = {"nginx": {"uid": self.userid, "gid": self.usergroup}} 

1268 template_inputs: dict = { 

1269 "environment": environment, 

1270 "user": users, 

1271 "restart_policy": self.restart_policy.value, 

1272 } 

1273 return template_inputs