Coverage for frappe_manager / utils / site.py: 22%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import json 

2import re 

3from pathlib import Path 

4 

5from rich.table import Table 

6 

7from frappe_manager import CLI_BENCHES_DIRECTORY 

8from frappe_manager.docker import DockerVolumeMount, DockerVolumeType 

9from frappe_manager.output_manager import get_global_output_handler 

10from frappe_manager.site_manager.exceptions import BenchException 

11 

12 

def generate_services_table(services_status: dict):
    """Build a two-column Rich table showing the status of every service.

    Services are laid out two per row, following the iteration order of
    ``services_status``. Each cell is the nested table returned by
    ``create_service_element``. When the number of services is odd, the
    right-hand cell of the last row is ``None``.

    Args:
        services_status: Mapping of service name to its status value.

    Returns:
        The populated ``Table``.
    """
    # running site services status
    services_table = Table(
        show_lines=False,
        show_edge=False,
        pad_edge=False,
        show_header=False,
        expand=True,
        box=None,
    )

    services_table.add_column("Service Status", ratio=1, no_wrap=True, width=None, min_width=20)
    services_table.add_column("Service Status", ratio=1, no_wrap=True, width=None, min_width=20)

    # Snapshot the names once; rebuilding the key list for every lookup made
    # the loop quadratic in the number of services.
    service_names = list(services_status)

    for index in range(0, len(service_names), 2):
        left_name = service_names[index]
        left_cell = create_service_element(left_name, services_status[left_name])

        # Only the right-hand cell can be missing (odd number of services),
        # so test the bound explicitly instead of catching IndexError.
        right_cell = None
        if index + 1 < len(service_names):
            right_name = service_names[index + 1]
            right_cell = create_service_element(right_name, services_status[right_name])

        services_table.add_row(left_cell, right_cell)

    return services_table

46 

47 

def create_service_element(service, running_status):
    """Build a single-row table pairing a service name with a status mark.

    Args:
        service: Value shown (formatted as text) in the left-aligned column.
        running_status: Status value; exactly ``"running"`` yields a check
            mark, anything else yields a cross.

    Returns:
        A headerless, borderless ``Table`` with one row.
    """
    # Default to the cross and upgrade to the check mark only when running.
    status_mark = "\u2718"
    if running_status == "running":
        status_mark = "\u2713"

    element = Table(show_lines=False, show_header=False, highlight=True, expand=True, box=None)
    element.add_column("Service", justify="left", no_wrap=True)
    element.add_column("Status", justify="right", no_wrap=True)
    element.add_row(f"{service}", status_mark)

    return element

64 

65 

def parse_docker_volume(volume_string: str, root_volumes: dict, compose_path: Path):
    """Parse a ``source:destination`` volume entry into a ``DockerVolumeMount``.

    Args:
        volume_string: Volume entry to parse. The text before the first ``:``
            is the source and the text between the first and second ``:`` is
            the destination. Any further ``:``-separated parts are ignored.
        root_volumes: Container checked for the source name; a source found
            in it is mounted as ``DockerVolumeType.volume``, any other source
            as ``DockerVolumeType.bind``.
        compose_path: Passed through unchanged to ``DockerVolumeMount``.

    Returns:
        The ``DockerVolumeMount`` for the entry, or ``None`` when
        ``volume_string`` contains no ``:`` separator and therefore has no
        destination.
    """
    string_parts = volume_string.split(":")

    # Without a separator there is no destination to mount to. Bail out
    # explicitly rather than reaching code that needs src/dest.
    if len(string_parts) < 2:
        return None

    src, dest = string_parts[0], string_parts[1]

    volume_type = DockerVolumeType.volume if src in root_volumes else DockerVolumeType.bind

    return DockerVolumeMount(src, dest, volume_type, compose_path)

89 

90 

