Coverage for frappe_manager / utils / docker.py: 35%
143 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import os
2import shlex
3from collections.abc import Iterable
4from logging import Logger
5from pathlib import Path
6from subprocess import run
8from frappe_manager.docker.docker_exceptions import DockerException
9from frappe_manager.docker.subprocess_output import SubprocessOutput
10from frappe_manager.logger import log
11from frappe_manager.output_manager import get_global_output_handler
12from frappe_manager.utils.subprocess import stream_command_output
# Module-level list, created empty at import time. Nothing in the visible part
# of this module reads or appends to it.
# NOTE(review): presumably intended to track spawned processes — confirm against callers.
process_opened: list = []
17def stream_stdout_and_stderr(
18 full_cmd: list,
19 cwd: str | None = None,
20 logger: Logger | None = None,
21 env: dict[str, str] | None = None,
22) -> Iterable[tuple[str, bytes]]:
23 """
24 Executes a Docker command and streams the stdout and stderr outputs.
26 This is a Docker-specific wrapper around the generic stream_command_output()
27 that adds DockerException handling for failed commands.
29 Args:
30 full_cmd (list): The Docker command to be executed.
31 cwd (Optional[str]): Working directory for command execution.
32 logger (Optional[Logger]): Logger instance (unused, kept for compatibility).
33 env (Dict[str, str], optional): Environment variables. Defaults to None.
35 Yields:
36 Tuple[str, bytes]: A tuple containing the source ("stdout" or "stderr") and the output line.
38 Raises:
39 DockerException: If the Docker command returns a non-zero exit code.
41 Returns:
42 Iterable[Tuple[str, bytes]]: An iterable of tuples containing the source and output line.
43 """
44 output = []
46 # Use generic subprocess streaming
47 for source, line in stream_command_output(full_cmd, env=env, cwd=cwd):
48 output.append((source, line))
50 # Check for exit code and raise DockerException if command failed
51 if source == "exit_code":
52 exit_code = int(line.decode())
53 if exit_code != 0:
54 raise DockerException(full_cmd, SubprocessOutput.from_output(output))
56 yield source, line
def run_command_with_exit_code(
    full_cmd: list,
    stream: bool = True,
    capture_output: bool = True,
    env: dict[str, str] | None = None,
    cwd: str | None = None,
) -> Iterable[tuple[str, bytes]] | SubprocessOutput | None:
    """
    Run a command in one of three modes selected by ``stream`` / ``capture_output``.

    Args:
        full_cmd (list): The command to be executed, as an argument list.
        stream (bool, optional): When True (default) return the lazy output
            stream from stream_stdout_and_stderr(); ``capture_output`` is
            ignored in that case.
        capture_output (bool, optional): Only consulted when ``stream`` is
            False. True (default) collects the streamed output into a
            SubprocessOutput; False runs the command through subprocess.run()
            without capturing anything.
        env (Dict[str, str], optional): Environment variables for the command.
            Defaults to None.
        cwd (Optional[str]): Working directory for the command.

    Returns:
        * ``stream=True``: the iterable of ``(source, line)`` tuples produced
          by stream_stdout_and_stderr().
        * ``stream=False, capture_output=True``: a SubprocessOutput built from
          that stream.
        * ``stream=False, capture_output=False``: None.

    Raises:
        DockerException: In the non-capturing mode when the return code is
            non-zero (raised with an empty SubprocessOutput carrying the exit
            code). The streaming helper raises it as well when it sees a
            non-zero exit code while its output is being consumed.
    """
    if stream:
        return stream_stdout_and_stderr(full_cmd, cwd=cwd, env=env)

    if capture_output:
        return SubprocessOutput.from_output(
            stream_stdout_and_stderr(full_cmd, cwd=cwd, env=env),
        )

    # Non-capturing mode: the child inherits this process's stdout/stderr, so
    # only the command line and its return code are written to the debug log.
    logger = log.get_logger()
    separator = "- -" * 10
    logger.debug(separator)
    # str() each argument: subprocess.run() accepts path-like arguments, which
    # a plain str.join() would reject with TypeError before the command runs.
    logger.debug(f"COMMAND: {' '.join(map(str, full_cmd))}")

    exit_code = run(full_cmd, cwd=cwd, env=env).returncode

    logger.debug(f"RETURN CODE: {exit_code}")
    logger.debug(separator)

    if exit_code != 0:
        raise DockerException(full_cmd, SubprocessOutput([], [], [], exit_code))
    return None
