Coverage for frappe_manager / utils / helpers.py: 42%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import grp
2import importlib
3import importlib.resources as pkg_resources
4import json
5import secrets
6import sys
7import time
8from datetime import datetime
9from io import StringIO
10from pathlib import Path
12import requests
13from cryptography import x509
14from cryptography.hazmat.backends import default_backend
15from rich.console import Console
16from rich.traceback import Traceback
18from frappe_manager import CLI_DEFAULT_DELIMETER, CLI_SITE_NAME_DELIMETER
19from frappe_manager.logger import log
20from frappe_manager.output_manager import get_global_output_handler
21from frappe_manager.site_manager import PREBAKED_SITE_APPS
22from frappe_manager.utils.docker import run_command_with_exit_code
def remove_zombie_subprocess_process(process):
    """
    Terminate every process whose PID is listed in ``process``.

    Each step is written to the project logger through ``logger.cleanup``.
    PIDs that no longer exist, or that may not be signalled, are logged and
    skipped so the remaining PIDs are still handled.

    Args:
        process (list): A list of process IDs to be terminated. A falsy value
            (``None`` or an empty list) makes the call a no-op.

    Returns:
        None
    """
    if not process:
        return

    logger = log.get_logger()
    logger.cleanup("-" * 20)
    logger.cleanup(f"PROCESS: USED PROCESS {process}")

    # psutil is imported here rather than at module level.
    import psutil

    for pid in process:
        try:
            # Use a separate name so the ``process`` argument is not rebound
            # while it is being iterated.
            proc = psutil.Process(pid)

            # ``cmdline`` is a method and has to be called; logging the bare
            # attribute would print a bound-method repr. It is read before the
            # signal is sent because the process may be gone afterwards, and a
            # failure to read it must not prevent the termination itself.
            try:
                command_line = proc.cmdline()
            except psutil.Error:
                command_line = []

            proc.terminate()
            logger.cleanup(f"Terminated Process {command_line}:{pid}")
        except psutil.NoSuchProcess:
            logger.cleanup(f"{pid} Process not found")
        except psutil.AccessDenied:
            logger.cleanup(f"{pid} Permission denied")

    logger.cleanup("-" * 20)
def generate_random_text(length=50):
    """
    Build a random string made of ASCII letters and digits.

    Parameters:
        length (int): Number of characters to produce. Default is 50.

    Returns:
        str: ``length`` characters drawn from ``string.ascii_letters`` and
        ``string.digits`` using the ``random`` module.
    """
    import random
    import string

    pool = string.ascii_letters + string.digits
    picked_chars = [random.choice(pool) for _ in range(length)]
    return "".join(picked_chars)
def is_cli_help_called(ctx):
    """
    Report whether the current CLI invocation will end up showing help.

    Help counts as requested when ``--help`` occurs in the joined command-line
    arguments, or when the invoked (sub)command has parameters, is flagged
    ``no_args_is_help`` and was called without any further arguments.

    Args:
        ctx (object): The context object representing the CLI command. It is
            expected to expose ``command.commands`` and ``invoked_subcommand``.

    Returns:
        bool: True if help is being requested, False otherwise (including when
        ``ctx`` lacks the expected attributes).
    """
    # An explicit --help anywhere after the program name settles it.
    if "--help" in " ".join(sys.argv[1:]):
        return True

    try:
        subcommand = ctx.command.commands.get(ctx.invoked_subcommand)
        if not subcommand:
            return False

        if hasattr(subcommand, "commands"):
            # The subcommand is itself a group: look up the nested command
            # named by everything after the group name.
            nested_name = " ".join(sys.argv[2:])
            if nested_name not in subcommand.commands:
                return False
            nested_command = subcommand.commands[nested_name]
            return bool(nested_command.params and nested_command.no_args_is_help)

        # Plain command: help is implied only if nothing followed its name.
        return bool(
            subcommand.params
            and ctx.invoked_subcommand == " ".join(sys.argv[1:])
            and subcommand.no_args_is_help
        )
    except (AttributeError, KeyError):
        return False
def get_current_fm_version():
    """
    Return the version string declared in ``frappe_manager.__about__``.

    Returns:
        str: The current version of the frappe-manager package.
    """
    # Imported at call time, as in the rest of this module's lazy imports.
    from frappe_manager.__about__ import __version__ as current_version

    return current_version