def is_fqdn(hostname: str) -> bool:
    """
    Return whether ``hostname`` is a well-formed fully qualified domain name.

    https://en.m.wikipedia.org/wiki/Fully_qualified_domain_name

    The whole name must be 2 to 252 characters long. One trailing dot is
    tolerated. Every dot-separated label must be 1 to 63 characters of ASCII
    letters, digits and hyphens, and must start and end with a letter or digit.
    """
    if not 1 < len(hostname) < 253:
        return False

    # Remove trailing dot
    if hostname[-1] == ".":
        hostname = hostname[0:-1]

    # Split hostname into list of DNS labels
    labels = hostname.split(".")

    # Define pattern of DNS label
    # Can begin and end with a number or letter only
    # Can contain hyphens, a-z, A-Z, 0-9
    # 1 - 63 chars allowed
    # re.ASCII stops IGNORECASE from letting non-ASCII look-alikes
    # (e.g. U+212A KELVIN SIGN) satisfy [a-z].
    label_pattern = re.compile(r"[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?", re.IGNORECASE | re.ASCII)

    # Check that all labels match that pattern. fullmatch is used because
    # match() with "$" would also accept a label ending in a newline.
    return all(label_pattern.fullmatch(label) for label in labels)

113 

114 

def is_wildcard_fqdn(hostname: str) -> bool:
    """
    Check if the hostname is a wildcard fully qualified domain name (FQDN).

    A wildcard domain is specified with a lone asterisk as the first label,
    followed by at least one regular DNS label (e.g., *.example.com).
    Names without the leading wildcard label are rejected.
    https://en.m.wikipedia.org/wiki/Fully_qualified_domain_name
    """
    if not 1 < len(hostname) < 253:
        return False

    # Remove trailing dot
    if hostname[-1] == ".":
        hostname = hostname[:-1]

    # Split hostname into list of DNS labels
    labels = hostname.split(".")

    # The wildcard must be the entire first label, and there has to be an
    # actual domain after it ("*." on its own is not a wildcard domain).
    if labels[0] != "*" or len(labels) < 2:
        return False

    # Define pattern for a standard DNS label. re.ASCII keeps IGNORECASE from
    # matching non-ASCII look-alike letters.
    label_pattern = re.compile(r"[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?", re.IGNORECASE | re.ASCII)

    # Every label after the wildcard must be a standard label. fullmatch is
    # used because match() with "$" would accept a trailing newline.
    return all(label_pattern.fullmatch(label) for label in labels[1:])

145 

146 

def domain_level(domain):
    """Return the number of dot separators in ``domain``.

    A bare name such as ``localhost`` is level 0, ``example.com`` is level 1,
    and so on.
    """
    # Each "." introduces exactly one more part, so the separator count equals
    # the number of parts minus one (i.e. the parts excluding the TLD).
    return domain.count(".")

153 

154 

155def validate_sitename(sitename: str | None) -> str: 

156 if sitename is None: 

157 raise ValueError("Sitename cannot be None") 

158 

159 match = is_fqdn(sitename) 

160 

161 if domain_level(sitename) == 0: 

162 sitename = sitename + ".localhost" 

163 

164 if not match: 

165 output = get_global_output_handler() 

166 output.error( 

167 f"The {sitename} must follow Fully Qualified Domain Name (FQDN) format.", 

168 exception=BenchException(sitename, "Valid FQDN site name not provided."), 

169 ) 

170 

171 return sitename 

172 

173 

def get_bench_db_connection_info(bench_name: str, bench_path: Path):
    """Collect database connection settings for a bench from its config files.

    Reads ``common_site_config.json`` and the bench's ``site_config.json``
    under ``<bench_path>/workspace/frappe-bench/sites``. Values from the site
    config take precedence over the common config for host and port.

    Args:
        bench_name: Name of the site directory inside ``sites``.
        bench_path: Root directory of the bench.

    Returns:
        A dict that always contains ``"password"`` (``None`` unless the site
        config provides it). ``"host"`` and ``"port"`` are present when either
        config supplies them, and ``"name"`` and ``"user"`` (both taken from
        ``db_name``) when a non-empty site config exists.

    Raises:
        KeyError: If a non-empty site config lacks ``db_name`` or
            ``db_password``.
    """
    sites_dir = bench_path / "workspace" / "frappe-bench" / "sites"
    site_config_file = sites_dir / bench_name / "site_config.json"
    common_site_config_file = sites_dir / "common_site_config.json"

    db_info = {"password": None}

    if common_site_config_file.exists():
        # Read as UTF-8 explicitly: open() otherwise falls back to the
        # locale's preferred encoding, which can mis-decode JSON text.
        with open(common_site_config_file, encoding="utf-8") as f:
            common_site_config = json.load(f)
        if common_site_config:
            db_info["host"] = common_site_config.get("db_host")
            db_info["port"] = common_site_config.get("db_port")

    if site_config_file.exists():
        with open(site_config_file, encoding="utf-8") as f:
            site_config = json.load(f)
        if site_config:
            db_info["name"] = site_config["db_name"]
            # The database user is taken from the database name.
            db_info["user"] = site_config["db_name"]
            db_info["password"] = site_config["db_password"]
            # Site-level host/port override the common config only when set.
            if "db_host" in site_config:
                db_info["host"] = site_config["db_host"]
            if "db_port" in site_config:
                db_info["port"] = site_config["db_port"]

    return db_info

