Coverage for frappe_manager / utils / site.py: 22%
153 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import json
2import re
3from pathlib import Path
5from rich.table import Table
7from frappe_manager import CLI_BENCHES_DIRECTORY
8from frappe_manager.docker import DockerVolumeMount, DockerVolumeType
9from frappe_manager.output_manager import get_global_output_handler
10from frappe_manager.site_manager.exceptions import BenchException
def generate_services_table(services_status: dict):
    """Build a two-column grid showing the status of every service.

    Services are placed left to right, two per row, in the iteration order
    of ``services_status``. When the number of services is odd, the
    right-hand cell of the last row is left empty.

    Args:
        services_status: Mapping of service name to the status value that is
            handed to ``create_service_element``.

    Returns:
        A borderless, header-less ``Table`` whose cells are the per-service
        tables produced by ``create_service_element``.
    """
    # running site services status
    services_table = Table(
        show_lines=False,
        show_edge=False,
        pad_edge=False,
        show_header=False,
        expand=True,
        box=None,
    )

    services_table.add_column("Service Status", ratio=1, no_wrap=True, width=None, min_width=20)
    services_table.add_column("Service Status", ratio=1, no_wrap=True, width=None, min_width=20)

    # Snapshot the items once; rebuilding the key list for every cell made
    # the loop quadratic in the number of services.
    services = list(services_status.items())

    for start in range(0, len(services), 2):
        cells = [create_service_element(name, status) for name, status in services[start : start + 2]]
        # An odd service count leaves the final row one cell short; pad it
        # with None so the right-hand column stays empty.
        cells.extend([None] * (2 - len(cells)))
        services_table.add_row(*cells)

    return services_table
def create_service_element(service, running_status):
    """Render one service as a single-row table: name left, status mark right.

    Args:
        service: Service name; rendered through f-string formatting.
        running_status: Status value. Only the exact value ``"running"``
            produces a check mark; anything else produces a cross.

    Returns:
        A header-less, borderless ``Table`` containing exactly one row.
    """
    element = Table(show_lines=False, show_header=False, highlight=True, expand=True, box=None)
    element.add_column("Service", justify="left", no_wrap=True)
    element.add_column("Status", justify="right", no_wrap=True)

    if running_status == "running":
        status_mark = "\u2713"  # check mark
    else:
        status_mark = "\u2718"  # cross

    element.add_row(f"{service}", f"{status_mark}")
    return element
def parse_docker_volume(volume_string: str, root_volumes: dict, compose_path: Path):
    """Parse a ``source:destination[:...]`` volume entry into a mount object.

    The entry is split on ``":"``. The first field is the source and the
    second field is the destination; any further fields are ignored.

    Args:
        volume_string: Volume entry, e.g. ``"./data:/var/lib/data"``.
        root_volumes: Container that is membership-tested with the source;
            a source found in it yields ``DockerVolumeType.volume``,
            any other source yields ``DockerVolumeType.bind``.
        compose_path: Passed through unchanged to ``DockerVolumeMount``.

    Returns:
        A ``DockerVolumeMount`` for entries with at least two fields, or
        ``None`` when ``volume_string`` contains no ``":"`` separator.
    """
    string_parts = volume_string.split(":")

    if len(string_parts) < 2:
        # NOTE(review): an entry without a separator has no distinct
        # source/destination pair, so no mount is built for it. Confirm that
        # callers treat None as "skip this entry".
        return None

    src, dest = string_parts[0], string_parts[1]

    # A source that names a declared volume is a named volume; everything
    # else is treated as a bind mount.
    volume_type = DockerVolumeType.volume if src in root_volumes else DockerVolumeType.bind

    return DockerVolumeMount(src, dest, volume_type, compose_path)
def is_fqdn(hostname: str) -> bool:
    """Return whether ``hostname`` consists solely of valid DNS labels.

    https://en.m.wikipedia.org/wiki/Fully_qualified_domain_name

    A single trailing dot is accepted and ignored. Every dot-separated label
    must be 1 - 63 characters long, contain only ASCII letters, digits and
    hyphens, and begin and end with a letter or digit. Single-label names
    such as ``localhost`` are accepted.

    Args:
        hostname: Name to validate.

    Returns:
        ``True`` when every label is valid; ``False`` otherwise, including
        when the overall length is not between 2 and 252 characters.
    """
    if not 1 < len(hostname) < 253:
        return False

    # Remove trailing dot
    if hostname.endswith("."):
        hostname = hostname[:-1]

    # Pattern of a single DNS label:
    #   - begins and ends with a letter or digit
    #   - may contain hyphens in between
    #   - 1 - 63 characters in total
    # re.ASCII keeps IGNORECASE from folding non-ASCII letters (for example
    # U+212A KELVIN SIGN) into the a-z range.
    label_pattern = re.compile(r"[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?", re.IGNORECASE | re.ASCII)

    # fullmatch is used instead of match with "$" because "$" also matches
    # just before a trailing newline, which would let "com\n" through.
    return all(label_pattern.fullmatch(label) is not None for label in hostname.split("."))