def get_docker_image_tag():
    """
    Work out which Docker image tag matches this FM version.

    The tag is the package version with a leading 'v'.
    Examples:
        - '0.19.0' -> 'v0.19.0'
        - '0.19.1.dev0' -> 'v0.19.1.dev0'
        - '0.20.0.dev1' -> 'v0.20.0.dev1'

    A non-empty FM_DOCKER_IMAGE_TAG environment variable takes precedence and
    is returned unchanged (intended for testing).

    Returns:
        str: The Docker image tag (e.g., 'v0.19.0' or 'v0.19.1.dev0').
    """
    import os

    # Testing hook: an explicit tag from the environment wins.
    env_tag = os.getenv('FM_DOCKER_IMAGE_TAG')
    if env_tag:
        return env_tag

    fm_version = get_current_fm_version()

    # Only add the prefix when the version does not already carry it.
    return fm_version if fm_version.startswith('v') else f'v{fm_version}'
def check_repo_exists(
    app_url: str,
    branch_name: str | None = None,
    exclude_dict: dict[str, str] = PREBAKED_SITE_APPS,
    timeout: float = 10.0,
):
    """
    Check whether an app repository (and optionally a branch) is reachable.

    A URL is considered to exist when an HTTP GET on it answers with status
    200. URLs that are keys of ``exclude_dict`` and branch names that are
    values of ``exclude_dict`` are treated as existing without any request.

    Args:
        app_url (str): URL of the app repository.
        branch_name (str | None, optional): Branch to check; it is probed at
            ``{app_url}/tree/{branch_name}``. Defaults to None.
        exclude_dict (dict[str, str], optional): Entries that skip the network
            check. Defaults to ``PREBAKED_SITE_APPS``. It is only read here.
        timeout (float, optional): Seconds to wait for each HTTP request.
            Without a timeout ``requests.get`` may block indefinitely on an
            unresponsive host. Defaults to 10.0.

    Returns:
        dict: ``{"app": bool}``, plus ``"branch": bool`` when ``branch_name``
        is given. If any exception occurs it is reported through the global
        output handler's ``error``; should that call return, the function
        returns None.
    """
    try:
        app_exists = app_url in exclude_dict or requests.get(app_url, timeout=timeout).status_code == 200

        if not branch_name:
            return {"app": app_exists}

        if branch_name in exclude_dict.values():
            branch_exists = True
        else:
            branch_url = f"{app_url}/tree/{branch_name}"
            branch_exists = requests.get(branch_url, timeout=timeout).status_code == 200

        return {"app": app_exists, "branch": branch_exists}

    except Exception as e:
        # Broad on purpose: any failure (network, timeout, ...) is surfaced to
        # the user through the output handler rather than propagating raw.
        output = get_global_output_handler()
        output.error(f"Not able to validate app {app_url} for branch [blue]{branch_name}[/blue]", e)
def check_frappe_app_exists(app: str, branch_name: str | None = None):
    """
    Check a Frappe app given either a bare app name or a GitHub URL.

    Args:
        app (str): App name or URL. Values that do not contain "github.com"
            are expanded to ``https://github.com/frappe/<app>``.
        branch_name (str | None, optional): Branch to check. Defaults to None.

    Returns:
        Whatever ``check_repo_exists`` returns for the resolved URL.
    """
    app_url = app if "github.com" in app else f"https://github.com/frappe/{app}"
    return check_repo_exists(app_url=app_url, branch_name=branch_name)
def represent_null_empty(string_null):
    """
    Strip every occurrence of the substring "null" from a string.

    Args:
        string_null (str): The input string.

    Returns:
        str: The input with each "null" occurrence removed.
    """
    # Splitting on the marker and re-joining drops each occurrence.
    pieces = string_null.split("null")
    return "".join(pieces)
def log_file(file, refresh_time: float = 0.1, follow: bool = False):
    """
    Generator that yields the lines of a file, optionally tailing it.

    Parameters:
    - file: The file object to read from; it is rewound to the start first
    - refresh_time: Seconds to sleep before polling again when no new line is available (default: 0.1)
    - follow: If True, keep waiting for new lines instead of stopping at end of file (default: False)

    Returns:
    - A generator that yields each line with leading/trailing newline characters stripped
    """
    file.seek(0)

    while True:
        raw_line = file.readline()
        if raw_line:
            yield raw_line.strip("\n")
        elif follow:
            # Nothing new yet: wait before polling the file again.
            time.sleep(refresh_time)
        else:
            # End of file reached and not following: stop the generator.
            return
def get_container_name_prefix(site_name):
    """
    Build the container name prefix for a site.

    Dots in the site name are replaced with ``CLI_SITE_NAME_DELIMETER`` and the
    result is prefixed with "fm" followed by ``CLI_DEFAULT_DELIMETER``.

    Args:
        site_name (str): The name of the site.

    Returns:
        str: The container name prefix.
    """
    sanitized_site = site_name.replace(".", CLI_SITE_NAME_DELIMETER)
    return "".join(("fm", CLI_DEFAULT_DELIMETER, sanitized_site))
