Coverage for frappe_manager / utils / helpers.py: 42%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import grp 

2import importlib 

3import importlib.resources as pkg_resources 

4import json 

5import secrets 

6import sys 

7import time 

8from datetime import datetime 

9from io import StringIO 

10from pathlib import Path 

11 

12import requests 

13from cryptography import x509 

14from cryptography.hazmat.backends import default_backend 

15from rich.console import Console 

16from rich.traceback import Traceback 

17 

18from frappe_manager import CLI_DEFAULT_DELIMETER, CLI_SITE_NAME_DELIMETER 

19from frappe_manager.logger import log 

20from frappe_manager.output_manager import get_global_output_handler 

21from frappe_manager.site_manager import PREBAKED_SITE_APPS 

22from frappe_manager.utils.docker import run_command_with_exit_code 

23 

24 

25def remove_zombie_subprocess_process(process): 

26 """ 

27 This function iterates over a list of process IDs and terminates each process. 

28 

29 Args: 

30 process (list): A list of process IDs to be terminated. 

31 

32 Returns: 

33 None 

34 """ 

35 if process: 

36 logger = log.get_logger() 

37 logger.cleanup("-" * 20) 

38 logger.cleanup(f"PROCESS: USED PROCESS {process}") 

39 

40 import psutil 

41 

42 for pid in process: 

43 try: 

44 process = psutil.Process(pid) 

45 process.terminate() 

46 logger.cleanup(f"Terminated Process {process.cmdline}:{pid}") 

47 except psutil.NoSuchProcess: 

48 logger.cleanup(f"{pid} Process not found") 

49 except psutil.AccessDenied: 

50 logger.cleanup(f"{pid} Permission denied") 

51 logger.cleanup("-" * 20) 

52 

53 

54def generate_random_text(length=50): 

55 """ 

56 Generate a random text of specified length. 

57 

58 Parameters: 

59 length (int): The length of the random text to be generated. Default is 50. 

60 

61 Returns: 

62 str: The randomly generated text. 

63 """ 

64 import random 

65 import string 

66 

67 alphanumeric_chars = string.ascii_letters + string.digits 

68 return "".join(random.choice(alphanumeric_chars) for _ in range(length)) 

69 

70 

71def is_cli_help_called(ctx): 

72 """ 

73 Checks if the help is called for the CLI command. 

74 

75 Args: 

76 ctx (object): The context object representing the CLI command. 

77 

78 Returns: 

79 bool: True if the help command is called, False otherwise. 

80 """ 

81 help_called = False 

82 

83 if "--help" in " ".join(sys.argv[1:]): 

84 return True 

85 

86 try: 

87 subcommand = ctx.command.commands.get(ctx.invoked_subcommand) 

88 if not subcommand: 

89 return False 

90 

91 if hasattr(subcommand, "commands"): 

92 check_command = " ".join(sys.argv[2:]) 

93 if check_command in subcommand.commands: 

94 sub_sub_command = subcommand.commands[check_command] 

95 if sub_sub_command.params and sub_sub_command.no_args_is_help: 

96 help_called = True 

97 elif subcommand.params and ctx.invoked_subcommand == " ".join(sys.argv[1:]): 

98 if subcommand.no_args_is_help: 

99 help_called = True 

100 

101 except (AttributeError, KeyError): 

102 help_called = False 

103 

104 return help_called 

105 

106 

107def get_current_fm_version(): 

108 """ 

109 Get the current version of the frappe-manager package. 

110 

111 Returns: 

112 str: The current version of the frappe-manager package. 

113 """ 

114 from frappe_manager.__about__ import __version__ 

115 

116 return __version__ 

117 

118 

119def get_docker_image_tag(): 

120 """ 

121 Get the Docker image tag to use based on FM version. 

122 

123 Returns version with 'v' prefix for Docker image tags. 

124 Examples: 

125 - '0.19.0' -> 'v0.19.0' 

126 - '0.19.1.dev0' -> 'v0.19.1.dev0' 

127 - '0.20.0.dev1' -> 'v0.20.0.dev1' 

128 

129 Environment variable FM_DOCKER_IMAGE_TAG can override for testing. 

130 

131 Returns: 

132 str: The Docker image tag (e.g., 'v0.19.0' or 'v0.19.1.dev0'). 

133 """ 

134 import os 

135 

136 # Allow environment variable override for testing 

137 if override_tag := os.getenv('FM_DOCKER_IMAGE_TAG'): 

138 return override_tag 

139 

140 version = get_current_fm_version() 

141 

142 # Always prepend 'v' if not already present 

143 if not version.startswith('v'): 

144 return f'v{version}' 

145 

146 return version 

147 

148 

149def check_repo_exists(app_url: str, branch_name: str | None = None, exclude_dict: dict[str, str] = PREBAKED_SITE_APPS): 

