Coverage for frappe_manager / utils / callbacks.py: 14%

167 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import json 

2from datetime import datetime 

3from pathlib import Path 

4 

5import typer 

6 

7from frappe_manager import ( 

8 CLI_BENCHES_DIRECTORY, 

9 CLI_CACHE_PATH, 

10 CLI_RECENT_USED_SITES_CACHE_PATH, 

11 DEFAULT_EXTENSIONS, 

12) 

13from frappe_manager.output_manager import get_global_output_handler 

14from frappe_manager.site_manager.exceptions import BenchNotFoundError 

15from frappe_manager.utils.helpers import check_frappe_app_exists, get_current_fm_version 

16from frappe_manager.utils.site import get_sitename_from_current_path, is_fqdn, is_wildcard_fqdn, validate_sitename 

17 

18 

def apps_list_validation_callback(value: list[str] | None):
    """
    Turn the raw app specifiers into AppConfig objects after a light format check.

    Spellings this callback lets through:
    - "erpnext"
    - "erpnext:version-15"
    - "frappe/erpnext:version-15"
    - "rtcamp/custom-app:main"
    - "frappe/frappe:version-15#apps/frappe" (subdirectory app)
    - http:// or https:// URLs, optionally followed by ":<branch>"

    Only the number of ':'-separated parts is inspected here; every entry is
    then passed unchanged to ``AppConfig.from_string``. The 'frappe' app is
    not rejected by this callback.

    Args:
        value (list[str] | None): The app specifiers to validate.

    Raises:
        typer.BadParameter: If an entry splits into more than two parts.

    Returns:
        list[AppConfig]: One AppConfig per entry; an empty list when ``value``
        is None or empty.
    """
    from frappe_manager.site_manager.bench_config import AppConfig

    parsed_apps = []

    for spec in value or []:
        if "https://" in spec or "http://" in spec:
            # The scheme itself contains ':', so the first two pieces are
            # glued back together; only the third piece is kept as the ref
            # and any later pieces are ignored by this check.
            pieces = spec.split(":")
            parts = [":".join(pieces[:2])]
            if len(pieces) >= 3:
                parts.append(pieces[2])
        elif "#" in spec:
            # e.g. "frappe/payments:version-15#apps/payments": count the ':'
            # only in the portion before the subdirectory marker.
            before_subdir = spec.split("#")[0].split(":")
            if len(before_subdir) == 2:
                parts = [before_subdir[0], spec.split(":", 1)[1]]
            else:
                # NOTE(review): a spec with more than one ':' before '#' also
                # lands here as a single part, so the check below does not
                # reject it - confirm this is intended.
                parts = [spec]
        else:
            parts = spec.split(":")

        if len(parts) > 2:
            handler = get_global_output_handler()
            handler.stop()
            msg = (
                "Specify the app in the format:\n"
                " <appname>:<branch>\n"
                " <org>/<appname>:<branch>\n"
                " <org>/<appname>:<branch>#<subdir>\n"
                "\nExamples:\n"
                " erpnext:version-15\n"
                " frappe/helpdesk:v1.9.1\n"
                " rtcamp/custom-app:main\n"
                " frappe/frappe:version-15#apps/frappe"
            )
            raise typer.BadParameter(msg)

        parsed_apps.append(AppConfig.from_string(spec))

    return parsed_apps

94 

95 

def frappe_branch_validation_callback(value: str):
    """
    Check that the requested Frappe branch exists.

    Args:
        value (str): The Frappe branch to validate. A falsy value skips the
            check entirely.

    Returns:
        str | None: ``value`` when the branch lookup succeeds, ``None`` when
        no branch was supplied.

    Raises:
        typer.BadParameter: If the lookup reports that the branch is missing.
    """
    if not value:
        return None

    lookup = check_frappe_app_exists("frappe", value)
    if not lookup["branch"]:
        raise typer.BadParameter(f"Frappe branch -> {value} is not valid!! ")

    return value

114 

115 

116def version_callback(version: bool | None = None): 

117 """ 

118 Callback function to handle version option. 

119 

120 Args: 

121 version (bool, optional): If True, prints the current FM version and exits. Defaults to None. 

122 """ 

123 if version: 

124 fm_version = get_current_fm_version() 

125 output = get_global_output_handler() 

126 output.print(fm_version, emoji_code="") 

127 raise typer.Exit() 

128 

129 

def sites_autocompletion_callback() -> list[Path]:
    """
    Collect the compose file of every bench directory.

    Returns:
        list[Path]: The ``docker-compose.yml`` path of each directory directly
        under ``CLI_BENCHES_DIRECTORY`` that contains such a file.
    """
    compose_candidates = (
        entry / "docker-compose.yml"
        for entry in CLI_BENCHES_DIRECTORY.iterdir()
        if entry.is_dir()
    )
    # is_file() is already False for a missing path, so no separate exists() test is needed.
    return [compose for compose in compose_candidates if compose.is_file()]