def get_redis_cache_addr(container_prefix: str) -> tuple[str, int]:
    """Return the ``(host, port)`` pair for the redis-cache container of a prefix."""
    host = f"{container_prefix}{CLI_DEFAULT_DELIMETER}redis-cache"
    port = 6379
    return host, port
def get_redis_queue_addr(container_prefix: str) -> tuple[str, int]:
    """Return the ``(host, port)`` pair for the redis-queue container of a prefix."""
    host = f"{container_prefix}{CLI_DEFAULT_DELIMETER}redis-queue"
    port = 6379
    return host, port
def get_bench_connection_config(container_prefix: str, db_host: str, db_port: int) -> dict:
    """
    Assemble the connection settings dictionary for a bench.

    Args:
        container_prefix (str): Prefix used to derive the redis host names.
        db_host (str): Database host, copied through unchanged.
        db_port (int): Database port, copied through unchanged.

    Returns:
        dict: Keys ``bench_id``, ``db_host``, ``db_port``, ``redis_cache``,
        ``redis_queue`` and ``redis_socketio``. The socketio entry uses the
        same address as the cache entry.
    """
    cache_host, cache_port = get_redis_cache_addr(container_prefix)
    queue_host, queue_port = get_redis_queue_addr(container_prefix)

    cache_url = f"redis://{cache_host}:{cache_port}"
    queue_url = f"redis://{queue_host}:{queue_port}"

    return dict(
        bench_id="workspace-frappe-bench",
        db_host=db_host,
        db_port=db_port,
        redis_cache=cache_url,
        redis_queue=queue_url,
        redis_socketio=cache_url,
    )
def random_password_generate(password_length=13, symbols=False):
    """
    Generate a random password with the ``secrets`` module.

    Args:
        password_length (int): Passed to ``secrets.token_urlsafe`` as the
            number of random bytes. The returned text is therefore longer than
            this value (about 1.3 characters per byte). Default is 13.
        symbols (bool): When True, each character is replaced, with probability
            one half, by a random character from ``!@%_-+?``. When False the
            URL-safe token is returned untouched. Default is False.

    Returns:
        str: The generated password.
    """
    # Kept under its own name: rebinding ``symbols`` to this string made the
    # flag always truthy, so symbols were injected even for symbols=False.
    symbol_chars = "!@%_-+?"

    generated_password = secrets.token_urlsafe(password_length)
    if not symbols:
        return generated_password

    # Flip a coin per character: keep it, or swap it for a random symbol.
    return "".join(
        c if secrets.choice([True, False]) else secrets.choice(symbol_chars)
        for c in generated_password
    )
283# Retrieve Unix groups and their corresponding integer mappings
def get_unix_groups():
    """
    Map every group known to the system's group database to its numeric id.

    Returns:
        dict: Group name -> gid, built from ``grp.getgrall()``. If a name
        appears more than once, the last entry wins.
    """
    return {entry.gr_name: entry.gr_gid for entry in grp.getgrall()}
def install_package(package_name, version):
    """
    Install a pinned package version with pip, streaming pip's output.

    pip is run as ``<current interpreter> -m pip install <name>==<version>``
    and the streamed output is handed to the global output handler's
    ``live_lines``.

    Args:
        package_name: Name of the package to install.
        version: Exact version to pin with ``==``.
    """
    pip_command = [sys.executable, "-m", "pip", "install", f"{package_name}=={version}"]
    streamed_lines = run_command_with_exit_code(pip_command, stream=True)
    get_global_output_handler().live_lines(streamed_lines)
def create_class_from_dict(class_name, attributes_dict):
    """
    Build a new class at runtime from a name and a namespace dictionary.

    Parameters:
        class_name (str): The name of the class to be created.
        attributes_dict (dict): Namespace of the class; each key becomes a class attribute holding the given value.

    Returns:
        A new class deriving from ``object`` with the given attributes.
    """
    base_classes = (object,)
    new_class = type(class_name, base_classes, attributes_dict)
    return new_class
def create_symlink(source: Path, dest: Path):
    """
    Create a symbolic link at ``dest`` that points to ``source``.

    Parameters:
    - source (str | Path): The path that the symlink will point to.
    - dest (str | Path): The path where the symlink will be created.

    Note: An existing file or symlink at ``dest`` (including a dangling
    symlink) is removed first. ``Path.unlink`` is used for that, so an
    existing directory at ``dest`` is not replaced and raises an error.
    """
    # Accept plain strings as documented; Path() leaves Path objects as they are.
    source = Path(source)
    dest = Path(dest)

    # ``exists()`` follows symlinks and is False for a dangling link, hence the
    # additional ``is_symlink()`` check.
    if dest.exists() or dest.is_symlink():
        dest.unlink()

    dest.symlink_to(source)
