Coverage for frappe_manager / site_manager / bench_config.py: 39%
573 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import json
2import os
3import re
4import subprocess
5from concurrent.futures import ThreadPoolExecutor, as_completed
6from dataclasses import dataclass
7from enum import Enum
8from pathlib import Path
9from typing import Any
11import tomlkit
12from pydantic import BaseModel, EmailStr, Field, field_validator
13from tomlkit.items import Array as TOMLArray
15from frappe_manager import CLI_DEFAULT_DELIMETER
16from frappe_manager.metadata_manager import FMConfigManager
17from frappe_manager.services_manager.database_service_manager import DatabaseServerServiceInfo
18from frappe_manager.ssl_manager import LETSENCRYPT_PREFERRED_CHALLENGE, SUPPORTED_SSL_TYPES
19from frappe_manager.ssl_manager.certificate import SSLCertificate
20from frappe_manager.ssl_manager.letsencrypt_certificate import LetsencryptSSLCertificate
21from frappe_manager.utils.helpers import get_container_name_prefix, get_bench_connection_config
24def extract_app_python_module_name(app_path: Path) -> str:
25 """
26 Extract Python module name from pyproject.toml or hooks.py.
28 This is critical for subdirectory apps where the directory name may not match
29 the Python module name. For example:
30 - Directory: frappe-consent-management (with dashes)
31 - Python module: frappe_consent_management (with underscores)
33 Priority order:
34 1. pyproject.toml [project] name field
35 2. hooks.py app_name variable
36 3. Fallback to directory name
38 Args:
39 app_path: Path to the app directory
41 Returns:
42 Python module name (e.g., "frappe_consent_management")
43 """
44 # Try pyproject.toml first (most reliable)
45 pyproject = app_path / "pyproject.toml"
46 if pyproject.exists():
47 try:
48 data = tomlkit.parse(pyproject.read_text())
49 if "project" in data and "name" in data["project"]:
50 return data["project"]["name"]
51 except Exception:
52 pass # Fall through to next method
54 # Try hooks.py (Frappe convention)
55 # Search for hooks.py in immediate subdirectories (frappe convention: app_name/hooks.py)
56 hooks_files = list(app_path.glob("*/hooks.py"))
58 # Filter to only top-level hooks.py (not nested deeper)
59 top_level_hooks = [f for f in hooks_files if len(f.relative_to(app_path).parts) == 2]
61 if top_level_hooks:
62 try:
63 hooks_py = top_level_hooks[0]
64 content = hooks_py.read_text()
65 # Match: app_name = "module_name"
66 match = re.search(r'app_name\s*=\s*["\']([^"\']+)["\']', content)
67 if match:
68 return match.group(1)
69 except Exception:
70 pass # Fall through to fallback
72 # Fallback: use directory name
73 return app_path.name
76def extract_python_version_requirement(frappe_app_path: Path) -> str | None:
77 """
78 Extract Python version requirement from frappe app's pyproject.toml.
80 Reads the [project] requires-python field or [tool.poetry.dependencies] python field.
82 Args:
83 frappe_app_path: Path to the frappe app directory
85 Returns:
86 Python version requirement string (e.g., ">=3.10,<3.14") or None if not found
87 """
88 pyproject = frappe_app_path / "pyproject.toml"
89 if not pyproject.exists():
90 return None
92 try:
93 data = tomlkit.parse(pyproject.read_text())
95 # Check [project] requires-python (PEP 621 standard)
96 if "project" in data and "requires-python" in data["project"]:
97 return str(data["project"]["requires-python"])
99 # Check [tool.poetry.dependencies] python (Poetry format)
100 if "tool" in data and "poetry" in data["tool"]:
101 if "dependencies" in data["tool"]["poetry"]:
102 if "python" in data["tool"]["poetry"]["dependencies"]:
103 python_dep = data["tool"]["poetry"]["dependencies"]["python"]
104 # Poetry format can be string or dict
105 if isinstance(python_dep, str):
106 return python_dep
107 if isinstance(python_dep, dict) and "version" in python_dep:
108 return str(python_dep["version"])
110 return None
111 except Exception:
112 return None
115def extract_node_version_requirement(frappe_app_path: Path) -> str | None:
116 """
117 Extract Node version requirement from frappe app's package.json.
119 Reads the engines.node field.
121 Args:
122 frappe_app_path: Path to the frappe app directory
124 Returns:
125 Node version requirement string (e.g., ">=18") or None if not found
126 """
127 package_json = frappe_app_path / "package.json"
128 if not package_json.exists():
129 return None
131 try:
132 import json
134 data = json.loads(package_json.read_text())
136 # Check engines.node
137 if "engines" in data and "node" in data["engines"]:
138 return str(data["engines"]["node"])
140 return None
141 except Exception:
142 return None
145def parse_python_version_for_runtime(version_requirement: str | None) -> str | None:
146 """
147 Parse Python version requirement string to extract a usable Python version.
149 Handles various formats:
150 - ">=3.10,<3.14" -> "3.10"
151 - ">=3.14,<3.15" -> "3.14"
152 - "^3.11" -> "3.11"
153 - "3.11" -> "3.11"
154 - "3.10.5" -> "3.10"
156 Strategy: Extract the minimum compatible version for maximum compatibility.
158 Args:
159 version_requirement: Version requirement string from pyproject.toml
161 Returns:
162 Python version string suitable for UV (e.g., "3.10", "3.14")
163 Returns None if parsing fails
164 """
165 if not version_requirement:
166 return None
168 try:
169 import re
171 # Remove whitespace
172 version_str = version_requirement.strip()
174 # Handle poetry caret (^3.11 -> 3.11)
175 if version_str.startswith("^"):
176 version_str = version_str[1:]
178 # Extract version numbers using regex
179 # Match patterns like: >=3.10, 3.10.5, 3.10
180 match = re.search(r"(\d+)\.(\d+)(?:\.\d+)?", version_str)
181 if match:
182 major = match.group(1)
183 minor = match.group(2)
184 return f"{major}.{minor}"
186 return None
187 except Exception:
188 return None
191def parse_node_version_for_runtime(version_requirement: str | None) -> str | None:
192 """
193 Parse Node version requirement string to extract a usable Node version.
195 Handles various formats:
196 - ">=18" -> "18"
197 - ">=24" -> "24"
198 - "^18.0.0" -> "18"
199 - "18.x" -> "18"
200 - "18.12.0" -> "18"
202 Strategy: Extract the major version for fnm compatibility.
204 Args:
205 version_requirement: Version requirement string from package.json
207 Returns:
208 Node major version string suitable for fnm (e.g., "18", "24")
209 Returns None if parsing fails
210 """
211 if not version_requirement:
212 return None
214 try:
215 import re
217 # Remove whitespace
218 version_str = version_requirement.strip()
220 # Handle poetry/npm caret (^18.0.0 -> 18.0.0)
221 if version_str.startswith("^"):
222 version_str = version_str[1:]
224 # Extract major version number
225 # Match patterns like: >=18, 18.12.0, 18.x, 18
226 match = re.search(r"(\d+)", version_str)
227 if match:
228 return match.group(1)
230 return None
231 except Exception:
232 return None
235def validate_python_version_compatibility(user_version: str, frappe_requirement: str) -> tuple[bool, str]:
236 """
237 Validate if user-provided Python version is compatible with Frappe requirement.
239 Args:
240 user_version: User-provided version (e.g., "3.11", ">=3.10,<3.14")
241 frappe_requirement: Frappe's requirement (e.g., ">=3.14,<3.15")
243 Returns:
244 Tuple of (is_compatible, error_message)
245 """
246 import re
248 def parse_version_range(version_str: str) -> tuple[tuple[int, int] | None, tuple[int, int] | None]:
249 min_ver = None
250 max_ver = None
252 match_min = re.search(r">=(\d+)\.(\d+)", version_str)
253 if match_min:
254 min_ver = (int(match_min.group(1)), int(match_min.group(2)))
256 match_max = re.search(r"<(\d+)\.(\d+)", version_str)
257 if match_max:
258 max_ver = (int(match_max.group(1)), int(match_max.group(2)))
260 exact_match = re.match(r"^(\d+)\.(\d+)$", version_str.strip())
261 if exact_match:
262 ver = (int(exact_match.group(1)), int(exact_match.group(2)))
263 min_ver = ver
264 max_ver = (ver[0], ver[1] + 1)
266 return min_ver, max_ver
268 user_min, user_max = parse_version_range(user_version)
269 frappe_min, frappe_max = parse_version_range(frappe_requirement)
271 if not user_min:
272 return False, f"Could not parse user version: {user_version}"
274 if not frappe_min:
275 return True, ""
277 if frappe_max:
278 if user_min < frappe_min or (user_max and user_max > frappe_max):
279 return False, f"Python {user_version} is incompatible with Frappe requirement {frappe_requirement}"
280 elif user_min < frappe_min:
281 return False, f"Python {user_version} is incompatible with Frappe requirement {frappe_requirement}"
283 return True, ""
286def validate_node_version_compatibility(user_version: str, frappe_requirement: str) -> tuple[bool, str]:
287 """
288 Validate if user-provided Node version is compatible with Frappe requirement.
290 Args:
291 user_version: User-provided version (e.g., "18", ">=20")
292 frappe_requirement: Frappe's requirement (e.g., ">=24")
294 Returns:
295 Tuple of (is_compatible, error_message)
296 """
297 import re
299 def extract_min_version(version_str: str) -> int | None:
300 match = re.search(r">=?(\d+)", version_str)
301 if match:
302 return int(match.group(1))
304 exact_match = re.match(r"^(\d+)$", version_str.strip())
305 if exact_match:
306 return int(exact_match.group(1))
308 return None
310 user_min = extract_min_version(user_version)
311 frappe_min = extract_min_version(frappe_requirement)
313 if user_min is None:
314 return False, f"Could not parse user version: {user_version}"
316 if frappe_min is None:
317 return True, ""
319 if user_min < frappe_min:
320 return False, f"Node {user_version} is incompatible with Frappe requirement {frappe_requirement}"
322 return True, ""
325class FMBenchEnvType(str, Enum):
326 prod = "prod"
327 dev = "dev"
330class RestartPolicyEnum(str, Enum):
331 """
332 Docker Compose restart policy options.
334 - no: Never restart (default, current behavior)
335 - always: Always restart, even after manual stop
336 - on_failure: Restart only on non-zero exit code
337 - unless_stopped: Restart unless manually stopped (recommended for production)
338 """
340 no = "no"
341 always = "always"
342 on_failure = "on-failure"
343 unless_stopped = "unless-stopped"
346@dataclass
347class AppValidationResult:
348 """Result of validating a single app repository."""
350 app_name: str
351 repo: str
352 ref: str | None
353 success: bool
354 auth_method: str | None
355 validated_url: str | None
356 error_message: str | None = None
358 @property
359 def display_message(self) -> str:
360 """Get user-friendly display message."""
361 if self.success:
362 ref_info = (
363 f"commit {self.ref[:8]}..."
364 if self.ref and len(self.ref) == 40
365 else f"branch '{self.ref}'"
366 if self.ref
367 else "default branch"
368 )
369 auth = self.auth_method.lower().replace("github token", "token")
370 return f"{self.app_name} → {self.repo}:{ref_info} accessible via {auth}"
371 return self.error_message or f"{self.app_name} ({self.repo}): Validation failed"
374@dataclass
375class AppBatchValidationResult:
376 """Result of validating multiple app repositories."""
378 results: list[AppValidationResult]
380 @property
381 def all_valid(self) -> bool:
382 """Check if all apps validated successfully."""
383 return all(r.success for r in self.results)
385 @property
386 def success_count(self) -> int:
387 """Count of successfully validated apps."""
388 return sum(1 for r in self.results if r.success)
390 @property
391 def failure_count(self) -> int:
392 """Count of failed validations."""
393 return sum(1 for r in self.results if not r.success)
395 @property
396 def messages(self) -> list[str]:
397 """Get all display messages (success + error)."""
398 return [r.display_message for r in self.results]
401class AppConfig(BaseModel):
402 """
403 Configuration for a single Frappe app.
405 Supports multiple input formats:
406 - Simple: "erpnext:version-15"
407 - Repo: "frappe/erpnext:version-15"
408 - Full URL: "https://github.com/frappe/erpnext:version-15"
409 - Subdirectory: "frappe/frappe:version-15#apps/frappe"
410 """
412 name: str = Field(..., description="App name (e.g., 'erpnext')")
413 repo: str = Field(..., description="GitHub repo (e.g., 'frappe/erpnext')")
414 ref: str | None = Field(None, description="Branch, tag, or commit SHA")
415 repo_url: str | None = Field(None, description="Full repo URL (auto-generated)")
416 shallow_clone: bool = Field(True, description="Use shallow clone (depth=1)")
417 subdir_path: str | None = Field(None, description="Subdirectory path for monorepo apps")
418 symlink: bool = Field(False, description="Use symlink for subdirectory apps")
420 @property
421 def is_commit(self) -> bool:
422 """Check if ref is a commit SHA (40 hex characters)."""
423 if self.ref is None:
424 return False
425 return len(self.ref) == 40 and all(c in "0123456789abcdef" for c in self.ref.lower())
427 def _is_github_repo(self) -> bool:
428 from urllib.parse import urlparse
430 if self.repo.startswith(("http://", "https://")):
431 parsed = urlparse(self.repo)
432 return parsed.netloc == "github.com"
434 if self.repo.startswith("git@"):
435 return self.repo.startswith("git@github.com:")
437 if "/" in self.repo and not self.repo.startswith(("http://", "https://", "git@")):
438 return True
440 return False
442 @classmethod
443 def from_string(cls, app_string: str) -> "AppConfig":
444 """
445 Parse app string into AppConfig.
447 Formats:
448 - "erpnext" → frappe/erpnext:default-branch
449 - "erpnext:version-15" → frappe/erpnext:version-15
450 - "frappe/erpnext:version-15" → frappe/erpnext:version-15
451 - "mycompany/custom-app:main" → mycompany/custom-app:main
452 - "frappe/frappe:version-15#apps/frappe" → subdirectory app
454 Args:
455 app_string: String describing the app to install
457 Returns:
458 AppConfig instance
459 """
460 # Split on '#' for subdirectory
461 if "#" in app_string:
462 app_part, subdir_path = app_string.split("#", 1)
463 else:
464 app_part = app_string
465 subdir_path = None
467 # Check if this is a full URL (starts with protocol or git@)
468 is_full_url = app_part.startswith(("http://", "https://"))
469 is_ssh_url = app_part.startswith("git@")
471 # Split on ':' for branch/ref
472 # For URLs: skip protocol colon, find the LAST colon (if any) for ref
473 # For SSH: format is git@host:org/repo:ref, so find LAST colon after @
474 # For short form: any colon is ref separator
475 ref = None
476 if is_full_url:
477 # For http(s):// URLs, look for colon AFTER the protocol and host
478 # e.g., "https://github.com/frappe/frappe:version-15"
479 # split at ":" -> ["https", "//github.com/frappe/frappe", "version-15"]
480 parts = app_part.split(":")
481 if len(parts) > 2: # Has protocol + potential ref
482 # Rejoin protocol + host/path, last part is ref
483 repo_part = ":".join(parts[:-1])
484 ref = parts[-1]
485 else:
486 repo_part = app_part
487 elif is_ssh_url:
488 # For git@ URLs: git@github.com:frappe/frappe:version-15
489 # Find last colon after @ for ref
490 at_index = app_part.index("@")
491 remainder = app_part[at_index + 1 :]
492 if remainder.count(":") > 1:
493 # Multiple colons: last one is ref separator
494 last_colon_idx = remainder.rfind(":")
495 repo_part = app_part[: at_index + 1 + last_colon_idx]
496 ref = remainder[last_colon_idx + 1 :]
497 else:
498 repo_part = app_part
499 # Short form: org/repo:ref or app:ref
500 elif ":" in app_part:
501 repo_part, ref = app_part.split(":", 1)
502 else:
503 repo_part = app_part
505 # Parse repo (e.g., "frappe/erpnext" or just "erpnext" or full URL)
506 if is_full_url or is_ssh_url:
507 # Full URL: keep as-is, extract name from path
508 repo = repo_part
509 # Extract name from URL path (last segment before .git)
510 path_part = repo_part.split("/")[-1]
511 name = path_part.replace(".git", "")
512 elif "/" in repo_part:
513 repo = repo_part
514 name = repo_part.split("/")[-1]
515 else:
516 name = repo_part
517 repo = f"frappe/{name}" # Default to frappe org
519 # Override name if subdirectory specified
520 if subdir_path:
521 name = subdir_path.split("/")[-1]
523 return cls(
524 name=name,
525 repo=repo,
526 ref=ref,
527 repo_url=None,
528 shallow_clone=True,
529 subdir_path=subdir_path,
530 )
532 @classmethod
533 def from_dict(cls, app_dict: dict[str, str | None], github_token: str | None = None) -> "AppConfig":
534 """
535 Convert from simple dict format to AppConfig.
537 Args:
538 app_dict: {"app": "erpnext", "branch": "version-15"}
539 github_token: Optional GitHub token (unused, kept for backward compatibility)
541 Returns:
542 AppConfig instance
543 """
544 app_name = app_dict.get("app")
545 if not app_name:
546 raise ValueError("app_dict must contain 'app' key with non-empty value")
548 branch = app_dict.get("branch")
550 if branch:
551 app_string: str = f"{app_name}:{branch}"
552 else:
553 app_string: str = app_name
555 return cls.from_string(app_string)
557 @staticmethod
558 def validate_github_token(github_token: str) -> tuple[bool, str | None]:
559 """
560 Validate GitHub token by making an API call to GitHub.
562 Args:
563 github_token: GitHub personal access token
565 Returns:
566 Tuple of (is_valid, error_message)
567 """
568 import subprocess
570 from frappe_manager.logger import log
572 logger = log.get_logger()
574 try:
575 cmd = ["git", "ls-remote", f"https://{github_token}@github.com/octocat/hello-world.git", "HEAD"]
576 env = os.environ.copy()
577 env["GIT_TERMINAL_PROMPT"] = "0"
578 env["GIT_ASKPASS"] = "echo"
580 result = subprocess.run(cmd, capture_output=True, text=True, timeout=10, env=env)
582 if result.returncode == 0:
583 logger.debug("GitHub token validated successfully")
584 return (True, None)
585 error = result.stderr.strip()
586 if "bad credentials" in error.lower() or "authentication failed" in error.lower():
587 return (False, "Invalid or expired GitHub token")
588 if "could not resolve host" in error.lower():
589 return (False, "Network error: Cannot reach GitHub")
590 return (False, f"Token validation failed: {error}")
592 except subprocess.TimeoutExpired:
593 return (False, "Timeout while validating token (network issue?)")
594 except FileNotFoundError:
595 logger.error("Git is not installed or not in PATH")
596 return (False, "Git is not installed")
597 except Exception as e:
598 logger.debug(f"Exception during token validation: {e}")
599 return (False, f"Unexpected error: {e!s}")
601 def get_auth_methods(self, github_token: str | None = None) -> list[tuple[str, str]]:
602 """
603 Get ordered list of authentication methods to try for this app.
605 Returns list of (method_name, repo_url) tuples to try in fallback order.
606 If self.repo_url is already set (from previous validation), returns it first,
607 but ALWAYS includes fallback methods for resilience.
609 Args:
610 github_token: Optional GitHub token for private repos
612 Returns:
613 List of (method_name, url) tuples ordered by priority
614 """
615 methods = []
617 if self.repo_url:
618 methods.append(("Validated URL", self.repo_url))
620 if self.repo.startswith(("http://", "https://", "git@")):
621 if self.repo not in [m[1] for m in methods]:
622 methods.append(("Full URL", self.repo))
623 else:
624 if github_token:
625 token_url = f"https://{github_token}@github.com/{self.repo}.git"
626 if token_url != self.repo_url:
627 methods.append(("GitHub Token", token_url))
629 https_url = f"https://github.com/{self.repo}.git"
630 if https_url != self.repo_url:
631 methods.append(("HTTPS", https_url))
633 ssh_url = f"git@github.com:{self.repo}.git"
634 if ssh_url != self.repo_url:
635 methods.append(("SSH", ssh_url))
637 return methods
639 def validate_repo_exists(self, github_token: str | None = None) -> AppValidationResult:
640 """
641 Validate that this app's repository exists and is accessible.
643 Uses git ls-remote to check repository without cloning.
644 Tries authentication methods in priority order:
645 - With token: Token → HTTPS → SSH
646 - Without token: HTTPS → SSH
647 Validates GitHub token before attempting to use it.
649 Args:
650 github_token: Optional GitHub token for private repos
652 Returns:
653 AppValidationResult with success status, auth method used, and validated URL
654 """
655 from frappe_manager.logger import log
657 logger = log.get_logger()
659 if github_token and self._is_github_repo():
660 is_valid, error_msg = AppConfig.validate_github_token(github_token)
661 if not is_valid:
662 logger.warning(f"GitHub token validation failed: {error_msg}")
663 logger.warning(f"Will attempt to validate {self.name} without token authentication")
665 auth_methods = self.get_auth_methods(github_token)
666 last_error = None
667 last_method = None
669 for method_name, url in auth_methods:
670 try:
671 if self.is_commit:
672 cmd = ["git", "ls-remote", url, "HEAD"]
673 else:
674 cmd = ["git", "ls-remote", "--heads", "--tags", url]
675 if self.ref:
676 cmd.extend([self.ref])
678 env = os.environ.copy()
679 env["GIT_TERMINAL_PROMPT"] = "0"
680 env["GIT_ASKPASS"] = "echo"
682 result = subprocess.run(cmd, capture_output=True, text=True, timeout=10, env=env)
684 if result.returncode == 0:
685 if self.is_commit:
686 if result.stdout:
687 self.repo_url = url
688 logger.info(f"Validated {self.name} ({self.repo}) using {method_name}")
689 return AppValidationResult(
690 app_name=self.name,
691 repo=self.repo,
692 ref=self.ref,
693 success=True,
694 auth_method=method_name,
695 validated_url=url,
696 )
697 last_error = "Repository exists but appears empty or inaccessible"
698 last_method = method_name
699 elif self.ref and not result.stdout:
700 last_error = f"Branch/tag '{self.ref}' not found"
701 last_method = method_name
702 else:
703 self.repo_url = url
704 logger.info(f"Validated {self.name} ({self.repo}) using {method_name}")
705 return AppValidationResult(
706 app_name=self.name,
707 repo=self.repo,
708 ref=self.ref,
709 success=True,
710 auth_method=method_name,
711 validated_url=url,
712 )
713 else:
714 last_error = result.stderr.strip()
715 last_method = method_name
716 logger.debug(f"Validation failed for {self.name} using {method_name}: {last_error}")
718 except subprocess.TimeoutExpired:
719 last_error = "Timeout while checking repository (network issue?)"
720 last_method = method_name
721 logger.debug(f"Timeout validating {self.name} using {method_name}")
722 except FileNotFoundError:
723 logger.error("Git is not installed or not in PATH")
724 return AppValidationResult(
725 app_name=self.name,
726 repo=self.repo,
727 ref=self.ref,
728 success=False,
729 auth_method=None,
730 validated_url=None,
731 error_message="Git is not installed or not in PATH",
732 )
733 except Exception as e:
734 last_error = str(e)
735 last_method = method_name
736 logger.debug(f"Exception validating {self.name} using {method_name}: {e}")
738 ref_info = f" (ref: {self.ref})" if self.ref else ""
740 if last_error and "not found" in last_error.lower():
741 error_msg = f"{self.name} ({self.repo}{ref_info}): Repository not found on GitHub"
742 elif last_error and self.ref and "branch/tag" in last_error.lower():
743 error_msg = (
744 f"{self.name} ({self.repo}): Branch/tag '{self.ref}' not found\n"
745 f" Check branch name or use a valid tag/commit"
746 )
747 elif github_token:
748 logger.warning(f"GitHub token provided but failed for {self.name}")
749 error_msg = (
750 f"{self.name} ({self.repo}{ref_info}): Could not access repository\n"
751 f" • HTTPS: Failed\n"
752 f" • GitHub Token: Failed (token may be invalid/expired/insufficient permissions)\n"
753 f" • SSH: Failed\n"
754 f" Last error ({last_method}): {last_error}"
755 )
756 else:
757 error_msg = (
758 f"{self.name} ({self.repo}{ref_info}): Could not access repository\n"
759 f" • HTTPS: Failed (repo may be private)\n"
760 f" • SSH: Failed\n"
761 f" 💡 If private repo: use --github-token or configure SSH keys\n"
762 f" Last error ({last_method}): {last_error}"
763 )
765 logger.error(f"Failed to validate {self.name}: {last_error}")
766 return AppValidationResult(
767 app_name=self.name,
768 repo=self.repo,
769 ref=self.ref,
770 success=False,
771 auth_method=None,
772 validated_url=None,
773 error_message=error_msg,
774 )
776 @classmethod
777 def validate_repos_batch(
778 cls,
779 apps: list["AppConfig"],
780 github_token: str | None = None,
781 max_workers: int = 10,
782 ) -> AppBatchValidationResult:
783 """
784 Validate multiple app repositories in parallel.
786 Args:
787 apps: List of AppConfig objects to validate
788 github_token: Optional GitHub token for private repos
789 max_workers: Maximum parallel workers (default: 10)
791 Returns:
792 AppBatchValidationResult with all individual results
793 """
794 results = []
796 with ThreadPoolExecutor(max_workers=min(max_workers, len(apps))) as executor:
797 futures = [executor.submit(app.validate_repo_exists, github_token) for app in apps]
798 for future in as_completed(futures):
799 result = future.result()
800 results.append(result)
801 if not result.success and "Git is not installed" in result.error_message:
802 break
804 return AppBatchValidationResult(results=results)
807class DNSProviderConfig(BaseModel):
808 """DNS provider credentials for DNS-01 challenge at bench level."""
810 email: EmailStr | None = Field(None, description="DNS provider account email (if required).")
811 api_token: str | None = Field(None, description="DNS provider API Token.")
812 api_key: str | None = Field(None, description="DNS provider API Key.")
814 @property
815 def exists(self) -> bool:
816 """Check if any DNS credentials are configured."""
817 return bool(self.api_token or self.api_key)
819 def get_toml_doc(self):
820 """Convert to TOML document."""
821 model_dict = self.model_dump(exclude_none=True)
822 toml_doc = tomlkit.document()
824 for key, value in model_dict.items():
825 toml_doc[key] = value
826 return toml_doc
828 @classmethod
829 def import_from_toml_doc(cls, toml_doc):
830 """Import from TOML document."""
831 return cls(**toml_doc)
834def ssl_certificate_from_toml_data(ssl_data: dict, domain: str) -> SSLCertificate:
835 """Parse a single certificate from TOML data."""
836 ssl_type = ssl_data.get("ssl_type", SUPPORTED_SSL_TYPES.none)
838 if ssl_type == SUPPORTED_SSL_TYPES.le:
839 # Email field removed - Let's Encrypt discontinued notifications (June 2025)
840 # Remove email from TOML data if present (backward compatibility)
841 ssl_data.pop("email", None)
843 fm_config_manager = FMConfigManager.import_from_toml()
845 # Read challenge_type (new field) or preferred_challenge (backward compat)
846 challenge_type = ssl_data.get("challenge_type")
847 if not challenge_type:
848 # Fall back to preferred_challenge for backward compatibility
849 challenge_type = ssl_data.get("preferred_challenge")
851 api_token = ssl_data.get("api_token")
852 if not api_token:
853 api_token = fm_config_manager.cloudflare.api_token
855 api_key = ssl_data.get("api_key")
856 if not api_key:
857 api_key = fm_config_manager.cloudflare.api_key
859 # If no challenge type specified, infer from available credentials
860 if not challenge_type:
861 if fm_config_manager.cloudflare.exists:
862 challenge_type = LETSENCRYPT_PREFERRED_CHALLENGE.dns01
863 else:
864 challenge_type = LETSENCRYPT_PREFERRED_CHALLENGE.http01
866 # Read acme_client field (defaults to "acme.sh" if not specified)
867 acme_client = ssl_data.get("acme_client", "acme.sh")
869 return LetsencryptSSLCertificate(
870 domain=domain,
871 ssl_type=ssl_type,
872 # Email removed - credentials loaded from FM config
873 challenge_type=challenge_type,
874 api_key=api_key,
875 api_token=api_token,
876 acme_client=acme_client,
877 )
878 return SSLCertificate(domain=domain, ssl_type=SUPPORTED_SSL_TYPES.none)
881def ssl_certificate_to_toml_doc(cert: SSLCertificate) -> tomlkit.TOMLDocument | None:
882 """Convert a single certificate to TOML document."""
883 if cert.ssl_type == SUPPORTED_SSL_TYPES.none:
884 return None
886 # Explicitly exclude only the computed field, but INCLUDE domain
887 model_dict = cert.model_dump(exclude={"toml_exclude"}, exclude_none=True)
888 toml_doc = tomlkit.document()
890 for key, value in model_dict.items():
891 if isinstance(value, Path):
892 toml_doc[key] = str(value.absolute())
893 else:
894 toml_doc[key] = value
895 return toml_doc
898def ssl_certificates_to_toml_array(certs: list[SSLCertificate]) -> TOMLArray:
899 """Convert a list of certificates to TOML array-of-tables."""
900 # Use aot() for array-of-tables format [[ssl_certificates]]
901 toml_aot = tomlkit.aot()
902 for cert in certs:
903 if cert.ssl_type != SUPPORTED_SSL_TYPES.none:
904 cert_doc = ssl_certificate_to_toml_doc(cert)
905 if cert_doc:
906 toml_aot.append(cert_doc)
907 return toml_aot
910class MigrationState(BaseModel):
911 migrated_to: str | None = Field(None, description="Version bench is migrated to (e.g., '0.19.0')")
912 last_migration_date: str | None = Field(None, description="ISO timestamp of last migration")
915class BenchConfig(BaseModel):
916 name: str = Field(..., description="The name of the bench")
917 developer_mode: bool = Field(..., description="Whether developer mode is enabled")
918 admin_tools: bool = Field(..., description="Whether admin tools are enabled")
919 environment_type: FMBenchEnvType = Field(..., description="The type of environment")
921 # Multi-certificate support
922 ssl_certificates: list[SSLCertificate] = Field(default=[], description="List of SSL certificates for this bench")
924 # DNS provider credentials for DNS-01 challenge (optional, bench-specific override)
925 dns_providers: dict[str, DNSProviderConfig] | None = Field(
926 default=None,
927 description="DNS provider credentials for DNS-01 challenge (e.g., {'cloudflare': {...}})",
928 )
930 alias_domains: list[str] = Field(default=[], description="List of alias domains for the bench")
932 upload_limit: str = Field(default="50M", description="Maximum upload size (e.g., '50M', '100M', '500M', '1G')")
934 admin_pass: str = Field("admin", description="The admin password")
935 root_path: Path = Field(..., description="The root path")
936 apps_list: list["AppConfig"] = Field(default=[], description="List of apps")
937 userid: int = Field(default_factory=os.getuid, description="The user ID of the current process")
938 usergroup: int = Field(default_factory=os.getgid, description="The group ID of the current process")
939 admin_tools_username: str | None = Field(None, description="Username for admin tools basic auth")
940 admin_tools_password: str | None = Field(None, description="Password for admin tools basic auth")
942 # NEW: GitHub token for private repositories
943 github_token: str | None = Field(None, description="GitHub personal access token for private repositories")
945 # NEW: UV installation preference (always True, with fallback)
946 use_uv: bool = Field(
947 True,
948 description="Use UV for faster Python package installation (with automatic fallback to pip)",
949 )
951 # NEW: Auto-detected Python and Node version requirements from frappe
952 python_version: str | None = Field(
953 None,
954 description="Python version requirement from frappe app (e.g., '>=3.10,<3.14')",
955 )
956 node_version: str | None = Field(None, description="Node version requirement from frappe app (e.g., '>=18')")
958 # Database name (randomized on creation to avoid conflicts)
959 db_name: str | None = Field(None, description="Database name for this bench (auto-generated random string)")
961 restart_policy: RestartPolicyEnum | None = Field(
962 default=None,
963 description="Docker Compose restart policy for all services in this bench",
964 )
966 migration_state: MigrationState | None = Field(
967 None,
968 description="Migration state tracking (managed by migration system)",
969 )
971 # NewRelic APM
972 newrelic_enabled: bool = Field(False, description="Enable NewRelic APM monitoring for the web process")
973 newrelic_license_key: str | None = Field(None, description="NewRelic license key (ingest key)")
975 @field_validator("restart_policy", mode="before")
976 @classmethod
977 def set_default_restart_policy(cls, v, info):
978 if v is None and "environment_type" in info.data:
979 env_type = info.data["environment_type"]
980 if env_type == FMBenchEnvType.prod:
981 return RestartPolicyEnum.unless_stopped
982 return RestartPolicyEnum.no
983 return v if v is not None else RestartPolicyEnum.no
985 def get_apps_config(self) -> list[AppConfig]:
986 """
987 Convert apps_list to AppConfig objects.
989 Handles conversion from simple format to detailed AppConfig.
991 Returns:
992 List of AppConfig objects
993 """
994 return self.apps_list
996 def get_primary_certificate(self) -> SSLCertificate:
997 """
998 Get the primary SSL certificate (certificate for the bench's primary domain).
1000 Returns the first certificate in ssl_certificates list, which should be
1001 the certificate for the bench's primary domain, or creates a default
1002 disabled certificate if the list is empty.
1004 Note: With individual certificates, each domain has its own entry in
1005 ssl_certificates. The first entry is conventionally the primary domain.
1006 """
1007 if self.ssl_certificates:
1008 return self.ssl_certificates[0]
1010 # Return default disabled certificate
1011 return SSLCertificate(domain=self.name, ssl_type=SUPPORTED_SSL_TYPES.none)
1013 def set_primary_certificate(self, certificate: SSLCertificate):
1014 """
1015 Set the primary SSL certificate (certificate for the bench's primary domain).
1017 Updates the first certificate in the list, or creates a new list with this
1018 certificate. This is typically used when enabling/disabling SSL for the
1019 primary domain.
1021 Note: For managing individual domain certificates, use create_individual_certificates()
1022 or directly manipulate the ssl_certificates list.
1023 """
1024 if self.ssl_certificates:
1025 self.ssl_certificates[0] = certificate
1026 else:
1027 self.ssl_certificates = [certificate]
1029 def create_individual_certificates(self, template_certificate: SSLCertificate) -> None:
1030 """
1031 Create individual certificate entries for primary domain and all alias domains.
1033 This replaces any existing certificates with individual certificate entries
1034 for each domain (primary + aliases), using the template certificate as a base.
1036 Args:
1037 template_certificate: Certificate configuration to use as template
1038 (email, acme_client, ssl_type, etc.)
1039 """
1040 from copy import deepcopy
1042 new_certificates = []
1044 # Create certificate for primary domain
1045 primary_cert = deepcopy(template_certificate)
1046 primary_cert.domain = self.name
1047 new_certificates.append(primary_cert)
1049 # Create individual certificates for each alias domain
1050 for alias_domain in self.alias_domains:
1051 alias_cert = deepcopy(template_certificate)
1052 alias_cert.domain = alias_domain
1053 new_certificates.append(alias_cert)
1055 # Replace all certificates with individual ones
1056 self.ssl_certificates = new_certificates
1058 @property
1059 def container_name_prefix(self):
1060 return get_container_name_prefix(self.name)
1062 def get_all_domains(self) -> list[str]:
1063 """
1064 Get all domains configured for this bench (primary + aliases).
1066 Returns:
1067 List of all domains that can have SSL certificates.
1068 """
1069 all_domains = [self.name]
1070 if self.alias_domains:
1071 all_domains.extend(self.alias_domains)
1072 return all_domains
1074 def export_to_toml(self, path: Path) -> bool:
1075 """
1076 Export bench configuration to TOML file.
1077 """
1078 exclude = {
1079 "root_path",
1080 "mariadb_root_pass",
1081 "userid",
1082 "mariadb_host",
1083 "usergroup",
1084 "apps_list",
1085 "frappe_branch",
1086 "admin_pass",
1087 }
1089 # Convert the BenchConfig instance to a dictionary
1090 bench_dict = self.model_dump(exclude=exclude, exclude_none=True)
1092 # Handle SSL certificates
1093 if self.ssl_certificates:
1094 # Export as array
1095 certs_array = ssl_certificates_to_toml_array(self.ssl_certificates)
1096 if len(certs_array) > 0:
1097 bench_dict["ssl_certificates"] = certs_array
1098 else:
1099 # No active certificates, remove the key
1100 bench_dict.pop("ssl_certificates", None)
1101 else:
1102 # No certificates at all
1103 bench_dict.pop("ssl_certificates", None)
1105 # Handle DNS providers (convert to nested tables)
1106 if self.dns_providers:
1107 dns_providers_toml = tomlkit.table()
1108 for provider_name, provider_config in self.dns_providers.items():
1109 if provider_config.exists:
1110 dns_providers_toml[provider_name] = provider_config.get_toml_doc()
1111 if len(dns_providers_toml) > 0:
1112 bench_dict["dns_providers"] = dns_providers_toml
1113 else:
1114 bench_dict.pop("dns_providers", None)
1115 else:
1116 bench_dict.pop("dns_providers", None)
1118 # Serialize the dictionary to a TOML string
1119 toml_doc = tomlkit.document()
1121 for key, value in bench_dict.items():
1122 if isinstance(value, Path):
1123 toml_doc[key] = str(value.absolute())
1124 else:
1125 toml_doc[key] = value
1127 try:
1128 with open(path, "w") as f:
1129 f.write(tomlkit.dumps(toml_doc))
1130 return True
1131 except Exception as e:
1132 return False
1134 @classmethod
1135 def import_from_toml(cls, path: Path) -> "BenchConfig":
1136 """
1137 Import bench configuration from TOML file.
1139 Uses the multi-certificate format (ssl_certificates array).
1140 """
1141 data = tomlkit.parse(path.read_text())
1142 data["root_path"] = str(path)
1144 domain: str = data.get("name", "")
1145 ssl_certificates_list: list[SSLCertificate] = []
1147 # Parse multi-certificate format
1148 ssl_certificates_data = data.get("ssl_certificates", None)
1149 if ssl_certificates_data and isinstance(ssl_certificates_data, list):
1150 for cert_data in ssl_certificates_data:
1151 cert_domain = cert_data.get("domain", domain)
1152 ssl_cert = ssl_certificate_from_toml_data(cert_data, cert_domain)
1153 ssl_certificates_list.append(ssl_cert)
1155 # If no certificates found, start with empty list
1156 # (default cert will be created via get_primary_certificate() when needed)
1158 # Read alias_domains from root level only
1159 alias_domains_list = data.get("alias_domains", [])
1161 # Parse DNS providers (nested tables)
1162 dns_providers_dict = {}
1163 dns_providers_data = data.get("dns_providers", None)
1164 if dns_providers_data and isinstance(dns_providers_data, dict):
1165 for provider_name, provider_data in dns_providers_data.items():
1166 if isinstance(provider_data, dict):
1167 dns_providers_dict[provider_name] = DNSProviderConfig.import_from_toml_doc(provider_data)
1169 migration_state_data = data.get("migration_state", None)
1170 migration_state_obj = None
1171 if migration_state_data and isinstance(migration_state_data, dict):
1172 migration_state_obj = MigrationState(**migration_state_data)
1174 input_data = {
1175 "name": data.get("name", None),
1176 "developer_mode": data.get("developer_mode", None),
1177 "admin_tools": data.get("admin_tools", False),
1178 "environment_type": data.get("environment_type", None),
1179 "root_path": data.get("root_path", None),
1180 "ssl_certificates": ssl_certificates_list,
1181 "dns_providers": dns_providers_dict if dns_providers_dict else None,
1182 "alias_domains": alias_domains_list,
1183 "upload_limit": data.get("upload_limit", "50M"),
1184 "admin_tools_username": data.get("admin_tools_username", None),
1185 "admin_tools_password": data.get("admin_tools_password", None),
1186 "admin_pass": data.get("admin_pass", "admin"),
1187 "apps_list": data.get("apps_list", []),
1188 "github_token": data.get("github_token", None),
1189 "use_uv": data.get("use_uv", True),
1190 "python_version": data.get("python_version", None),
1191 "node_version": data.get("node_version", None),
1192 "db_name": data.get("db_name"),
1193 "restart_policy": data.get("restart_policy", None),
1194 "migration_state": migration_state_obj,
1195 "newrelic_enabled": data.get("newrelic_enabled", False),
1196 "newrelic_license_key": data.get("newrelic_license_key", None),
1197 }
1199 bench_config_instance = cls(**input_data)
1200 return bench_config_instance
1202 def get_commmon_site_config_data(self, db_server_info: DatabaseServerServiceInfo) -> dict[str, Any]:
1203 common_site_config_data = get_bench_connection_config(
1204 self.container_name_prefix, db_server_info.host, db_server_info.port
1205 )
1206 common_site_config_data.update(
1207 {
1208 "install_apps": [],
1209 "webserver_port": 80,
1210 "socketio_port": 80,
1211 "restart_supervisor_on_update": 0,
1212 "developer_mode": self.developer_mode,
1213 }
1214 )
1216 return common_site_config_data
1218 def get_site_mappings(self) -> dict[str, str]:
1219 mappings = {}
1220 mappings[self.name] = self.name
1221 if self.alias_domains:
1222 for alias in self.alias_domains:
1223 mappings[alias] = self.name
1224 return mappings
1226 def export_to_compose_inputs(self):
1227 all_domains = [self.name]
1228 if self.alias_domains:
1229 all_domains.extend(self.alias_domains)
1230 domains_string = ",".join(all_domains)
1232 environment = {
1233 "frappe": {
1234 "USERID": self.userid,
1235 "USERGROUP": self.usergroup,
1236 "SERVICE_NAME": "frappe",
1237 **(
1238 {"NEWRELIC_ENABLED": "true", "NEWRELIC_LICENSE_KEY": self.newrelic_license_key}
1239 if self.newrelic_enabled and self.newrelic_license_key
1240 else {}
1241 ),
1242 },
1243 "nginx": {
1244 "SITE_MAPPINGS": json.dumps(self.get_site_mappings()),
1245 "VIRTUAL_HOST": domains_string,
1246 "VIRTUAL_PORT": 80,
1247 "HTTPS_METHOD": "noredirect",
1248 "HSTS": self.get_primary_certificate().hsts,
1249 "CLIENT_MAX_BODY_SIZE": self.upload_limit.lower(),
1250 },
1251 "worker": {
1252 "USERID": self.userid,
1253 "USERGROUP": self.usergroup,
1254 },
1255 "schedule": {
1256 "USERID": self.userid,
1257 "USERGROUP": self.usergroup,
1258 "SERVICE_NAME": "schedule",
1259 },
1260 "socketio": {
1261 "USERID": self.userid,
1262 "USERGROUP": self.usergroup,
1263 "SERVICE_NAME": "socketio",
1264 },
1265 }
1267 users: dict = {"nginx": {"uid": self.userid, "gid": self.usergroup}}
1268 template_inputs: dict = {
1269 "environment": environment,
1270 "user": users,
1271 "restart_policy": self.restart_policy.value,
1272 }
1273 return template_inputs