def parameter_to_option(param: str) -> str:
    """Turn a Python-style parameter name into a long command-line option.

    Underscores become hyphens and the result is prefixed with ``--``.

    Args:
        param (str): The parameter name, e.g. ``"no_cache"``.

    Returns:
        str: The option string, e.g. ``"--no-cache"``.
    """
    hyphenated = param.replace("_", "-")
    return f"--{hyphenated}"
112def parameters_to_options(param: dict, exclude: list = []) -> list:
113 """
114 Convert a dictionary of parameters to a list of options for a command.
116 Args:
117 param (dict): The dictionary of parameters.
118 exclude (list, optional): A list of keys to exclude from the options. Defaults to [].
120 Returns:
121 list: The list of options for the command.
122 """
123 # remove the self parameter
124 temp_param: dict = dict(param)
126 temp_param.pop("self", None)
128 for key in exclude:
129 temp_param.pop(key, None)
131 params: list = []
133 for key in temp_param:
134 value = temp_param[key]
135 key = "--" + key.replace("_", "-")
137 if type(value) == bool:
138 if value:
139 params.append(key)
141 elif type(value) == int:
142 params.append(key)
143 params.append(value)
145 elif type(value) == str:
146 if value:
147 params.append(key)
148 params.append(value)
150 elif type(value) == list:
151 if value:
152 # For each item in the list, add the key and value separately
153 for item in value:
154 params.append(key)
155 params.append(item)
157 return params
def is_current_user_in_group(group_name) -> bool:
    """Check if the current user is in the given group.

    The check is only performed on Linux; on every other platform the function
    returns True without looking anything up.

    On Linux the user counts as a member when either

    * their login name is listed in the group's member list, or
    * the group is their primary group (the passwd entry's GID or the GID of
      the running process) — primary-group members are normally not listed in
      the member list.

    When the check fails, an error is shown through the global output handler.

    Args:
        group_name (str): The name of the group to check.

    Returns:
        bool: True if the current user is in the group (or the platform is not
        Linux), False if the user is not a member or the group does not exist.
    """
    import platform

    if platform.system() != "Linux":
        return True

    # grp/pwd exist only on Unix, so they are imported after the platform check.
    import grp
    import os
    import pwd

    uid = os.getuid()
    primary_gids = {os.getgid()}
    try:
        account = pwd.getpwuid(uid)
    except KeyError:
        # UID without a passwd entry (e.g. an arbitrary UID inside a
        # container): fall back to the numeric id for display purposes.
        current_user = str(uid)
    else:
        current_user = account.pw_name
        primary_gids.add(account.pw_gid)

    try:
        group = grp.getgrnam(group_name)
    except KeyError:
        output = get_global_output_handler()
        output.display_error(
            f"The group '{group_name}' does not exist. Please create it and add your current user [blue][b] {current_user} [/b][/blue] to it.",
        )
        return False

    if current_user in group.gr_mem or group.gr_gid in primary_gids:
        return True

    output = get_global_output_handler()
    output.display_error(
        f"Your current user [blue][b] {current_user} [/b][/blue] is not in the '{group_name}' group. Please add it and restart your terminal.",
    )
    return False
def generate_random_text(length=50):
    """
    Build a random alphanumeric string.

    Characters are drawn one at a time with ``random.choice`` from the ASCII
    letters (both cases) and digits.

    Parameters:
    length (int): How many characters to produce. Default is 50. Values of
        zero or below give an empty string.

    Returns:
    str: The randomly generated text.
    """
    import random
    import string

    pool = string.ascii_letters + string.digits

    picked = []
    for _ in range(length):
        picked.append(random.choice(pool))
    return "".join(picked)