def get_template_path(file_name: str, template_dir: str = "templates") -> Path:
    """
    Resolve the path of a template shipped inside the frappe_manager package.

    Args:
        file_name (str): The name of the template file.
        template_dir (str, optional): Package-relative directory holding the templates. Defaults to "templates".

    Returns:
        Path: The path ``<template_dir>/<file_name>`` resolved by ``get_frappe_manager_own_files``.
    """
    return get_frappe_manager_own_files(f"{template_dir}/{file_name}")
def get_frappe_manager_own_files(file_path: str):
    """
    Return a ``Path`` to a resource located inside the ``frappe_manager`` package.

    Args:
        file_path (str): Location of the resource relative to the package root.

    Returns:
        Path: The resource location, built from its string form.
    """
    package_root = pkg_resources.files("frappe_manager")
    resource = package_root.joinpath(file_path)
    return Path(str(resource))
def rich_object_to_string(obj) -> str:
    """Render a rich renderable (e.g. a Traceback) into a plain string."""
    # Print through a non-terminal Console that writes into an in-memory
    # buffer, then hand back whatever was written.
    with StringIO() as sink:
        recorder = Console(force_terminal=False, file=sink)
        recorder.print(obj, crop=False, overflow="ignore")
        return sink.getvalue()
def capture_and_format_exception(traceback_max_frames: int = 100) -> str:
    """
    Format the exception currently being handled as a rich traceback string.

    Args:
        traceback_max_frames (int): Forwarded to rich as ``max_frames``.
            Default is 100.

    Returns:
        str: The rendered traceback, with local variables shown.
    """
    # sys.exc_info() yields (type, value, traceback), which is exactly the
    # positional order Traceback.from_exception takes.
    rich_traceback = Traceback.from_exception(
        *sys.exc_info(),
        show_locals=True,
        max_frames=traceback_max_frames,
    )
    return rich_object_to_string(rich_traceback)
def pluralise(singular, count):
    """Return ``"<count> <singular>"``, appending "s" unless ``count`` equals 1."""
    suffix = "s" if count != 1 else ""
    return f"{count} {singular}{suffix}"
def format_ssl_certificate_time_remaining(expiry_date: datetime):
    """
    Describe the time left until ``expiry_date`` as days, hours and minutes.

    The current time is taken in the timezone of ``expiry_date``.

    Args:
        expiry_date (datetime): The certificate's expiry moment.

    Returns:
        str: Text such as ``"12 days 3 hours 1 min"``.
    """
    remaining = expiry_date - datetime.now(expiry_date.tzinfo)

    # timedelta keeps whole days apart from the leftover seconds (0..86399),
    # so the hours and minutes come from ``seconds`` alone.
    hours, leftover_seconds = divmod(remaining.seconds, 60 * 60)
    minutes = leftover_seconds // 60

    parts = (
        pluralise("day", remaining.days),
        pluralise("hour", hours),
        pluralise("min", minutes),
    )
    return " ".join(parts)
def get_certificate_expiry_date(fullchain_path: Path) -> datetime:
    """
    Read a PEM certificate file and return its expiry timestamp.

    Args:
        fullchain_path (Path): Path to the PEM encoded certificate file.

    Returns:
        datetime: ``not_valid_after_utc`` when the certificate object offers
        that attribute, otherwise ``not_valid_after``.
    """
    pem_bytes = fullchain_path.read_bytes()
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())

    # Fall back to the older attribute only when the UTC one is missing.
    if not hasattr(certificate, "not_valid_after_utc"):
        return certificate.not_valid_after
    return certificate.not_valid_after_utc
def save_dict_to_file(config: dict, json_file_path: Path):
    """
    Merge ``config`` into the JSON document stored at ``json_file_path``.

    The existing file is loaded, every key of ``config`` is set on top of it
    (overwriting keys that already exist), and the result is written back.

    Args:
        config (dict): A dictionary containing the key-value pairs to set.
        json_file_path (Path): An existing JSON file to update in place.

    Raises:
        FileNotFoundError: If ``json_file_path`` does not exist.
        json.JSONDecodeError: If the existing file is not valid JSON.
        TypeError: If the merged data cannot be serialized to JSON. The file
            is left unchanged in that case.
    """
    with open(json_file_path) as f:
        final_config = json.load(f)

    for key, value in config.items():
        final_config[key] = value

    # Serialize BEFORE opening for writing: mode "w" truncates the file right
    # away, so a serialization error inside the ``with`` block would leave the
    # config file empty or half-written.
    serialized = json.dumps(final_config)

    with open(json_file_path, "w") as f:
        f.write(serialized)