def is_wildcard_fqdn(hostname: str) -> bool:
    """Return whether ``hostname`` is a wildcard domain such as ``*.example.com``.

    https://en.m.wikipedia.org/wiki/Fully_qualified_domain_name

    The first label must be exactly ``*`` and must be followed by at least
    one label. Every following label must be a valid DNS label: 1 - 63
    characters of ASCII letters, digits and hyphens, beginning and ending
    with a letter or digit. A single trailing dot is accepted and ignored.
    Names without a leading wildcard label return ``False``.

    Args:
        hostname: Name to validate.

    Returns:
        ``True`` for a well-formed wildcard domain; ``False`` otherwise,
        including when the overall length is not between 2 and 252
        characters.
    """
    if not 1 < len(hostname) < 253:
        return False

    # Remove trailing dot
    if hostname.endswith("."):
        hostname = hostname[:-1]

    # Split hostname into the leading label and the remaining DNS labels
    first_label, *domain_labels = hostname.split(".")

    # The wildcard is only valid as the whole first label, and it needs an
    # actual domain after it ("*." on its own is not a wildcard domain).
    if first_label != "*" or not domain_labels:
        return False

    # Pattern for a standard DNS label. re.ASCII keeps IGNORECASE from
    # matching non-ASCII letters; fullmatch rejects a trailing newline that
    # match with "$" would accept.
    label_pattern = re.compile(r"[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?", re.IGNORECASE | re.ASCII)

    return all(label_pattern.fullmatch(label) is not None for label in domain_labels)
def domain_level(domain):
    """Return how many dots ``domain`` contains.

    A dot-separated name always has one separator fewer than it has parts,
    so the dot count equals the number of parts minus one. A bare name such
    as ``localhost`` is therefore level 0 and ``example.com`` is level 1.
    """
    return domain.count(".")
def validate_sitename(sitename: str | None) -> str:
    """Validate a site name and append ``.localhost`` to dot-less names.

    Args:
        sitename: Name supplied by the user.

    Returns:
        The site name, with ``.localhost`` appended when the given name
        contained no dot.

    Raises:
        ValueError: If ``sitename`` is ``None``.

    An invalid name is reported through the global output handler together
    with a ``BenchException``.
    NOTE(review): presumably ``error()`` raises that exception - confirm.
    If it returns, the name is still returned to the caller.
    """
    if sitename is None:
        raise ValueError("Sitename cannot be None")

    # Validity is judged on the name exactly as given, before any suffix is
    # appended below.
    is_valid = is_fqdn(sitename)

    has_no_dot = domain_level(sitename) == 0
    if has_no_dot:
        sitename += ".localhost"

    if is_valid:
        return sitename

    # The message is built from the (possibly suffixed) name.
    handler = get_global_output_handler()
    handler.error(
        f"The {sitename} must follow Fully Qualified Domain Name (FQDN) format.",
        exception=BenchException(sitename, "Valid FQDN site name not provided."),
    )
    return sitename
def get_bench_db_connection_info(bench_name: str, bench_path: Path):
    """Collect database connection details from a bench's site config files.

    Two JSON files under ``<bench_path>/workspace/frappe-bench/sites`` are
    read when they exist:

    * ``common_site_config.json`` supplies ``host`` and ``port`` (either may
      be ``None`` when the key is absent from the file).
    * ``<bench_name>/site_config.json`` supplies ``name``, ``user`` (both
      taken from ``db_name``) and ``password``; its ``db_host`` / ``db_port``
      entries, when present, override the common values.

    Args:
        bench_name: Name of the site directory inside ``sites``.
        bench_path: Root directory of the bench.

    Returns:
        A dict that always contains ``password`` (``None`` by default) and
        contains the other keys only when the corresponding file provided
        them.

    Raises:
        KeyError: If a non-empty ``site_config.json`` lacks ``db_name`` or
            ``db_password``.
    """
    sites_dir = bench_path / "workspace" / "frappe-bench" / "sites"
    site_config_file = sites_dir / bench_name / "site_config.json"
    common_site_config_file = sites_dir / "common_site_config.json"

    db_info = {"password": None}

    if common_site_config_file.exists():
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale's default encoding.
        with open(common_site_config_file, encoding="utf-8") as f:
            common_site_config = json.load(f)
        if common_site_config:
            db_info["host"] = common_site_config.get("db_host")
            db_info["port"] = common_site_config.get("db_port")

    if site_config_file.exists():
        with open(site_config_file, encoding="utf-8") as f:
            site_config = json.load(f)
        if site_config:
            db_info["name"] = site_config["db_name"]
            db_info["user"] = site_config["db_name"]
            db_info["password"] = site_config["db_password"]
            # Site-level settings take precedence over the common ones.
            if "db_host" in site_config:
                db_info["host"] = site_config["db_host"]
            if "db_port" in site_config:
                db_info["port"] = site_config["db_port"]

    return db_info
