Coverage for frappe_manager / utils / docker.py: 35%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import os 

2import shlex 

3from collections.abc import Iterable 

4from logging import Logger 

5from pathlib import Path 

6from subprocess import run 

7 

8from frappe_manager.docker.docker_exceptions import DockerException 

9from frappe_manager.docker.subprocess_output import SubprocessOutput 

10from frappe_manager.logger import log 

11from frappe_manager.output_manager import get_global_output_handler 

12from frappe_manager.utils.subprocess import stream_command_output 

13 

# Module-level list, created empty at import time. It is not referenced by any
# function visible in this file -- presumably kept for external importers
# (TODO confirm before removing).
process_opened = []

15 

16 

def stream_stdout_and_stderr(
    full_cmd: list,
    cwd: str | None = None,
    logger: Logger | None = None,
    env: dict[str, str] | None = None,
) -> Iterable[tuple[str, bytes]]:
    """
    Run a Docker command and relay its output pairs as they are produced.

    Thin Docker-flavoured layer over stream_command_output(): every
    ``(source, line)`` pair it produces is forwarded unchanged, except that a
    pair tagged ``"exit_code"`` whose payload decodes to a non-zero integer is
    turned into a DockerException instead of being forwarded.

    Args:
        full_cmd (list): The Docker command to execute.
        cwd (Optional[str]): Working directory handed to stream_command_output().
        logger (Optional[Logger]): Accepted but never used by this function.
        env (Dict[str, str], optional): Environment handed to stream_command_output(). Defaults to None.

    Yields:
        Tuple[str, bytes]: The source tag and its payload, exactly as received.
            An ``"exit_code"`` pair carrying ``0`` is yielded like any other pair.

    Raises:
        DockerException: When an ``"exit_code"`` pair carries a non-zero value.
            It is built from ``full_cmd`` and every pair seen so far, including
            the failing ``"exit_code"`` pair itself.
    """
    seen = []

    for tag, payload in stream_command_output(full_cmd, env=env, cwd=cwd):
        # Record first so a failure report also contains the exit-code pair.
        seen.append((tag, payload))

        if tag == "exit_code" and int(payload.decode()) != 0:
            raise DockerException(full_cmd, SubprocessOutput.from_output(seen))

        yield tag, payload

57 

58 

def run_command_with_exit_code(
    full_cmd: list,
    stream: bool = True,
    capture_output: bool = True,
    env: dict[str, str] | None = None,
    cwd: str | None = None,
) -> Iterable[tuple[str, bytes]] | SubprocessOutput | None:
    """
    Run a command in one of three modes and fail on a non-zero exit code.

    Modes:
        * ``stream=True``: return the generator from stream_stdout_and_stderr().
          Nothing is executed until the caller iterates it.
        * ``stream=False, capture_output=True``: feed that same generator to
          SubprocessOutput.from_output() and return the result (presumably this
          drains the generator -- confirm against SubprocessOutput).
        * ``stream=False, capture_output=False``: run the command with
          subprocess.run() without capturing output, log the command and its
          return code at debug level, and return ``None``.

    Args:
        full_cmd (list): The command to be executed, as an argv list.
        stream (bool, optional): Return a lazy output generator. Defaults to True.
        capture_output (bool, optional): Only consulted when ``stream`` is False;
            selects between the collected SubprocessOutput and the plain
            subprocess.run() mode. Defaults to True.
        env (Dict[str, str], optional): Environment variables for the command. Defaults to None.
        cwd (Optional[str]): Working directory for the command. Defaults to None.

    Returns:
        The output generator, a SubprocessOutput, or ``None`` -- see "Modes".

    Raises:
        DockerException: In the non-capturing mode, when the return code is
            non-zero (raised with empty output lists). In the other two modes
            it originates from stream_stdout_and_stderr().
    """
    if stream:
        return stream_stdout_and_stderr(full_cmd, cwd=cwd, env=env)

    if capture_output:
        return SubprocessOutput.from_output(
            stream_stdout_and_stderr(full_cmd, cwd=cwd, env=env),
        )

    logger = log.get_logger()
    logger.debug("- -" * 10)
    # subprocess.run() accepts path-like argv items, so stringify each part
    # before joining; a bare ' '.join() would raise TypeError on them.
    logger.debug(f"COMMAND: {' '.join(str(part) for part in full_cmd)}")

    exit_code = run(full_cmd, cwd=cwd, env=env).returncode

    logger.debug(f"RETURN CODE: {exit_code}")
    logger.debug("- -" * 10)

    if exit_code != 0:
        # Output went straight to the parent's stdio, so none is attached.
        raise DockerException(full_cmd, SubprocessOutput([], [], [], exit_code))
    return None

