Coverage for frappe_manager / docker / docker_client.py: 25%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import json 

2import shlex 

3from pathlib import Path 

4from typing import Literal 

5 

6from frappe_manager.docker.docker_compose import DockerComposeWrapper 

7from frappe_manager.docker.docker_exceptions import DockerException 

8from frappe_manager.output_manager.base import OutputHandler 

9from frappe_manager.utils.docker import ( 

10 SubprocessOutput, 

11 is_current_user_in_group, 

12 parameters_to_options, 

13 run_command_with_exit_code, 

14) 

15 

16 

17class DockerClient: 

18 """ 

19 This class provide one to one mapping to the docker command. 

20 

21 Only this args have are different use case. 

22 stream (bool, optional): A boolean flag indicating whether to stream the output of the command as it runs. 

23 If set to True, the output will be displayed in real-time. If set to False, the output will be 

24 displayed after the command completes. Defaults to False. 

25 stream_only_exit_code (bool, optional): A boolean flag indicating whether to only stream the exit code of the 

26 command. Defaults to False. 

27 """ 

28 

29 def __init__(self, compose_file_path: Path | None = None, output: OutputHandler | None = None): 

30 """ 

31 Initializes a DockerClient object. 

32 Args: 

33 compose_file_path (Optional[Path]): The path to the Docker Compose file. Defaults to None. 

34 output (Optional[OutputHandler]): Output handler for docker operations. Defaults to None. 

35 """ 

36 self.docker_cmd = ["docker"] 

37 self.output = output 

38 self.compose: DockerComposeWrapper | None = None 

39 if compose_file_path: 

40 self.compose = DockerComposeWrapper(compose_file_path, output=output) 

41 

42 def version(self) -> dict: 

43 """ 

44 Retrieves the version information of the Docker client. 

45 

46 Returns: 

47 A dictionary containing the version information. 

48 """ 

49 parameters: dict = locals() 

50 

51 parameters["format"] = "json" 

52 

53 ver_cmd: list = ["version"] 

54 

55 ver_cmd += parameters_to_options(parameters) 

56 

57 try: 

58 output: SubprocessOutput = run_command_with_exit_code(self.docker_cmd + ver_cmd, stream=False) 

59 version: dict = json.loads(" ".join(output.stdout)) 

60 return version 

61 except DockerException as e: 

62 version: dict = json.loads(" ".join(e.output.stdout)) 

63 return version 

64 

65 def server_running(self) -> bool: 

66 """ 

67 Checks if the Docker server is running. 

68 

69 Returns: 

70 bool: True if the Docker server is running, False otherwise. 

71 """ 

72 docker_info = self.version() 

73 

74 if "Server" in docker_info: 

75 if docker_info["Server"]: 

76 return True 

77 return False 

78 # check if the current user in the docker group and notify the user 

79 is_current_user_in_group("docker") 

80 return False 

81 

82 def cp( 

83 self, 

84 source: str, 

85 destination: str, 

86 source_container: str | None = None, 

87 destination_container: str | None = None, 

88 archive: bool = False, 

89 follow_link: bool = False, 

90 stream: bool = False, 

91 ): 

92 parameters: dict = locals() 

93 cp_cmd: list = ["cp"] 

94 

95 remove_parameters = [ 

96 "stream", 

97 "source", 

98 "destination", 

99 "source_container", 

100 "destination_container", 

101 ] 

102 

103 cp_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

104 

105 if source_container: 

106 source = f"{source_container}:{source}" 

107 

108 if destination_container: 

109 destination = f"{destination_container}:{destination}" 

110 

111 cp_cmd += [f"{source}"] 

112 cp_cmd += [f"{destination}"] 

113 

114 iterator = run_command_with_exit_code(self.docker_cmd + cp_cmd, stream=stream) 

115 return iterator 

116 

117 def kill( 

118 self, 

119 container: str, 

120 signal: str | None = None, 

121 stream: bool = False, 

122 ): 

123 parameters: dict = locals() 

124 kill_cmd: list = ["kill"] 

125 

126 remove_parameters = ["stream", "container"] 

127 

128 kill_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

129 kill_cmd += [f"{container}"] 

130 

131 iterator = run_command_with_exit_code(self.docker_cmd + kill_cmd, stream=stream) 

132 return iterator 

133 

134 def rm( 

135 self, 

136 container: str, 

137 force: bool = False, 

138 link: bool = False, 

139 volumes: bool = False, 

140 stream: bool = False, 

141 ): 

142 parameters: dict = locals() 

143 rm_cmd: list = ["rm"] 

144 