def host_run_cp(image: str, source: str, destination: str, docker):
    """Copy files from source to destination using Docker.

    A temporary container is created from ``image`` and ``source`` is copied
    out of it to ``destination`` on the host. Progress is reported through the
    global output handler.

    Args:
        image (str): The Docker image to create the temporary container from.
        source (str): The source file or directory path inside the container.
        destination (str): The destination file or directory path on the host.
        docker: The Docker client object; must provide ``create_temp_container()``
            (a context manager yielding an object with a ``name``) and ``cp()``.

    Raises:
        Exception: If ``destination`` does not exist after the copy, or — with
            the original DockerException chained as the cause — if a Docker
            command failed. In the latter case any partially copied
            destination is removed first.
    """
    dest_path = Path(destination)
    output = get_global_output_handler()
    output.change_head(f"Populating {dest_path.name} directory")

    try:
        # The context manager takes care of the temporary container's cleanup.
        with docker.create_temp_container(image) as container:
            docker.cp(
                source=source,
                destination=destination,
                source_container=container.name,
                stream=False,
            )

        # A copy that reported success but produced nothing is still a failure.
        if not dest_path.exists():
            raise Exception(f"{destination} not found.")

        output.change_head(f"Populated {dest_path.name} directory")

    except DockerException as e:
        # Remove whatever was partially copied. rmtree() only works on real
        # directories, so a file (or a symlink) is unlinked instead of letting
        # the cleanup itself raise and mask the error reported below.
        if dest_path.is_dir() and not dest_path.is_symlink():
            import shutil

            shutil.rmtree(dest_path)
        elif dest_path.exists():
            dest_path.unlink()
        raise Exception(f"Failed to copy files from {source} to {destination}.") from e
256def fix_host_path_ownership(
257 paths: list[Path],
258 uid: int | None = None,
259 gid: int | None = None,
260 image: str | None = None,
261 output=None,
262) -> bool:
263 """Fix ownership of host paths that Docker created as root.
265 When Docker mounts a volume and the host path doesn't exist, it creates
266 the directory as root. This function runs a one-off container as root
267 to chown the paths to the correct user.
269 Idempotent — skips paths already owned by the target uid/gid.
271 Args:
272 paths: List of host paths to fix ownership for.
273 uid: Target user ID. Defaults to current user's UID.
274 gid: Target group ID. Defaults to current user's GID.
275 image: Docker image to use for the chown container.
276 Defaults to the current frappe-manager frappe image.
277 output: Optional output handler for logging.
279 Returns:
280 True if any paths were fixed, False if all were already correct.
281 """
282 if uid is None:
283 uid = os.getuid()
284 if gid is None:
285 gid = os.getgid()
286 if image is None:
287 from frappe_manager.utils.helpers import get_docker_image_tag
289 tag = get_docker_image_tag()
290 image = f"ghcr.io/rtcamp/frappe-manager-frappe:{tag}"
292 # Filter to only paths that need fixing (owned by root)
293 needs_fix = []
294 for path in paths:
295 if path.exists() and (path.stat().st_uid == 0 or path.stat().st_gid == 0):
296 needs_fix.append(path)
298 if not needs_fix:
299 return False
301 if output:
302 dirs_str = ", ".join(str(p.name) for p in needs_fix)
303 output.print(f"Fixing ownership of {dirs_str} (Docker created as root)...")
305 # Build docker run command with volume mounts
306 # Each path gets its own -v mount at the same absolute path inside the container
307 cmd = ["docker", "run", "--rm", "--user", "root", "--entrypoint", ""]
308 for path in needs_fix:
309 abs_path = str(path.resolve())
310 cmd.extend(["-v", f"{abs_path}:{abs_path}"])
312 # Use a lightweight image and chown as root
313 chown_cmd = f"chown -R {uid}:{gid} {' '.join(shlex.quote(str(p.resolve())) for p in needs_fix)}"
314 cmd.extend([image, "bash", "-c", chown_cmd])
316 try:
317 result = run_command_with_exit_code(cmd, stream=False)
318 if isinstance(result, SubprocessOutput) and result.exit_code == 0:
319 if output:
320 output.print("Ownership fixed")
321 return True
322 else:
323 if output:
324 output.warning("Could not fix ownership (non-critical)")
325 return False
326 except Exception as e:
327 if output:
328 output.warning(f"Could not fix ownership: {e}")
329 return False