97 

98 

def parameter_to_option(param: str) -> str:
    """Build the long-form CLI flag for a parameter name.

    Args:
        param (str): The parameter name, e.g. ``"no_cache"``.

    Returns:
        str: ``param`` with every underscore replaced by a hyphen and ``--``
            put in front, e.g. ``"--no-cache"``.
    """
    return f"--{param.replace('_', '-')}"

110 

111 

112def parameters_to_options(param: dict, exclude: list = []) -> list: 

113 """ 

114 Convert a dictionary of parameters to a list of options for a command. 

115 

116 Args: 

117 param (dict): The dictionary of parameters. 

118 exclude (list, optional): A list of keys to exclude from the options. Defaults to []. 

119 

120 Returns: 

121 list: The list of options for the command. 

122 """ 

123 # remove the self parameter 

124 temp_param: dict = dict(param) 

125 

126 temp_param.pop("self", None) 

127 

128 for key in exclude: 

129 temp_param.pop(key, None) 

130 

131 params: list = [] 

132 

133 for key in temp_param: 

134 value = temp_param[key] 

135 key = "--" + key.replace("_", "-") 

136 

137 if type(value) == bool: 

138 if value: 

139 params.append(key) 

140 

141 elif type(value) == int: 

142 params.append(key) 

143 params.append(value) 

144 

145 elif type(value) == str: 

146 if value: 

147 params.append(key) 

148 params.append(value) 

149 

150 elif type(value) == list: 

151 if value: 

152 # For each item in the list, add the key and value separately 

153 for item in value: 

154 params.append(key) 

155 params.append(item) 

156 

157 return params 

158 

159 

def is_current_user_in_group(group_name) -> bool:
    """Check if the current user is in the given group.

    Only enforced on Linux; on every other platform the check is skipped and
    True is returned. On Linux the user counts as a member when either

    * their login name is listed in the group's member list, or
    * the group is their primary group (the group database does not list
      primary-group users in ``gr_mem``).

    When the check fails, an error is shown through the global output handler
    before False is returned.

    Args:
        group_name (str): The name of the group to check.

    Returns:
        bool: True if the current user is in the group (or the platform is not
            Linux), False if the user is not a member or the group does not exist.
    """
    import platform

    if platform.system() != "Linux":
        return True

    # Unix-only modules; imported here so the module still loads elsewhere.
    import grp
    import pwd

    user_entry = pwd.getpwuid(os.getuid())
    current_user = user_entry.pw_name

    try:
        group_entry = grp.getgrnam(group_name)
    except KeyError:
        output = get_global_output_handler()
        output.display_error(
            f"The group '{group_name}' does not exist. Please create it and add your current user [blue][b] {current_user} [/b][/blue] to it.",
        )
        return False

    if current_user in group_entry.gr_mem or user_entry.pw_gid == group_entry.gr_gid:
        return True

    output = get_global_output_handler()
    output.display_error(
        f"Your current user [blue][b] {current_user} [/b][/blue] is not in the '{group_name}' group. Please add it and restart your terminal.",
    )
    return False

196 

197 

def generate_random_text(length=50):
    """
    Produce a random alphanumeric string.

    Characters are drawn one at a time with ``random.choice`` from the ASCII
    letters (both cases) and the digits, so the result is not suitable as a
    secret.

    Parameters:
        length (int): How many characters to generate. Default is 50. A value
            of zero or less yields an empty string.

    Returns:
        str: The randomly generated text.
    """
    from random import choice
    from string import ascii_letters, digits

    pool = ascii_letters + digits
    picked = [choice(pool) for _ in range(length)]
    return "".join(picked)

213 

214 