150 """ 

151 Check if a Frappe app exists on GitHub. 

152 

153 Args: 

154 appname (str): The name of the Frappe app. 

155 branchname (str | None, optional): The name of the branch to check. Defaults to None. 

156 

157 Returns: 

158 dict: A dictionary containing the existence status of the app and branch (if provided). 

159 """ 

160 try: 

161 if app_url in exclude_dict: 

162 app = 200 

163 else: 

164 app = requests.get(app_url).status_code 

165 

166 if branch_name: 

167 if branch_name in exclude_dict.values(): 

168 branch = 200 

169 else: 

170 branch_url = f"{app_url}/tree/{branch_name}" 

171 branch = requests.get(branch_url).status_code 

172 

173 return { 

174 "app": True if app == 200 else False, 

175 "branch": True if branch == 200 else False, 

176 } 

177 return {"app": True if app == 200 else False} 

178 

179 except Exception as e: 

180 output = get_global_output_handler() 

181 output.error(f"Not able to validate app {app_url} for branch [blue]{branch_name}[/blue]", e) 

182 

183 

184def check_frappe_app_exists(app: str, branch_name: str | None = None): 

185 if "github.com" not in app: 

186 app = f"https://github.com/frappe/{app}" 

187 

188 return check_repo_exists(app_url=app, branch_name=branch_name) 

189 

190 

191def represent_null_empty(string_null): 

192 """ 

193 Replaces the string "null" with an empty string. 

194 

195 Args: 

196 string_null (str): The input string. 

197 

198 Returns: 

199 str: The modified string with "null" replaced by an empty string. 

200 """ 

201 return string_null.replace("null", "") 

202 

203 

204def log_file(file, refresh_time: float = 0.1, follow: bool = False): 

205 """ 

206 Generator function that yields new lines in a file 

207 

208 Parameters: 

209 - file: The file object to read from 

210 - refresh_time: The time interval (in seconds) to wait before checking for new lines in the file (default: 0.1) 

211 - follow: If True, the function will continue to yield new lines as they are added to the file (default: False) 

212 

213 Returns: 

214 - A generator that yields each new line in the file 

215 """ 

216 file.seek(0) 

217 

218 # start infinite loop 

219 while True: 

220 # read last line of file 

221 line = file.readline() 

222 if not line: 

223 if not follow: 

224 break 

225 # sleep if file hasn't been updated 

226 time.sleep(refresh_time) 

227 continue 

228 line = line.strip("\n") 

229 yield line 

230 

231 

232def get_container_name_prefix(site_name): 

233 """ 

234 Returns the container name prefix by removing dots from the site name. 

235 

236 Args: 

237 site_name (str): The name of the site. 

238 

239 Returns: 

240 str: The container name prefix. 

241 """ 

242 return "fm" + CLI_DEFAULT_DELIMETER + site_name.replace(".", CLI_SITE_NAME_DELIMETER) 

243 

244 

245def get_redis_cache_addr(container_prefix: str) -> tuple[str, int]: 

246 return f"{container_prefix}{CLI_DEFAULT_DELIMETER}redis-cache", 6379 

247 

248 

249def get_redis_queue_addr(container_prefix: str) -> tuple[str, int]: 

250 return f"{container_prefix}{CLI_DEFAULT_DELIMETER}redis-queue", 6379 

251 

252 

253def get_bench_connection_config(container_prefix: str, db_host: str, db_port: int) -> dict: 

254 cache_host, cache_port = get_redis_cache_addr(container_prefix) 

255 queue_host, queue_port = get_redis_queue_addr(container_prefix) 

256 return { 

257 "bench_id": "workspace-frappe-bench", 

258 "db_host": db_host, 

259 "db_port": db_port, 

260 "redis_cache": f"redis://{cache_host}:{cache_port}", 

261 "redis_queue": f"redis://{queue_host}:{queue_port}", 

262 "redis_socketio": f"redis://{cache_host}:{cache_port}", 

263 } 

264 

265 

266def random_password_generate(password_length=13, symbols=False): 

267 # Define the character set to include symbols 

268 # symbols = "!@#$%^&*()_-+=[]{}|;:,.<>?`~" 

269 symbols = "!@%_-+?" 

270 

271 # Generate a password without symbols using token_urlsafe 

272 

273 generated_password = secrets.token_urlsafe(password_length) 

274 

275 # Replace some characters with symbols in the generated password 

276 if symbols: 

277 password = "".join(c if secrets.choice([True, False]) else secrets.choice(symbols) for c in generated_password) 

278 return password 

279 

280 return generated_password 

281 

282 

283# Retrieve Unix groups and their corresponding integer mappings 

284def get_unix_groups(): 

285 groups = {} 

286 for group_entry in grp.getgrall(): 

287 group_name = group_entry.gr_name 

288 groups[group_name] = group_entry.gr_gid 

289 return groups 

290 

291 

292def install_package(package_name, version): 