def get_all_docker_images():
    """Gather the images declared by three compose definitions.

    Three ``ComposeFile`` objects are created: one with the default
    template, one with ``docker-compose.services.tmpl`` and one with
    ``docker-compose.admin-tools.tmpl``. The image collection of the first
    is then updated with those of the other two, in that order.

    NOTE(review): every ``ComposeFile`` is pointed at a path below
    ``/dev/null`` - presumably so that nothing is loaded from disk and the
    template is used instead; confirm against ``ComposeFile``.

    Returns:
        The object returned by the first file's ``get_all_images()``, after
        being updated in place with the images of the other two.
    """
    from frappe_manager.docker import ComposeFile

    compose_files = [
        ComposeFile(loadfile=Path("/dev/null/docker-compose.yml")),
        ComposeFile(
            loadfile=Path("/dev/null/docker-compose.yml"),
            template_name="docker-compose.services.tmpl",
        ),
        ComposeFile(
            loadfile=Path("/dev/null/docker-compose.yml"),
            template_name="docker-compose.admin-tools.tmpl",
        ),
    ]

    default_compose, *template_composes = compose_files

    images = default_compose.get_all_images()
    for compose_file in template_composes:
        images.update(compose_file.get_all_images())

    return images
def pull_docker_images() -> bool:
    """Pull every image reported by ``get_all_docker_images``.

    Each entry's ``name`` and ``tag`` are joined into ``name:tag``;
    duplicates are pulled only once, in first-seen order. Progress is
    reported through the global output handler.

    Returns:
        ``True`` when no pull raised ``DockerException``, ``False`` when at
        least one did.
    """
    from frappe_manager.docker import DockerClient, DockerException

    docker = DockerClient()
    images = get_all_docker_images()

    # dict.fromkeys removes duplicates while keeping the first-seen order.
    images_list = list(
        dict.fromkeys(f"{image_info['name']}:{image_info['tag']}" for image_info in images.values())
    )

    no_error = True
    for image in images_list:
        output = get_global_output_handler()
        status = f"[blue]Pulling image[/blue] [bold][yellow]{image}[/yellow][/bold]"
        output.change_head(status, style=None)
        try:
            pull_output = docker.pull(container_name=image, stream=True)
            output.live_lines(pull_output, padding=(0, 0, 0, 2))
        except DockerException as e:
            no_error = False
            output.error(f"[bold][red]Error [/bold][/red]: Failed to pull {image}", e)
        else:
            # Only report success when the pull actually completed; a failed
            # image must not be announced as pulled.
            output.print(f"[green]Pulled[/green] [blue]{image}[/blue]")

    return no_error
def get_sitename_from_current_path() -> str | None:
    """Derive the site name from the current working directory.

    When the working directory lies inside ``CLI_BENCHES_DIRECTORY``, the
    first path component below that directory is taken as the candidate
    name and returned if ``is_fqdn`` accepts it.

    Returns:
        The site name, or ``None`` when the working directory is outside the
        benches directory, is the benches directory itself, or the candidate
        is not accepted by ``is_fqdn``.
    """
    current_path = Path().absolute()
    benches_root = CLI_BENCHES_DIRECTORY.absolute()

    if current_path.is_relative_to(benches_root):
        components = list(current_path.relative_to(benches_root).parts)
        # No components means we are standing in the benches root itself.
        if components and is_fqdn(components[0]):
            return components[0]

    return None
def is_default_worker(worker_name: str) -> bool:
    """Return whether ``worker_name`` is one of the two default workers.

    The default workers are ``long-worker`` and ``short-worker``; the
    comparison is exact and case-sensitive.
    """
    return worker_name in ("long-worker", "short-worker")