138 

139 

def val(answers, current):
    """Print both arguments to stdout, separated by a space; returns None."""
    print(answers, current)

142 

143 

def sitename_callback(sitename: str | None):
    """
    Resolve and validate the bench name a command should operate on.

    Resolution order: the explicit argument, then the name derived from the
    current path, then an interactive fuzzy prompt over the discovered benches.

    Args:
        sitename (str | None): Bench name given on the command line, if any.

    Returns:
        The value returned by ``validate_sitename`` for the resolved name.

    Raises:
        typer.Exit: With code 1 when the interactive prompt fails.
        typer.BadParameter: When no bench name could be determined.
        BenchNotFoundError: When the bench directory does not exist.
    """
    sitename = sitename or get_sitename_from_current_path()

    if not sitename:
        bench_names = [compose.parent.name for compose in sites_autocompletion_callback()]

        if bench_names:
            handler = get_global_output_handler()
            ordered_names = get_sorted_sites_list(bench_names)

            try:
                sitename = handler.prompt_fuzzy(
                    prompt="Select bench (↑↓ navigate, type to search)",
                    choices=ordered_names,
                    vi_mode=True,
                    mandatory=True,
                    qmark="🤔",
                    amark="🤔",
                )

                # Remember the pick so it is offered first next time.
                if sitename:
                    update_sites_cache(sitename)
            except Exception:
                # Any failure while prompting is reported as a missing
                # positional argument and ends the command.
                handler.error(
                    "Bench name is required in non-interactive mode",
                    exception=Exception(
                        "Specify bench name as positional argument. Use 'fm list' to see available benches.",
                    ),
                )
                raise typer.Exit(1)

    if sitename is None:
        raise typer.BadParameter("Invalid selection. Must match existing sites")

    sitename = validate_sitename(sitename)

    bench_dir = CLI_BENCHES_DIRECTORY / sitename
    if bench_dir.exists():
        return sitename

    raise BenchNotFoundError(sitename, bench_dir)

187 

188 

def get_cache_file() -> Path:
    """
    Return the path of the recently-used-sites cache file.

    The cache directory ``CLI_CACHE_PATH`` is created (including parents) if
    it is missing; the cache file itself is not created here.
    """
    cache_dir = CLI_CACHE_PATH
    cache_dir.mkdir(parents=True, exist_ok=True)
    return CLI_RECENT_USED_SITES_CACHE_PATH

193 

194 

def update_sites_cache(sitename: str) -> None:
    """
    Record ``sitename`` as the most recently used site (best effort).

    The cache is a JSON object whose "sites" key holds a list of
    ``{"name": ..., "last_used": ...}`` entries, newest first, capped at 10.
    An unreadable or malformed cache file is treated as empty and rewritten,
    so a corrupted file repairs itself on the next update instead of
    disabling the cache permanently. I/O failures are ignored.

    Args:
        sitename (str): Name of the site that was just selected.
    """
    cache_file = get_cache_file()

    # Load whatever is usable from the existing cache; anything else counts as empty.
    cache: dict = {}
    try:
        if cache_file.exists():
            with open(cache_file, encoding="utf-8") as f:
                loaded = json.load(f)
            if isinstance(loaded, dict):
                cache = loaded
    except (OSError, ValueError):
        # ValueError covers invalid JSON as well as undecodable bytes.
        cache = {}

    previous = cache.get("sites")
    if not isinstance(previous, list):
        previous = []

    # Newest entry first, followed by the well-formed older entries for other sites.
    recent = [{"name": sitename, "last_used": datetime.now().isoformat()}]
    recent.extend(
        entry
        for entry in previous
        if isinstance(entry, dict) and "name" in entry and entry["name"] != sitename
    )

    # Keep only last 10 entries
    cache["sites"] = recent[:10]

    try:
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(cache, f)
    except OSError:
        # The cache is a convenience only; never fail the command over it.
        pass

217 

218 

def get_sorted_sites_list(sites_list: list[str]) -> list[str]:
    """
    Order ``sites_list`` so that recently used sites come first.

    Names found in the cache are listed in cache order, but only when they
    are also present in ``sites_list``; the remaining sites follow in their
    original order. If the cache file is missing, or anything goes wrong
    while reading it, ``sites_list`` is returned unchanged.
    """
    cache_file = get_cache_file()

    try:
        if not cache_file.exists():
            return sites_list

        with open(cache_file) as handle:
            cached = json.load(handle)

        # Cached names that still correspond to an existing site.
        recent_first = [entry["name"] for entry in cached["sites"] if entry["name"] in sites_list]
        # Everything the cache does not mention keeps its original position.
        others = [name for name in sites_list if name not in recent_first]
        return recent_first + others
    except Exception:
        return sites_list