293 output_lines = run_command_with_exit_code( 

294 [sys.executable, "-m", "pip", "install", f"{package_name}=={version}"], 

295 stream=True, 

296 ) 

297 output = get_global_output_handler() 

298 output.live_lines(output_lines) 

299 

300 

301def create_class_from_dict(class_name, attributes_dict): 

302 """ 

303 Dynamically creates a class with properties based on the provided attributes dictionary. 

304 

305 Parameters: 

306 class_name (str): The name of the class to be created. 

307 attributes_dict (dict): A dictionary where keys are the names of the properties and values are their default values. 

308 

309 Returns: 

310 A new class with the specified properties and their default values. 

311 """ 

312 return type(class_name, (object,), attributes_dict) 

313 

314 

315def create_symlink(source: Path, dest: Path): 

316 """ 

317 Create a symbolic link pointing from dest to source. 

318 

319 Parameters: 

320 - source (str): The source path that the symlink will point to. 

321 - dest (str): The destination path where the symlink will be created. 

322 

323 Note: The function will overwrite the destination if a symlink already exists there. 

324 """ 

325 

326 # Convert the source and destination to Path objects 

327 

328 if dest.exists() or dest.is_symlink(): 

329 dest.unlink() 

330 

331 dest.symlink_to(source) 

332 

333 

334def get_template_path(file_name: str, template_dir: str = "templates") -> Path: 

335 """ 

336 Get the file path of a template. 

337 

338 Args: 

339 file_name (str): The name of the template file. 

340 template_directory (str, optional): The directory where the templates are located. Defaults to "templates". 

341 

342 Returns: 

343 Optional[str]: The file path of the template, or None if the template is not found. 

344 """ 

345 template_path: str = f"{template_dir}/{file_name}" 

346 return get_frappe_manager_own_files(template_path) 

347 

348 

349def get_frappe_manager_own_files(file_path: str): 

350 return Path(str(pkg_resources.files("frappe_manager").joinpath(file_path))) 

351 

352 

353def rich_object_to_string(obj) -> str: 

354 """Convert a rich Traceback object to a string.""" 

355 

356 capture_buffer = StringIO() 

357 

358 fake_console = Console(force_terminal=False, file=capture_buffer) 

359 fake_console.print(obj, crop=False, overflow="ignore") 

360 

361 captured_str = capture_buffer.getvalue() # Retrieve the captured output as a string 

362 capture_buffer.close() 

363 return captured_str 

364 

365 

366def capture_and_format_exception(traceback_max_frames: int = 100) -> str: 

367 """Capture the current exception and return a formatted traceback string.""" 

368 

369 exc_type, exc_value, exc_traceback = sys.exc_info() 

370 

371 traceback = Traceback.from_exception( 

372 exc_type, 

373 exc_value, 

374 exc_traceback, 

375 show_locals=True, 

376 max_frames=traceback_max_frames, 

377 ) 

378 

379 # Convert the Traceback object to a formatted string 

380 formatted_traceback = rich_object_to_string(traceback) 

381 

382 return formatted_traceback 

383 

384 

385def pluralise(singular, count): 

386 return "{} {}{}".format(count, singular, "" if count == 1 else "s") 

387 

388 

389def format_ssl_certificate_time_remaining(expiry_date: datetime): 

390 today_date = datetime.now(expiry_date.tzinfo) 

391 time_remaining = expiry_date - today_date 

392 day_count = time_remaining.days 

393 seconds_per_minute = 60 

394 seconds_per_hour = seconds_per_minute * 60 

395 seconds_unaccounted_for = time_remaining.seconds 

396 

397 hours = int(seconds_unaccounted_for / seconds_per_hour) 

398 seconds_unaccounted_for -= hours * seconds_per_hour 

399 

400 minutes = int(seconds_unaccounted_for / seconds_per_minute) 

401 

402 return "{} {} {}".format(pluralise("day", day_count), pluralise("hour", hours), pluralise("min", minutes)) 

403 

404 

405def get_certificate_expiry_date(fullchain_path: Path) -> datetime: 

406 cert_content = fullchain_path.read_bytes() 

407 cert = x509.load_pem_x509_certificate(cert_content, default_backend()) 

408 if hasattr(cert, "not_valid_after_utc"): 

409 expiry_date: datetime = cert.not_valid_after_utc 

410 else: 

411 expiry_date: datetime = cert.not_valid_after 

412 return expiry_date 

413 

414 

415def save_dict_to_file(config: dict, json_file_path: Path): 

416 """ 

417 Sets the config value in the json_file_path file. 

418 

419 Args: 

420 config (dict): A dictionary containing the key-value pairs. 

421 """ 

422 

423 final_config = {} 

424 with open(json_file_path) as f: 

425 final_config = json.load(f) 

426 for key, value in config.items(): 

427 final_config[key] = value 

428 with open(json_file_path, "w") as f: 

429 json.dump(final_config, f)