145 remove_parameters = ["stream", "container"] 

146 

147 rm_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

148 rm_cmd += [f"{container}"] 

149 

150 iterator = run_command_with_exit_code(self.docker_cmd + rm_cmd, stream=stream) 

151 return iterator 

152 

153 def run( 

154 self, 

155 image: str, 

156 command: str | None = None, 

157 env: dict[str, str] | None = None, 

158 name: str | None = None, 

159 user: str | None = None, 

160 volume: list[str] | None = None, 

161 detach: bool = False, 

162 entrypoint: str | None = None, 

163 workdir: str | None = None, 

164 platform: str | None = None, 

165 pull: Literal["missing", "never", "always"] = "missing", 

166 use_shlex_split: bool = True, 

167 stream: bool = False, 

168 rm: bool = False, 

169 ): 

170 parameters: dict = locals() 

171 run_cmd: list = ["run"] 

172 

173 remove_parameters = ["stream", "command", "image", "use_shlex_split", "env"] 

174 

175 run_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

176 

177 if isinstance(env, dict): 

178 for i, v in env.items(): 

179 run_cmd += ["--env", f"{i}={v}"] 

180 

181 run_cmd += [f"{image}"] 

182 

183 if command: 

184 if use_shlex_split: 

185 run_cmd += shlex.split(command, posix=True) 

186 else: 

187 run_cmd += [command] 

188 

189 iterator = run_command_with_exit_code(self.docker_cmd + run_cmd, stream=stream) 

190 return iterator 

191 

192 def pull( 

193 self, 

194 container_name: str, 

195 all_tags: bool = False, 

196 platform: str | None = None, 

197 stream: bool = False, 

198 ): 

199 parameters: dict = locals() 

200 

201 pull_cmd: list[str] = ["pull"] 

202 

203 remove_parameters = ["stream", "container_name"] 

204 

205 pull_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

206 pull_cmd += [container_name] 

207 

208 iterator = run_command_with_exit_code( 

209 self.docker_cmd + pull_cmd, 

210 stream=stream, 

211 ) 

212 return iterator 

213 

214 def images( 

215 self, 

216 format: Literal["json"] = "json", 

217 ): 

218 parameters: dict = locals() 

219 

220 images_cmd: list[str] = ["images"] 

221 remove_parameters = [] 

222 

223 images_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

224 

225 output: SubprocessOutput = run_command_with_exit_code( 

226 self.docker_cmd + images_cmd, 

227 stream=False, 

228 ) 

229 

230 images = [] 

231 

232 if output.stdout: 

233 for image in output.stdout: 

234 images.append(json.loads(image)) 

235 

236 return images 

237 

238 def tag( 

239 self, 

240 source_image: str, 

241 target_image: str, 

242 stream: bool = False, 

243 ): 

244 """ 

245 Tag a source image with a new target name/tag. 

246 

247 Args: 

248 source_image: Source image (e.g., "alpine:3.18" or image ID) 

249 target_image: Target image tag (e.g., "myrepo/alpine:latest") 

250 stream: Whether to stream output (default: False) 

251 

252 Returns: 

253 SubprocessOutput or iterator if streaming 

254 

255 Example: 

256 docker.tag("alpine:3.18", "myrepo/alpine:prod") 

257 """ 

258 parameters: dict = locals() 

259 

260 tag_cmd: list[str] = ["tag"] 

261 remove_parameters = ["stream", "source_image", "target_image"] 

262 

263 tag_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

264 tag_cmd += [source_image, target_image] 

265 

266 iterator = run_command_with_exit_code( 

267 self.docker_cmd + tag_cmd, 

268 stream=stream, 

269 ) 

270 return iterator 

271 

272 def rmi( 

273 self, 

274 image: str | list[str], 

275 force: bool = False, 

276 no_prune: bool = False, 

277 stream: bool = False, 

278 ): 

279 """ 

280 Remove one or more images. 

281 

282 Args: 

283 image: Image name/ID or list of images to remove 

284 force: Force removal of the image (default: False) 

285 no_prune: Do not delete untagged parents (default: False) 

286 stream: Whether to stream output (default: False) 

287 

288 Returns: 

289 SubprocessOutput or iterator if streaming 

290 

291 Example: 

292 docker.rmi("myrepo/alpine:old") 

293 docker.rmi(["img1", "img2"], force=True) 

294 """ 

295 parameters: dict = locals() 

296 

297 rmi_cmd: list[str] = ["rmi"] 

298 remove_parameters = ["stream", "image"] 

299 

300 rmi_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

301 

302 if isinstance(image, str): 

303 rmi_cmd += [image] 

