Coverage for frappe_manager / docker / docker_compose.py: 52%
234 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import inspect
2import itertools
3import shlex
4from collections.abc import Callable, Iterable
5from functools import wraps
6from pathlib import Path
7from subprocess import run
8from typing import Literal, TypeVar, cast, overload
10from frappe_manager.output_manager.base import OutputHandler
11from frappe_manager.utils.docker import (
12 SubprocessOutput,
13 parameters_to_options,
14 run_command_with_exit_code,
15)
17T = TypeVar("T")
20def docker_command(
21 subcommand: str,
22 exclude_params: list[str] | None = None,
23 positional_params: list[str] | None = None,
24 use_original_implementation: bool = False,
25) -> Callable[[T], T]:
26 if exclude_params is None:
27 exclude_params = []
28 if positional_params is None:
29 positional_params = []
31 if "stream" not in exclude_params:
32 exclude_params = exclude_params + ["stream"]
34 def decorator(func: Callable) -> Callable:
35 @wraps(func)
36 def wrapper(self, *args, **kwargs):
37 sig = inspect.signature(func)
38 bound = sig.bind(self, *args, **kwargs)
39 bound.apply_defaults()
41 parameters = dict(bound.arguments)
42 stream_param = parameters.get("stream")
44 should_stream = (
45 stream_param
46 if stream_param is not None
47 else (self.output.should_stream_docker if self.output else False)
48 )
50 if use_original_implementation:
51 full_cmd = func(self, *args, **kwargs)
52 else:
53 cmd: list = [subcommand]
55 for param_name in positional_params:
56 if param_name in parameters:
57 param_value = parameters[param_name]
58 if isinstance(param_value, list):
59 cmd.extend(param_value)
60 elif param_value is not None:
61 cmd.append(str(param_value))
63 cmd += parameters_to_options(parameters, exclude=exclude_params + positional_params)
65 full_cmd = self.docker_compose_cmd + cmd
67 if should_stream:
68 from frappe_manager.logger import log
70 logger = log.get_logger()
71 logger.debug(
72 f"[docker_command] Auto-streaming: should_stream={should_stream}, stream_param={stream_param}, output={self.output is not None}",
73 )
75 stream_result = run_command_with_exit_code(full_cmd, stream=True)
76 iterator = cast("Iterable[tuple[str, bytes]]", stream_result)
77 if self.output and stream_param is None:
78 logger.debug("[docker_command] Entering auto-streaming mode with tee()")
79 display_iter, capture_iter = itertools.tee(iterator, 2)
80 logger.debug("[docker_command] Calling output.live_lines()")
81 self.output.live_lines(display_iter, padding=(0, 0, 0, 2))
82 logger.debug("[docker_command] Converting capture_iter to SubprocessOutput")
83 result = SubprocessOutput.from_output(capture_iter)
84 logger.debug(
85 f"[docker_command] Returning SubprocessOutput: exit_code={result.exit_code}, stdout_lines={len(result.stdout)}, stderr_lines={len(result.stderr)}",
86 )
87 return result
88 logger.debug("[docker_command] Returning raw iterator (explicit stream=True)")
89 return iterator
90 result = run_command_with_exit_code(full_cmd, stream=False)
91 return result
93 return wrapper
95 return decorator
98# Docker Compose version 2.18.1
99class DockerComposeWrapper:
100 """
101 This class provides one to one mapping between docker compose cli each function.
102 Only this args have are different use case.
104 Args:
105 stream (bool, optional): A boolean flag indicating whether to stream the output of the command as it runs.
106 If set to True, the output will be displayed in real-time. If set to False, the output will be
107 displayed after the command completes. Defaults to False.
108 """
110 def __init__(self, path: Path, timeout: int = 100, output: OutputHandler | None = None):
111 self.compose_file_path = path.absolute()
112 self.output = output
114 self.docker_compose_cmd = [
115 "docker",
116 "compose",
117 "-f",
118 self.compose_file_path.as_posix(),
119 ]
121 self._context_services: list[str] | None = None
123 @overload
124 def up(
125 self,
126 services: list[str] = [],
127 detach: bool = True,
128 build: bool = False,
129 remove_orphans: bool = False,
130 no_recreate: bool = False,
131 force_recreate: bool = False,
132 always_recreate_deps: bool = False,
133 quiet_pull: bool = False,
134 pull: Literal["missing", "never", "always"] = "missing",
135 *,
136 stream: Literal[True],
137 ) -> Iterable[tuple[str, bytes]]: ...
139 @overload
140 def up(
141 self,
142 services: list[str] = [],
143 detach: bool = True,
144 build: bool = False,
145 remove_orphans: bool = False,
146 no_recreate: bool = False,
147 force_recreate: bool = False,
148 always_recreate_deps: bool = False,
149 quiet_pull: bool = False,
150 pull: Literal["missing", "never", "always"] = "missing",
151 *,
152 stream: Literal[False] = False,
153 ) -> SubprocessOutput: ...
155 @docker_command("up", positional_params=["services"])
156 def up(
157 self,
158 services: list[str] = [],
159 detach: bool = True,
160 build: bool = False,
161 remove_orphans: bool = False,
162 no_recreate: bool = False,
163 force_recreate: bool = False,
164 always_recreate_deps: bool = False,
165 quiet_pull: bool = False,
166 pull: Literal["missing", "never", "always"] = "missing",
167 stream: bool | None = None,
168 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
169 """Start services defined in the compose file.
171 The implementation is handled by the @docker_command decorator which:
172 - Adds 'services' as positional arguments
173 - Converts remaining parameters to CLI options
174 - Executes the docker compose up command
175 """
176 # Implementation handled by decorator
178 @overload
179 def down(
180 self,
181 timeout: int = 100,
182 remove_orphans: bool = False,
183 rmi: bool | Literal["all", "local"] = False,
184 volumes: bool = False,
185 dry_run: bool = False,
186 *,
187 stream: Literal[True],
188 ) -> Iterable[tuple[str, bytes]]: ...
190 @overload
191 def down(
192 self,
193 timeout: int = 100,
194 remove_orphans: bool = False,
195 rmi: bool | Literal["all", "local"] = False,
196 volumes: bool = False,
197 dry_run: bool = False,
198 *,
199 stream: Literal[False] = False,
200 ) -> SubprocessOutput: ...
202 @docker_command(subcommand="down", use_original_implementation=True)
203 def down(
204 self,
205 timeout: int = 100,
206 remove_orphans: bool = False,
207 rmi: bool | Literal["all", "local"] = False,
208 volumes: bool = False,
209 dry_run: bool = False,
210 stream: bool | None = None,
211 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
212 parameters: dict = locals()
213 parameters["timeout"] = str(timeout)
215 down_cmd: list[str] = ["down"]
216 remove_parameters = ["stream", "self"]
218 if not rmi:
219 remove_parameters.append("rmi")
220 else:
221 parameters["rmi"] = "all"
223 down_cmd += parameters_to_options(parameters, exclude=remove_parameters)
225 return self.docker_compose_cmd + down_cmd
227 @overload
228 def start(
229 self,
230 services: None | list[str] = None,
231 dry_run: bool = False,
232 *,
233 stream: Literal[True],
234 ) -> Iterable[tuple[str, bytes]]: ...
236 @overload
237 def start(
238 self,
239 services: None | list[str] = None,
240 dry_run: bool = False,
241 *,
242 stream: Literal[False] = False,
243 ) -> SubprocessOutput: ...
245 @docker_command(subcommand="start", positional_params=["services"])
246 def start(
247 self,
248 services: None | list[str] = None,
249 dry_run: bool = False,
250 stream: bool | None = None,
251 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
252 pass
254 @docker_command(subcommand="restart", use_original_implementation=True)
255 def restart(
256 self,
257 services: None | list[str] = None,
258 dry_run: bool = False,
259 timeout: int = 100,
260 no_deps: bool = False,
261 stream: bool | None = None,
262 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
263 parameters: dict = locals()
264 parameters["timeout"] = str(timeout)
266 restart_cmd: list[str] = ["restart"]
267 remove_parameters = ["services", "stream", "self"]
269 restart_cmd += parameters_to_options(parameters, exclude=remove_parameters)
271 if type(services) == list:
272 restart_cmd += services
274 return self.docker_compose_cmd + restart_cmd
276 @docker_command(subcommand="stop", use_original_implementation=True)
277 def stop(
278 self,
279 services: None | list[str] = None,
280 timeout: int = 100,
281 stream: bool | None = None,
282 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
283 parameters: dict = locals()
284 parameters["timeout"] = str(timeout)
286 stop_cmd: list[str] = ["stop"]
287 remove_parameters = ["services", "stream", "self"]
289 stop_cmd += parameters_to_options(parameters, exclude=remove_parameters)
291 if type(services) == list:
292 stop_cmd.extend(services)
294 return self.docker_compose_cmd + stop_cmd
296 @overload
297 def exec(
298 self,
299 service: str,
300 command: str,
301 detach: bool = False,
302 env: None | list[str] = None,
303 no_tty: bool = False,
304 privileged: bool = False,
305 user: None | str = None,
306 workdir: None | str = None,
307 stream: Literal[True] = ...,
308 capture_output: bool = True,
309 use_shlex_split: bool = True,
310 ) -> Iterable[tuple[str, bytes]]: ...
312 @overload
313 def exec(
314 self,
315 service: str,
316 command: str,
317 detach: bool = False,
318 env: None | list[str] = None,
319 no_tty: bool = False,
320 privileged: bool = False,
321 user: None | str = None,
322 workdir: None | str = None,
323 stream: Literal[False] = False,
324 capture_output: bool = True,
325 use_shlex_split: bool = True,
326 ) -> SubprocessOutput: ...
328 @docker_command(subcommand="exec", use_original_implementation=True)
329 def exec(
330 self,
331 service: str,
332 command: str,
333 detach: bool = False,
334 env: None | list[str] = None,
335 no_tty: bool = False,
336 privileged: bool = False,
337 user: None | str = None,
338 workdir: None | str = None,
339 stream: bool | None = None,
340 capture_output: bool = True,
341 use_shlex_split: bool = True,
342 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
343 parameters: dict = locals()
345 exec_cmd: list[str] = ["exec"]
347 remove_parameters = [
348 "self",
349 "service",
350 "stream",
351 "command",
352 "env",
353 "use_shlex_split",
354 "capture_output",
355 "no_tty",
356 ]
358 exec_cmd += parameters_to_options(parameters, exclude=remove_parameters)
360 if type(env) == list:
361 for i in env:
362 exec_cmd += ["--env", i]
364 exec_cmd += [service]
366 if use_shlex_split:
367 exec_cmd += shlex.split(command, posix=True)
368 else:
369 exec_cmd += [command]
371 return self.docker_compose_cmd + exec_cmd
373 @docker_command(subcommand="ps", use_original_implementation=True)
374 def ps(
375 self,
376 service: None | list[str] = None,
377 dry_run: bool = False,
378 all: bool = False,
379 services: bool = False,
380 filter: None | Literal["paused", "restarting", "removing", "running", "dead", "created", "exited"] = None,
381 format: None | Literal["table", "json"] = None,
382 status: None | list[Literal["paused", "restarting", "removing", "running", "dead", "created", "exited"]] = None,
383 stream: bool | None = None,
384 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
385 parameters: dict = locals()
387 ps_cmd: list[str] = ["ps"]
389 remove_parameters = [
390 "self",
391 "service",
392 "stream",
393 "filter",
394 "status",
395 ]
397 if filter:
398 parameters["filter"] = f"status={filter}"
400 ps_cmd += parameters_to_options(parameters, exclude=remove_parameters)
402 if type(status) == list:
403 for i in status:
404 ps_cmd += ["--status", i]
406 if service:
407 ps_cmd += service
409 return self.docker_compose_cmd + ps_cmd
411 @docker_command(subcommand="logs", use_original_implementation=True)
412 def logs(
413 self,
414 services: None | list[str] = None,
415 dry_run: bool = False,
416 follow: bool = False,
417 no_color: bool = False,
418 no_log_prefix: bool = False,
419 since: None | str = None,
420 tail: None | int = None,
421 until: None | int = None,
422 timestamps: bool = False,
423 stream: bool | None = None,
424 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
425 parameters: dict = locals()
427 logs_cmd: list[str] = ["logs"]
429 remove_parameters = ["self", "services", "stream"]
431 logs_cmd += parameters_to_options(parameters, exclude=remove_parameters)
433 if services:
434 logs_cmd += services
436 return self.docker_compose_cmd + logs_cmd
438 def ls(
439 self,
440 all: bool = False,
441 dry_run: bool = False,
442 format: Literal["table", "json"] = "table",
443 ):
444 parameters: dict = locals()
446 ls_cmd: list[str] = ["ls"]
448 ls_cmd += parameters_to_options(parameters)
450 try:
451 output = run(self.docker_compose_cmd + ls_cmd, capture_output=True)
452 output = output.stdout.decode()
453 except Exception:
454 return False
456 return output
458 @overload
459 def pull(
460 self,
461 dry_run: bool = False,
462 ignore_buildable: bool = False,
463 ignore_pull_failures: bool = False,
464 include_deps: bool = False,
465 *,
466 stream: Literal[True],
467 ) -> Iterable[tuple[str, bytes]]: ...
469 @overload
470 def pull(
471 self,
472 dry_run: bool = False,
473 ignore_buildable: bool = False,
474 ignore_pull_failures: bool = False,
475 include_deps: bool = False,
476 *,
477 stream: Literal[False] = False,
478 ) -> SubprocessOutput: ...
480 @docker_command("pull")
481 def pull(
482 self,
483 dry_run: bool = False,
484 ignore_buildable: bool = False,
485 ignore_pull_failures: bool = False,
486 include_deps: bool = False,
487 stream: bool | None = None,
488 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
489 """Pull service images defined in the compose file.
491 Implementation handled by @docker_command decorator.
492 """
493 # Implementation handled by decorator
495 @docker_command(subcommand="run", use_original_implementation=True)
496 def run(
497 self,
498 service: str,
499 command: str | None = None,
500 name: str | None = None,
501 user: str | None = None,
502 detach: bool = False,
503 rm: bool = False,
504 entrypoint: str | None = None,
505 use_shlex_split: bool = True,
506 stream: bool | None = None,
507 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
508 parameters: dict = locals()
509 run_cmd: list = ["run"]
511 remove_parameters = ["stream", "command", "service", "use_shlex_split", "self"]
513 run_cmd += parameters_to_options(parameters, exclude=remove_parameters)
515 run_cmd += [service]
517 if command:
518 if use_shlex_split:
519 run_cmd += shlex.split(command, posix=True)
520 else:
521 run_cmd += [command]
523 return self.docker_compose_cmd + run_cmd
525 @docker_command(subcommand="cp", use_original_implementation=True)
526 def cp(
527 self,
528 source: str,
529 destination: str,
530 source_container: str | None = None,
531 destination_container: str | None = None,
532 archive: bool = False,
533 follow_link: bool = False,
534 stream: bool | None = None,
535 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput:
536 parameters: dict = locals()
538 cp_cmd: list = ["cp"]
540 remove_parameters = [
541 "stream",
542 "source",
543 "destination",
544 "source_container",
545 "destination_container",
546 "self",
547 ]
549 cp_cmd += parameters_to_options(parameters, exclude=remove_parameters)
551 if source_container:
552 source = f"{source_container}:{source}"
554 if destination_container:
555 destination = f"{destination_container}:{destination}"
557 cp_cmd += [f"{source}"]
558 cp_cmd += [f"{destination}"]
560 return self.docker_compose_cmd + cp_cmd
562 # ==================== NEW: Convenience Methods ====================
564 def is_service_running(self, service: str) -> bool:
565 """
566 Check if a service is running.
568 Args:
569 service: Service name to check
571 Returns:
572 True if service is running, False otherwise
573 """
574 try:
575 output = self.ps(service=[service], format="json", stream=False)
576 if not output.stdout:
577 return False
579 import json
581 for line in output.stdout:
582 status = json.loads(line)
583 if status.get("State") == "running":
584 return True
585 return False
586 except Exception:
587 return False
589 def get_service_status(self, service: str) -> dict | None:
590 """
591 Get detailed status for a service.
593 Args:
594 service: Service name
596 Returns:
597 Dict with status info or None if not found
598 """
599 try:
600 output = self.ps(service=[service], format="json", all=True, stream=False)
601 if output.stdout:
602 import json
604 return json.loads(output.stdout[0])
605 return None
606 except Exception:
607 return None
609 def get_all_services_status(self) -> list[dict]:
610 """
611 Get status for all services in compose file.
613 Returns:
614 List of status dicts
615 """
616 statuses = []
617 try:
618 output = self.ps(format="json", all=True, stream=False)
619 import json
621 for line in output.stdout:
622 statuses.append(json.loads(line))
623 except Exception:
624 pass
625 return statuses
627 def wait_for_service(self, service: str, timeout: int = 30, check_interval: float = 0.5) -> bool:
628 """
629 Wait for a service to be running.
631 Args:
632 service: Service name
633 timeout: Max seconds to wait
634 check_interval: Seconds between checks
636 Returns:
637 True if service started, False if timeout
638 """
639 import time
641 start_time = time.time()
643 while time.time() - start_time < timeout:
644 if self.is_service_running(service):
645 return True
646 time.sleep(check_interval)
648 return False
650 def exec_capture(self, service: str, command: str, **kwargs) -> SubprocessOutput:
651 """
652 Execute command and capture output (stream=False wrapper).
654 Args:
655 service: Service name
656 command: Command to execute
657 **kwargs: Additional exec arguments
659 Returns:
660 SubprocessOutput with stdout/stderr/exit_code
661 """
662 kwargs["stream"] = False
663 return self.exec(service=service, command=command, **kwargs)
665 def exec_stream(self, service: str, command: str, **kwargs) -> Iterable[tuple[str, bytes]]:
666 """
667 Execute command and stream output (stream=True wrapper).
669 Args:
670 service: Service name
671 command: Command to execute
672 **kwargs: Additional exec arguments
674 Returns:
675 Iterator of (source, line) tuples
676 """
677 kwargs["stream"] = True
678 return self.exec(service=service, command=command, **kwargs)
680 # Context Manager Support
682 def __enter__(self) -> "DockerComposeWrapper":
683 """
684 Enter context manager - returns self for use in 'with' statement.
686 Returns:
687 Self for method chaining
689 Example:
690 with DockerComposeWrapper(path).with_auto_cleanup() as compose:
691 compose.up(detach=True, stream=False)
692 # Work with compose...
693 # Auto-cleanup on exit
694 """
695 return self
697 def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
698 """
699 Exit context manager - performs cleanup if services are registered.
701 Args:
702 exc_type: Exception type (if any)
703 exc_val: Exception value (if any)
704 exc_tb: Exception traceback (if any)
706 Returns:
707 False (does not suppress exceptions)
709 Note:
710 If services were registered via with_auto_cleanup(), this will
711 call down() to stop and remove containers. Cleanup errors are
712 silently ignored (best-effort cleanup).
713 """
714 if self._context_services is not None:
715 try:
716 # Best effort cleanup - don't fail if cleanup fails
717 self.down(remove_orphans=True, stream=False)
718 except Exception:
719 pass # Silently ignore cleanup errors
721 # Don't suppress exceptions - return False
722 return False
724 def with_auto_cleanup(self, services: list[str] | None = None) -> "DockerComposeWrapper":
725 """
726 Register services for automatic cleanup when context exits.
728 Args:
729 services: List of service names to cleanup. If None, all services
730 in the compose file will be cleaned up.
732 Returns:
733 Self for method chaining
735 Example:
736 # Cleanup all services
737 with DockerComposeWrapper(path).with_auto_cleanup() as compose:
738 compose.up(detach=True, stream=False)
739 # Auto-cleanup on exit
741 # Cleanup specific services
742 with DockerComposeWrapper(path).with_auto_cleanup(['redis', 'db']) as compose:
743 compose.up(services=['redis', 'db'], detach=True, stream=False)
744 # Auto-cleanup on exit
746 Note:
747 Must be used with context manager (with statement).
748 Cleanup happens even if an exception occurs.
749 """
750 self._context_services = services if services is not None else []
751 return self