239 

240 

241def prompt_for_bench_selection(current_value: str | None) -> str | None: 

242 if current_value: 

243 return current_value 

244 

245 from frappe_manager.output_manager import get_global_output_handler 

246 

247 benchname = get_sitename_from_current_path() 

248 if benchname: 

249 return benchname 

250 

251 sites_list = [site_name.parent.name for site_name in sites_autocompletion_callback()] 

252 

253 if not sites_list: 

254 return None 

255 

256 output = get_global_output_handler() 

257 sorted_sites = get_sorted_sites_list(sites_list) 

258 

259 try: 

260 selected = output.prompt_fuzzy( 

261 prompt="Select bench (↑↓ navigate, type to search)", 

262 choices=sorted_sites, 

263 vi_mode=True, 

264 mandatory=True, 

265 qmark="🤔", 

266 amark="🤔", 

267 ) 

268 

269 if selected: 

270 update_sites_cache(selected) 

271 return selected 

272 except Exception: 

273 # Silently fail - caller will handle None return 

274 pass 

275 

276 return None 

277 

278 

def code_command_extensions_callback(extensions: list[str]) -> list[str]:
    """
    Merge the user-supplied extensions with ``DEFAULT_EXTENSIONS``.

    Duplicates are removed while keeping first-seen order (user-supplied
    entries first, then the defaults), so the result is the same on every
    run rather than depending on set iteration order.

    Args:
        extensions (list[str]): Extensions requested by the user; ``None`` is
            treated as an empty list.

    Returns:
        list[str]: The de-duplicated extension identifiers.
    """
    requested = extensions or []
    # dict keys keep insertion order, which gives an order-preserving de-duplication.
    return list(dict.fromkeys([*requested, *DEFAULT_EXTENSIONS]))

284 

285 

def create_command_sitename_callback(sitename: str):
    """
    Validate the name for a bench that is about to be created.

    Returns:
        The value returned by ``validate_sitename``.

    Raises:
        typer.BadParameter: If a path for that bench already exists under
            ``CLI_BENCHES_DIRECTORY``.
    """
    sitename = validate_sitename(sitename)

    target_dir = CLI_BENCHES_DIRECTORY / sitename
    if not target_dir.exists():
        return sitename

    raise typer.BadParameter(f"The bench '{sitename}' already exists at {target_dir}. Aborting operation.")

297 

298 

299def alias_domains_validation_callback(value: str | None) -> list[str]: 

300 """ 

301 Validate the comma-separated list of alias domains. 

302 

303 Args: 

304 value (Optional[str]): Comma-separated list of alias domains 

305 

306 Returns: 

307 List[str]: List of validated alias domains 

308 

309 Raises: 

310 typer.BadParameter: If any domain is invalid 

311 """ 

312 if not value: 

313 return [] 

314 

315 # Split by comma and strip whitespace 

316 domains = [domain.strip() for domain in value.split(",") if domain.strip()] 

317 

318 if not domains: 

319 return [] 

320 

321 validated_domains = [] 

322 

323 for domain in domains: 

324 # Check if it's a wildcard domain 

325 if domain.startswith("*."): 

326 if not is_wildcard_fqdn(domain): 

327 output = get_global_output_handler() 

328 output.stop() 

329 raise typer.BadParameter( 

330 f"Invalid wildcard domain '{domain}'. Wildcard domains must be in format '*.example.com'.", 

331 ) 

332 validated_domains.append(domain) 

333 else: 

334 # Regular domain validation 

335 if not is_fqdn(domain): 

336 output = get_global_output_handler() 

337 output.stop() 

338 raise typer.BadParameter( 

339 f"Invalid domain '{domain}'. Domain must be a valid FQDN (e.g., 'www.example.com').", 

340 ) 

341 # Additional check: domain must have at least one dot (TLD) 

342 if "." not in domain: 

343 output = get_global_output_handler() 

344 output.stop() 

345 raise typer.BadParameter(f"Invalid domain '{domain}'. Domain must include a TLD (e.g., 'example.com').") 

346 validated_domains.append(domain) 

347 

348 # Check for duplicates 

349 if len(validated_domains) != len(set(validated_domains)): 

350 output = get_global_output_handler() 

351 output.stop() 

352 raise typer.BadParameter("Duplicate domains found in alias domains list.") 

353 

354 return validated_domains