201 

202 

def get_all_docker_images():
    """Gather the images reported by the bench, services and admin-tools compose files.

    Three ``ComposeFile`` objects are created against the same
    ``/dev/null/docker-compose.yml`` load path: one with the default
    template, one with the services template and one with the admin-tools
    template. Their ``get_all_images()`` results are merged.

    Returns:
        The mapping returned for the default template, updated in place with
        the services and admin-tools results (later entries replace earlier
        ones on equal keys).
    """
    from frappe_manager.docker import ComposeFile

    placeholder_path = Path("/dev/null/docker-compose.yml")

    bench_compose = ComposeFile(loadfile=placeholder_path)
    services_compose = ComposeFile(
        loadfile=placeholder_path,
        template_name="docker-compose.services.tmpl",
    )
    admin_tools_compose = ComposeFile(
        loadfile=placeholder_path,
        template_name="docker-compose.admin-tools.tmpl",
    )

    images = bench_compose.get_all_images()
    for compose_file in (services_compose, admin_tools_compose):
        images.update(compose_file.get_all_images())

    return images

222 

223 

def pull_docker_images() -> bool:
    """Pull every image referenced by the compose templates.

    Image references are built as ``name:tag`` from ``get_all_docker_images()``
    and de-duplicated, so each image is pulled once. Progress is reported
    through the global output handler.

    Returns:
        ``True`` when no pull raised ``DockerException``, ``False`` otherwise.

    NOTE(review): if ``output.error`` raises when handed an exception, the
    loop stops at the first failed pull - confirm against the output handler.
    """
    from frappe_manager.docker import DockerClient, DockerException

    docker = DockerClient()
    images = get_all_docker_images()

    # dict.fromkeys removes duplicates while keeping first-seen order.
    images_list = list(
        dict.fromkeys(f"{image_info['name']}:{image_info['tag']}" for image_info in images.values())
    )

    output = get_global_output_handler()

    no_error = True
    for image in images_list:
        status = f"[blue]Pulling image[/blue] [bold][yellow]{image}[/yellow][/bold]"
        output.change_head(status, style=None)
        try:
            pull_output = docker.pull(container_name=image, stream=True)
            output.live_lines(pull_output, padding=(0, 0, 0, 2))
        except DockerException as e:
            no_error = False
            output.error(f"[bold][red]Error [/bold][/red]: Failed to pull {image}", e)
        else:
            # Report success only when the pull actually completed.
            output.print(f"[green]Pulled[/green] [blue]{image}[/blue]")

    return no_error

252 

253 

def get_sitename_from_current_path() -> str | None:
    """Derive a site name from the current working directory.

    Returns:
        The first path component of the working directory relative to
        ``CLI_BENCHES_DIRECTORY`` when that component passes ``is_fqdn``.
        ``None`` when the working directory is outside that directory, is
        that directory itself, or the component is not a valid FQDN.
    """
    working_dir = Path().absolute()
    benches_root = CLI_BENCHES_DIRECTORY.absolute()

    if not working_dir.is_relative_to(benches_root):
        return None

    relative_parts = working_dir.relative_to(benches_root).parts

    # An empty tuple means we are sitting in the benches directory itself.
    if relative_parts and is_fqdn(relative_parts[0]):
        return relative_parts[0]

    return None

269 

270 

def is_default_worker(worker_name: str) -> bool:
    """Return ``True`` when ``worker_name`` is ``long-worker`` or ``short-worker``."""
    return worker_name in ("long-worker", "short-worker")