def host_run_cp(image: str, source: str, destination: str, docker):
    """Copy files from a temporary container of ``image`` to the host.

    A temporary container is created from ``image`` through
    ``docker.create_temp_container`` and ``docker.cp`` copies ``source`` out of
    it into ``destination``. Progress is reported on the global output handler.

    Args:
        image (str): The Docker image to create the temporary container from.
        source (str): The source file or directory path inside the container.
        destination (str): The destination file or directory path on the host.
        docker: The Docker client object; must provide ``create_temp_container``
            (a context manager yielding an object with ``name``) and ``cp``.

    Raises:
        Exception: If ``destination`` does not exist after the copy, or -- chained
            from the underlying DockerException -- if a Docker operation failed.
            In the latter case whatever exists at ``destination`` is removed first.
    """

    dest_path = Path(destination)
    output = get_global_output_handler()
    output.change_head(f"Populating {dest_path.name} directory")

    try:
        # The context manager removes the temporary container on exit.
        with docker.create_temp_container(image) as container:
            docker.cp(
                source=source,
                destination=destination,
                source_container=container.name,
                stream=False,
            )

        if not dest_path.exists():
            raise Exception(f"{destination} not found.")

        output.change_head(f"Populated {dest_path.name} directory")

    except DockerException as e:
        # Clean up whatever a failed copy left behind. rmtree only works on real
        # directories; calling it on a file or symlink would raise and mask the
        # Docker error, so those are unlinked instead.
        if dest_path.exists():
            if dest_path.is_dir() and not dest_path.is_symlink():
                import shutil

                shutil.rmtree(dest_path)
            else:
                dest_path.unlink()
        raise Exception(f"Failed to copy files from {source} to {destination}.") from e

254 

255 

256def fix_host_path_ownership( 

257 paths: list[Path], 

258 uid: int | None = None, 

259 gid: int | None = None, 

260 image: str | None = None, 

261 output=None, 

262) -> bool: 

263 """Fix ownership of host paths that Docker created as root. 

264 

265 When Docker mounts a volume and the host path doesn't exist, it creates 

266 the directory as root. This function runs a one-off container as root 

267 to chown the paths to the correct user. 

268 

269 Idempotent — skips paths already owned by the target uid/gid. 

270 

271 Args: 

272 paths: List of host paths to fix ownership for. 

273 uid: Target user ID. Defaults to current user's UID. 

274 gid: Target group ID. Defaults to current user's GID. 

275 image: Docker image to use for the chown container. 

276 Defaults to the current frappe-manager frappe image. 

277 output: Optional output handler for logging. 

278 

279 Returns: 

280 True if any paths were fixed, False if all were already correct. 

281 """ 

282 if uid is None: 

283 uid = os.getuid() 

284 if gid is None: 

285 gid = os.getgid() 

286 if image is None: 

287 from frappe_manager.utils.helpers import get_docker_image_tag 

288 

289 tag = get_docker_image_tag() 

290 image = f"ghcr.io/rtcamp/frappe-manager-frappe:{tag}" 

291 

292 # Filter to only paths that need fixing (owned by root) 

293 needs_fix = [] 

294 for path in paths: 

295 if path.exists() and (path.stat().st_uid == 0 or path.stat().st_gid == 0): 

296 needs_fix.append(path) 

297 

298 if not needs_fix: 

299 return False 

300 

301 if output: 

302 dirs_str = ", ".join(str(p.name) for p in needs_fix) 

303 output.print(f"Fixing ownership of {dirs_str} (Docker created as root)...") 

304 

305 # Build docker run command with volume mounts 

306 # Each path gets its own -v mount at the same absolute path inside the container 

307 cmd = ["docker", "run", "--rm", "--user", "root", "--entrypoint", ""] 

308 for path in needs_fix: 

309 abs_path = str(path.resolve()) 

310 cmd.extend(["-v", f"{abs_path}:{abs_path}"]) 

311 

312 # Use a lightweight image and chown as root 

313 chown_cmd = f"chown -R {uid}:{gid} {' '.join(shlex.quote(str(p.resolve())) for p in needs_fix)}" 

314 cmd.extend([image, "bash", "-c", chown_cmd]) 

315 

316 try: 

317 result = run_command_with_exit_code(cmd, stream=False) 

318 if isinstance(result, SubprocessOutput) and result.exit_code == 0: 

319 if output: 

320 output.print("Ownership fixed") 

321 return True 

322 else: 

323 if output: 

324 output.warning("Could not fix ownership (non-critical)") 

325 return False 

326 except Exception as e: 

327 if output: 

328 output.warning(f"Could not fix ownership: {e}") 

329 return False