Coverage for frappe_manager / docker / docker_client.py: 25%
154 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import json
2import shlex
3from pathlib import Path
4from typing import Literal
6from frappe_manager.docker.docker_compose import DockerComposeWrapper
7from frappe_manager.docker.docker_exceptions import DockerException
8from frappe_manager.output_manager.base import OutputHandler
9from frappe_manager.utils.docker import (
10 SubprocessOutput,
11 is_current_user_in_group,
12 parameters_to_options,
13 run_command_with_exit_code,
14)
17class DockerClient:
18 """
19 This class provide one to one mapping to the docker command.
21 Only this args have are different use case.
22 stream (bool, optional): A boolean flag indicating whether to stream the output of the command as it runs.
23 If set to True, the output will be displayed in real-time. If set to False, the output will be
24 displayed after the command completes. Defaults to False.
25 stream_only_exit_code (bool, optional): A boolean flag indicating whether to only stream the exit code of the
26 command. Defaults to False.
27 """
29 def __init__(self, compose_file_path: Path | None = None, output: OutputHandler | None = None):
30 """
31 Initializes a DockerClient object.
32 Args:
33 compose_file_path (Optional[Path]): The path to the Docker Compose file. Defaults to None.
34 output (Optional[OutputHandler]): Output handler for docker operations. Defaults to None.
35 """
36 self.docker_cmd = ["docker"]
37 self.output = output
38 self.compose: DockerComposeWrapper | None = None
39 if compose_file_path:
40 self.compose = DockerComposeWrapper(compose_file_path, output=output)
42 def version(self) -> dict:
43 """
44 Retrieves the version information of the Docker client.
46 Returns:
47 A dictionary containing the version information.
48 """
49 parameters: dict = locals()
51 parameters["format"] = "json"
53 ver_cmd: list = ["version"]
55 ver_cmd += parameters_to_options(parameters)
57 try:
58 output: SubprocessOutput = run_command_with_exit_code(self.docker_cmd + ver_cmd, stream=False)
59 version: dict = json.loads(" ".join(output.stdout))
60 return version
61 except DockerException as e:
62 version: dict = json.loads(" ".join(e.output.stdout))
63 return version
65 def server_running(self) -> bool:
66 """
67 Checks if the Docker server is running.
69 Returns:
70 bool: True if the Docker server is running, False otherwise.
71 """
72 docker_info = self.version()
74 if "Server" in docker_info:
75 if docker_info["Server"]:
76 return True
77 return False
78 # check if the current user in the docker group and notify the user
79 is_current_user_in_group("docker")
80 return False
82 def cp(
83 self,
84 source: str,
85 destination: str,
86 source_container: str | None = None,
87 destination_container: str | None = None,
88 archive: bool = False,
89 follow_link: bool = False,
90 stream: bool = False,
91 ):
92 parameters: dict = locals()
93 cp_cmd: list = ["cp"]
95 remove_parameters = [
96 "stream",
97 "source",
98 "destination",
99 "source_container",
100 "destination_container",
101 ]
103 cp_cmd += parameters_to_options(parameters, exclude=remove_parameters)
105 if source_container:
106 source = f"{source_container}:{source}"
108 if destination_container:
109 destination = f"{destination_container}:{destination}"
111 cp_cmd += [f"{source}"]
112 cp_cmd += [f"{destination}"]
114 iterator = run_command_with_exit_code(self.docker_cmd + cp_cmd, stream=stream)
115 return iterator
117 def kill(
118 self,
119 container: str,
120 signal: str | None = None,
121 stream: bool = False,
122 ):
123 parameters: dict = locals()
124 kill_cmd: list = ["kill"]
126 remove_parameters = ["stream", "container"]
128 kill_cmd += parameters_to_options(parameters, exclude=remove_parameters)
129 kill_cmd += [f"{container}"]
131 iterator = run_command_with_exit_code(self.docker_cmd + kill_cmd, stream=stream)
132 return iterator
134 def rm(
135 self,
136 container: str,
137 force: bool = False,
138 link: bool = False,
139 volumes: bool = False,
140 stream: bool = False,
141 ):
142 parameters: dict = locals()
143 rm_cmd: list = ["rm"]
145 remove_parameters = ["stream", "container"]
147 rm_cmd += parameters_to_options(parameters, exclude=remove_parameters)
148 rm_cmd += [f"{container}"]
150 iterator = run_command_with_exit_code(self.docker_cmd + rm_cmd, stream=stream)
151 return iterator
153 def run(
154 self,
155 image: str,
156 command: str | None = None,
157 env: dict[str, str] | None = None,
158 name: str | None = None,
159 user: str | None = None,
160 volume: list[str] | None = None,
161 detach: bool = False,
162 entrypoint: str | None = None,
163 workdir: str | None = None,
164 platform: str | None = None,
165 pull: Literal["missing", "never", "always"] = "missing",
166 use_shlex_split: bool = True,
167 stream: bool = False,
168 rm: bool = False,
169 ):
170 parameters: dict = locals()
171 run_cmd: list = ["run"]
173 remove_parameters = ["stream", "command", "image", "use_shlex_split", "env"]
175 run_cmd += parameters_to_options(parameters, exclude=remove_parameters)
177 if isinstance(env, dict):
178 for i, v in env.items():
179 run_cmd += ["--env", f"{i}={v}"]
181 run_cmd += [f"{image}"]
183 if command:
184 if use_shlex_split:
185 run_cmd += shlex.split(command, posix=True)
186 else:
187 run_cmd += [command]
189 iterator = run_command_with_exit_code(self.docker_cmd + run_cmd, stream=stream)
190 return iterator
192 def pull(
193 self,
194 container_name: str,
195 all_tags: bool = False,
196 platform: str | None = None,
197 stream: bool = False,
198 ):
199 parameters: dict = locals()
201 pull_cmd: list[str] = ["pull"]
203 remove_parameters = ["stream", "container_name"]
205 pull_cmd += parameters_to_options(parameters, exclude=remove_parameters)
206 pull_cmd += [container_name]
208 iterator = run_command_with_exit_code(
209 self.docker_cmd + pull_cmd,
210 stream=stream,
211 )
212 return iterator
214 def images(
215 self,
216 format: Literal["json"] = "json",
217 ):
218 parameters: dict = locals()
220 images_cmd: list[str] = ["images"]
221 remove_parameters = []
223 images_cmd += parameters_to_options(parameters, exclude=remove_parameters)
225 output: SubprocessOutput = run_command_with_exit_code(
226 self.docker_cmd + images_cmd,
227 stream=False,
228 )
230 images = []
232 if output.stdout:
233 for image in output.stdout:
234 images.append(json.loads(image))
236 return images
238 def tag(
239 self,
240 source_image: str,
241 target_image: str,
242 stream: bool = False,
243 ):
244 """
245 Tag a source image with a new target name/tag.
247 Args:
248 source_image: Source image (e.g., "alpine:3.18" or image ID)
249 target_image: Target image tag (e.g., "myrepo/alpine:latest")
250 stream: Whether to stream output (default: False)
252 Returns:
253 SubprocessOutput or iterator if streaming
255 Example:
256 docker.tag("alpine:3.18", "myrepo/alpine:prod")
257 """
258 parameters: dict = locals()
260 tag_cmd: list[str] = ["tag"]
261 remove_parameters = ["stream", "source_image", "target_image"]
263 tag_cmd += parameters_to_options(parameters, exclude=remove_parameters)
264 tag_cmd += [source_image, target_image]
266 iterator = run_command_with_exit_code(
267 self.docker_cmd + tag_cmd,
268 stream=stream,
269 )
270 return iterator
272 def rmi(
273 self,
274 image: str | list[str],
275 force: bool = False,
276 no_prune: bool = False,
277 stream: bool = False,
278 ):
279 """
280 Remove one or more images.
282 Args:
283 image: Image name/ID or list of images to remove
284 force: Force removal of the image (default: False)
285 no_prune: Do not delete untagged parents (default: False)
286 stream: Whether to stream output (default: False)
288 Returns:
289 SubprocessOutput or iterator if streaming
291 Example:
292 docker.rmi("myrepo/alpine:old")
293 docker.rmi(["img1", "img2"], force=True)
294 """
295 parameters: dict = locals()
297 rmi_cmd: list[str] = ["rmi"]
298 remove_parameters = ["stream", "image"]
300 rmi_cmd += parameters_to_options(parameters, exclude=remove_parameters)
302 if isinstance(image, str):
303 rmi_cmd += [image]
304 else:
305 rmi_cmd += image
307 iterator = run_command_with_exit_code(
308 self.docker_cmd + rmi_cmd,
309 stream=stream,
310 )
311 return iterator
313 def create_temp_container(self, image: str, name: str | None = None, **run_kwargs) -> "TempContainer":
314 """
315 Create a temporary container (context manager).
317 Args:
318 image: Image to use
319 name: Container name (random if not provided)
320 **run_kwargs: Additional run arguments
322 Returns:
323 TempContainer context manager
325 Example:
326 with docker.create_temp_container("alpine") as container:
327 docker.cp(source="/etc/hosts", destination="./",
328 source_container=container.name)
329 """
330 if name is None:
331 from frappe_manager.utils.docker import generate_random_text
333 name = f"temp_{generate_random_text(10)}"
335 return TempContainer(self, image, name, run_kwargs)
337 def is_running(self) -> bool:
338 """Alias for server_running() for better readability"""
339 return self.server_running()
341 def __enter__(self) -> "DockerClient":
342 """
343 Enter context manager - enables automatic cleanup of compose environment.
345 Returns:
346 Self for use in 'with' statement
348 Example:
349 with DockerClient(compose_path) as docker:
350 docker.compose.up(detach=True)
351 # Work with docker...
352 # Compose environment auto-cleaned up on exit
354 Note:
355 If a compose file path was provided during initialization, the compose
356 environment will be automatically cleaned up when the context exits.
357 """
358 # If compose is available, enter its context
359 if self.compose:
360 self.compose.__enter__()
361 return self
363 def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
364 """
365 Exit context manager - cleans up compose environment if present.
367 Args:
368 exc_type: Exception type (if any)
369 exc_val: Exception value (if any)
370 exc_tb: Exception traceback (if any)
372 Returns:
373 False (does not suppress exceptions)
375 Note:
376 If a compose file path was provided during initialization, this will
377 delegate to the compose wrapper's cleanup logic. Any cleanup errors
378 are silently ignored (best-effort cleanup).
379 """
380 # If compose is available, exit its context
381 if self.compose:
382 self.compose.__exit__(exc_type, exc_val, exc_tb)
384 # Don't suppress exceptions - return False
385 return False
388class TempContainer:
389 """Context manager for temporary Docker containers"""
391 def __init__(self, docker: DockerClient, image: str, name: str, run_kwargs: dict):
392 self.docker = docker
393 self.image = image
394 self.name = name
395 self.run_kwargs = run_kwargs
396 self._started = False
398 def __enter__(self) -> "TempContainer":
399 """Start the temporary container"""
400 from frappe_manager.docker import DockerException
402 try:
403 self.docker.run(
404 image=self.image,
405 name=self.name,
406 detach=True,
407 entrypoint="bash",
408 command="tail -f /dev/null",
409 stream=False,
410 **self.run_kwargs,
411 )
412 self._started = True
413 except DockerException as e:
414 raise RuntimeError(f"Failed to create temp container: {e}")
415 return self
417 def __exit__(self, exc_type, exc_val, exc_tb):
418 """Remove the temporary container"""
419 from frappe_manager.docker import DockerException
421 if self._started:
422 try:
423 self.docker.rm(container=self.name, force=True, stream=False)
424 except DockerException:
425 pass # Best effort cleanup
426 return False