304 else: 

305 rmi_cmd += image 

306 

307 iterator = run_command_with_exit_code( 

308 self.docker_cmd + rmi_cmd, 

309 stream=stream, 

310 ) 

311 return iterator 

312 

313 def create_temp_container(self, image: str, name: str | None = None, **run_kwargs) -> "TempContainer": 

314 """ 

315 Create a temporary container (context manager). 

316 

317 Args: 

318 image: Image to use 

319 name: Container name (random if not provided) 

320 **run_kwargs: Additional run arguments 

321 

322 Returns: 

323 TempContainer context manager 

324 

325 Example: 

326 with docker.create_temp_container("alpine") as container: 

327 docker.cp(source="/etc/hosts", destination="./", 

328 source_container=container.name) 

329 """ 

330 if name is None: 

331 from frappe_manager.utils.docker import generate_random_text 

332 

333 name = f"temp_{generate_random_text(10)}" 

334 

335 return TempContainer(self, image, name, run_kwargs) 

336 

337 def is_running(self) -> bool: 

338 """Alias for server_running() for better readability""" 

339 return self.server_running() 

340 

341 def __enter__(self) -> "DockerClient": 

342 """ 

343 Enter context manager - enables automatic cleanup of compose environment. 

344 

345 Returns: 

346 Self for use in 'with' statement 

347 

348 Example: 

349 with DockerClient(compose_path) as docker: 

350 docker.compose.up(detach=True) 

351 # Work with docker... 

352 # Compose environment auto-cleaned up on exit 

353 

354 Note: 

355 If a compose file path was provided during initialization, the compose 

356 environment will be automatically cleaned up when the context exits. 

357 """ 

358 # If compose is available, enter its context 

359 if self.compose: 

360 self.compose.__enter__() 

361 return self 

362 

363 def __exit__(self, exc_type, exc_val, exc_tb) -> bool: 

364 """ 

365 Exit context manager - cleans up compose environment if present. 

366 

367 Args: 

368 exc_type: Exception type (if any) 

369 exc_val: Exception value (if any) 

370 exc_tb: Exception traceback (if any) 

371 

372 Returns: 

373 False (does not suppress exceptions) 

374 

375 Note: 

376 If a compose file path was provided during initialization, this will 

377 delegate to the compose wrapper's cleanup logic. Any cleanup errors 

378 are silently ignored (best-effort cleanup). 

379 """ 

380 # If compose is available, exit its context 

381 if self.compose: 

382 self.compose.__exit__(exc_type, exc_val, exc_tb) 

383 

384 # Don't suppress exceptions - return False 

385 return False 

386 

387 

class TempContainer:
    """
    Context manager for temporary Docker containers.

    Entering starts a detached container through ``DockerClient.run``; exiting force-removes it
    through ``DockerClient.rm`` (only if it was started by this object).
    """

    def __init__(self, docker: "DockerClient", image: str, name: str, run_kwargs: dict):
        """
        Store what is needed to start the container later; nothing is run here.

        Args:
            docker: Client used to run and remove the container.
            image: Image to start the container from.
            name: Container name, also used for removal.
            run_kwargs: Extra keyword arguments forwarded to ``docker.run``. They must not repeat
                ``image``, ``name``, ``detach``, ``entrypoint``, ``command`` or ``stream``,
                which are set by ``__enter__`` (a duplicate raises TypeError).
        """
        self.docker = docker
        self.image = image
        self.name = name
        self.run_kwargs = run_kwargs
        # Set once the run command succeeded, so __exit__ only removes what was created.
        self._started = False

    def __enter__(self) -> "TempContainer":
        """
        Start the temporary container.

        Returns:
            This TempContainer.

        Raises:
            RuntimeError: If the run command raised DockerException (kept as ``__cause__``).
        """
        # NOTE(review): with entrypoint "bash" the command words are passed as bash's arguments,
        # so bash is presumably asked to execute `tail` as a script - confirm the container
        # really stays up. This also requires bash to exist in the image.
        try:
            self.docker.run(
                image=self.image,
                name=self.name,
                detach=True,
                entrypoint="bash",
                command="tail -f /dev/null",
                stream=False,
                **self.run_kwargs,
            )
        except DockerException as e:
            raise RuntimeError(f"Failed to create temp container: {e}") from e

        self._started = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Remove the temporary container.

        Returns:
            False, so an exception raised inside the ``with`` body is never suppressed.
        """
        if self._started:
            try:
                self.docker.rm(container=self.name, force=True, stream=False)
            except DockerException:
                pass  # Best effort cleanup
        return False