Coverage for frappe_manager / docker / docker_compose.py: 52%

234 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import inspect 

2import itertools 

3import shlex 

4from collections.abc import Callable, Iterable 

5from functools import wraps 

6from pathlib import Path 

7from subprocess import run 

8from typing import Literal, TypeVar, cast, overload 

9 

10from frappe_manager.output_manager.base import OutputHandler 

11from frappe_manager.utils.docker import ( 

12 SubprocessOutput, 

13 parameters_to_options, 

14 run_command_with_exit_code, 

15) 

16 

17T = TypeVar("T") 

18 

19 

20def docker_command( 

21 subcommand: str, 

22 exclude_params: list[str] | None = None, 

23 positional_params: list[str] | None = None, 

24 use_original_implementation: bool = False, 

25) -> Callable[[T], T]: 

26 if exclude_params is None: 

27 exclude_params = [] 

28 if positional_params is None: 

29 positional_params = [] 

30 

31 if "stream" not in exclude_params: 

32 exclude_params = exclude_params + ["stream"] 

33 

34 def decorator(func: Callable) -> Callable: 

35 @wraps(func) 

36 def wrapper(self, *args, **kwargs): 

37 sig = inspect.signature(func) 

38 bound = sig.bind(self, *args, **kwargs) 

39 bound.apply_defaults() 

40 

41 parameters = dict(bound.arguments) 

42 stream_param = parameters.get("stream") 

43 

44 should_stream = ( 

45 stream_param 

46 if stream_param is not None 

47 else (self.output.should_stream_docker if self.output else False) 

48 ) 

49 

50 if use_original_implementation: 

51 full_cmd = func(self, *args, **kwargs) 

52 else: 

53 cmd: list = [subcommand] 

54 

55 for param_name in positional_params: 

56 if param_name in parameters: 

57 param_value = parameters[param_name] 

58 if isinstance(param_value, list): 

59 cmd.extend(param_value) 

60 elif param_value is not None: 

61 cmd.append(str(param_value)) 

62 

63 cmd += parameters_to_options(parameters, exclude=exclude_params + positional_params) 

64 

65 full_cmd = self.docker_compose_cmd + cmd 

66 

67 if should_stream: 

68 from frappe_manager.logger import log 

69 

70 logger = log.get_logger() 

71 logger.debug( 

72 f"[docker_command] Auto-streaming: should_stream={should_stream}, stream_param={stream_param}, output={self.output is not None}", 

73 ) 

74 

75 stream_result = run_command_with_exit_code(full_cmd, stream=True) 

76 iterator = cast("Iterable[tuple[str, bytes]]", stream_result) 

77 if self.output and stream_param is None: 

78 logger.debug("[docker_command] Entering auto-streaming mode with tee()") 

79 display_iter, capture_iter = itertools.tee(iterator, 2) 

80 logger.debug("[docker_command] Calling output.live_lines()") 

81 self.output.live_lines(display_iter, padding=(0, 0, 0, 2)) 

82 logger.debug("[docker_command] Converting capture_iter to SubprocessOutput") 

83 result = SubprocessOutput.from_output(capture_iter) 

84 logger.debug( 

85 f"[docker_command] Returning SubprocessOutput: exit_code={result.exit_code}, stdout_lines={len(result.stdout)}, stderr_lines={len(result.stderr)}", 

86 ) 

87 return result 

88 logger.debug("[docker_command] Returning raw iterator (explicit stream=True)") 

89 return iterator 

90 result = run_command_with_exit_code(full_cmd, stream=False) 

91 return result 

92 

93 return wrapper 

94 

95 return decorator 

96 

97 

98# Docker Compose version 2.18.1 

99class DockerComposeWrapper: 

100 """ 

101 This class provides one to one mapping between docker compose cli each function. 

102 Only this args have are different use case. 

103 

104 Args: 

105 stream (bool, optional): A boolean flag indicating whether to stream the output of the command as it runs. 

106 If set to True, the output will be displayed in real-time. If set to False, the output will be 

107 displayed after the command completes. Defaults to False. 

108 """ 

109 

110 def __init__(self, path: Path, timeout: int = 100, output: OutputHandler | None = None): 

111 self.compose_file_path = path.absolute() 

112 self.output = output 

113 

114 self.docker_compose_cmd = [ 

115 "docker", 

116 "compose", 

117 "-f", 

118 self.compose_file_path.as_posix(), 

119 ] 

120 

121 self._context_services: list[str] | None = None 

122 

123 @overload 

124 def up( 

125 self, 

126 services: list[str] = [], 

127 detach: bool = True, 

128 build: bool = False, 

129 remove_orphans: bool = False, 

130 no_recreate: bool = False, 

131 force_recreate: bool = False, 

132 always_recreate_deps: bool = False, 

133 quiet_pull: bool = False, 

134 pull: Literal["missing", "never", "always"] = "missing", 

135 *, 

136 stream: Literal[True], 

137 ) -> Iterable[tuple[str, bytes]]: ... 

138 

139 @overload 

140 def up( 

141 self, 

142 services: list[str] = [], 

143 detach: bool = True, 

144 build: bool = False, 

145 remove_orphans: bool = False, 

146 no_recreate: bool = False, 

147 force_recreate: bool = False, 

148 always_recreate_deps: bool = False, 

149 quiet_pull: bool = False, 

150 pull: Literal["missing", "never", "always"] = "missing", 

151 *, 

152 stream: Literal[False] = False, 

153 ) -> SubprocessOutput: ... 

154 

155 @docker_command("up", positional_params=["services"]) 

156 def up( 

157 self, 

158 services: list[str] = [], 

159 detach: bool = True, 

160 build: bool = False, 

161 remove_orphans: bool = False, 

162 no_recreate: bool = False, 

163 force_recreate: bool = False, 

164 always_recreate_deps: bool = False, 

165 quiet_pull: bool = False, 

166 pull: Literal["missing", "never", "always"] = "missing", 

167 stream: bool | None = None, 

168 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

169 """Start services defined in the compose file. 

170 

171 The implementation is handled by the @docker_command decorator which: 

172 - Adds 'services' as positional arguments 

173 - Converts remaining parameters to CLI options 

174 - Executes the docker compose up command 

175 """ 

176 # Implementation handled by decorator 

177 

178 @overload 

179 def down( 

180 self, 

181 timeout: int = 100, 

182 remove_orphans: bool = False, 

183 rmi: bool | Literal["all", "local"] = False, 

184 volumes: bool = False, 

185 dry_run: bool = False, 

186 *, 

187 stream: Literal[True], 

188 ) -> Iterable[tuple[str, bytes]]: ... 

189 

190 @overload 

191 def down( 

192 self, 

193 timeout: int = 100, 

194 remove_orphans: bool = False, 

195 rmi: bool | Literal["all", "local"] = False, 

196 volumes: bool = False, 

197 dry_run: bool = False, 

198 *, 

199 stream: Literal[False] = False, 

200 ) -> SubprocessOutput: ... 

201 

202 @docker_command(subcommand="down", use_original_implementation=True) 

203 def down( 

204 self, 

205 timeout: int = 100, 

206 remove_orphans: bool = False, 

207 rmi: bool | Literal["all", "local"] = False, 

208 volumes: bool = False, 

209 dry_run: bool = False, 

210 stream: bool | None = None, 

211 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

212 parameters: dict = locals() 

213 parameters["timeout"] = str(timeout) 

214 

215 down_cmd: list[str] = ["down"] 

216 remove_parameters = ["stream", "self"] 

217 

218 if not rmi: 

219 remove_parameters.append("rmi") 

220 else: 

221 parameters["rmi"] = "all" 

222 

223 down_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

224 

225 return self.docker_compose_cmd + down_cmd 

226 

227 @overload 

228 def start( 

229 self, 

230 services: None | list[str] = None, 

231 dry_run: bool = False, 

232 *, 

233 stream: Literal[True], 

234 ) -> Iterable[tuple[str, bytes]]: ... 

235 

236 @overload 

237 def start( 

238 self, 

239 services: None | list[str] = None, 

240 dry_run: bool = False, 

241 *, 

242 stream: Literal[False] = False, 

243 ) -> SubprocessOutput: ... 

244 

245 @docker_command(subcommand="start", positional_params=["services"]) 

246 def start( 

247 self, 

248 services: None | list[str] = None, 

249 dry_run: bool = False, 

250 stream: bool | None = None, 

251 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

252 pass 

253 

254 @docker_command(subcommand="restart", use_original_implementation=True) 

255 def restart( 

256 self, 

257 services: None | list[str] = None, 

258 dry_run: bool = False, 

259 timeout: int = 100, 

260 no_deps: bool = False, 

261 stream: bool | None = None, 

262 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

263 parameters: dict = locals() 

264 parameters["timeout"] = str(timeout) 

265 

266 restart_cmd: list[str] = ["restart"] 

267 remove_parameters = ["services", "stream", "self"] 

268 

269 restart_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

270 

271 if type(services) == list: 

272 restart_cmd += services 

273 

274 return self.docker_compose_cmd + restart_cmd 

275 

276 @docker_command(subcommand="stop", use_original_implementation=True) 

277 def stop( 

278 self, 

279 services: None | list[str] = None, 

280 timeout: int = 100, 

281 stream: bool | None = None, 

282 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

283 parameters: dict = locals() 

284 parameters["timeout"] = str(timeout) 

285 

286 stop_cmd: list[str] = ["stop"] 

287 remove_parameters = ["services", "stream", "self"] 

288 

289 stop_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

290 

291 if type(services) == list: 

292 stop_cmd.extend(services) 

293 

294 return self.docker_compose_cmd + stop_cmd 

295 

296 @overload 

297 def exec( 

298 self, 

299 service: str, 

300 command: str, 

301 detach: bool = False, 

302 env: None | list[str] = None, 

303 no_tty: bool = False, 

304 privileged: bool = False, 

305 user: None | str = None, 

306 workdir: None | str = None, 

307 stream: Literal[True] = ..., 

308 capture_output: bool = True, 

309 use_shlex_split: bool = True, 

310 ) -> Iterable[tuple[str, bytes]]: ... 

311 

312 @overload 

313 def exec( 

314 self, 

315 service: str, 

316 command: str, 

317 detach: bool = False, 

318 env: None | list[str] = None, 

319 no_tty: bool = False, 

320 privileged: bool = False, 

321 user: None | str = None, 

322 workdir: None | str = None, 

323 stream: Literal[False] = False, 

324 capture_output: bool = True, 

325 use_shlex_split: bool = True, 

326 ) -> SubprocessOutput: ... 

327 

328 @docker_command(subcommand="exec", use_original_implementation=True) 

329 def exec( 

330 self, 

331 service: str, 

332 command: str, 

333 detach: bool = False, 

334 env: None | list[str] = None, 

335 no_tty: bool = False, 

336 privileged: bool = False, 

337 user: None | str = None, 

338 workdir: None | str = None, 

339 stream: bool | None = None, 

340 capture_output: bool = True, 

341 use_shlex_split: bool = True, 

342 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

343 parameters: dict = locals() 

344 

345 exec_cmd: list[str] = ["exec"] 

346 

347 remove_parameters = [ 

348 "self", 

349 "service", 

350 "stream", 

351 "command", 

352 "env", 

353 "use_shlex_split", 

354 "capture_output", 

355 "no_tty", 

356 ] 

357 

358 exec_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

359 

360 if type(env) == list: 

361 for i in env: 

362 exec_cmd += ["--env", i] 

363 

364 exec_cmd += [service] 

365 

366 if use_shlex_split: 

367 exec_cmd += shlex.split(command, posix=True) 

368 else: 

369 exec_cmd += [command] 

370 

371 return self.docker_compose_cmd + exec_cmd 

372 

373 @docker_command(subcommand="ps", use_original_implementation=True) 

374 def ps( 

375 self, 

376 service: None | list[str] = None, 

377 dry_run: bool = False, 

378 all: bool = False, 

379 services: bool = False, 

380 filter: None | Literal["paused", "restarting", "removing", "running", "dead", "created", "exited"] = None, 

381 format: None | Literal["table", "json"] = None, 

382 status: None | list[Literal["paused", "restarting", "removing", "running", "dead", "created", "exited"]] = None, 

383 stream: bool | None = None, 

384 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

385 parameters: dict = locals() 

386 

387 ps_cmd: list[str] = ["ps"] 

388 

389 remove_parameters = [ 

390 "self", 

391 "service", 

392 "stream", 

393 "filter", 

394 "status", 

395 ] 

396 

397 if filter: 

398 parameters["filter"] = f"status={filter}" 

399 

400 ps_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

401 

402 if type(status) == list: 

403 for i in status: 

404 ps_cmd += ["--status", i] 

405 

406 if service: 

407 ps_cmd += service 

408 

409 return self.docker_compose_cmd + ps_cmd 

410 

411 @docker_command(subcommand="logs", use_original_implementation=True) 

412 def logs( 

413 self, 

414 services: None | list[str] = None, 

415 dry_run: bool = False, 

416 follow: bool = False, 

417 no_color: bool = False, 

418 no_log_prefix: bool = False, 

419 since: None | str = None, 

420 tail: None | int = None, 

421 until: None | int = None, 

422 timestamps: bool = False, 

423 stream: bool | None = None, 

424 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

425 parameters: dict = locals() 

426 

427 logs_cmd: list[str] = ["logs"] 

428 

429 remove_parameters = ["self", "services", "stream"] 

430 

431 logs_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

432 

433 if services: 

434 logs_cmd += services 

435 

436 return self.docker_compose_cmd + logs_cmd 

437 

438 def ls( 

439 self, 

440 all: bool = False, 

441 dry_run: bool = False, 

442 format: Literal["table", "json"] = "table", 

443 ): 

444 parameters: dict = locals() 

445 

446 ls_cmd: list[str] = ["ls"] 

447 

448 ls_cmd += parameters_to_options(parameters) 

449 

450 try: 

451 output = run(self.docker_compose_cmd + ls_cmd, capture_output=True) 

452 output = output.stdout.decode() 

453 except Exception: 

454 return False 

455 

456 return output 

457 

458 @overload 

459 def pull( 

460 self, 

461 dry_run: bool = False, 

462 ignore_buildable: bool = False, 

463 ignore_pull_failures: bool = False, 

464 include_deps: bool = False, 

465 *, 

466 stream: Literal[True], 

467 ) -> Iterable[tuple[str, bytes]]: ... 

468 

469 @overload 

470 def pull( 

471 self, 

472 dry_run: bool = False, 

473 ignore_buildable: bool = False, 

474 ignore_pull_failures: bool = False, 

475 include_deps: bool = False, 

476 *, 

477 stream: Literal[False] = False, 

478 ) -> SubprocessOutput: ... 

479 

480 @docker_command("pull") 

481 def pull( 

482 self, 

483 dry_run: bool = False, 

484 ignore_buildable: bool = False, 

485 ignore_pull_failures: bool = False, 

486 include_deps: bool = False, 

487 stream: bool | None = None, 

488 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

489 """Pull service images defined in the compose file. 

490 

491 Implementation handled by @docker_command decorator. 

492 """ 

493 # Implementation handled by decorator 

494 

495 @docker_command(subcommand="run", use_original_implementation=True) 

496 def run( 

497 self, 

498 service: str, 

499 command: str | None = None, 

500 name: str | None = None, 

501 user: str | None = None, 

502 detach: bool = False, 

503 rm: bool = False, 

504 entrypoint: str | None = None, 

505 use_shlex_split: bool = True, 

506 stream: bool | None = None, 

507 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

508 parameters: dict = locals() 

509 run_cmd: list = ["run"] 

510 

511 remove_parameters = ["stream", "command", "service", "use_shlex_split", "self"] 

512 

513 run_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

514 

515 run_cmd += [service] 

516 

517 if command: 

518 if use_shlex_split: 

519 run_cmd += shlex.split(command, posix=True) 

520 else: 

521 run_cmd += [command] 

522 

523 return self.docker_compose_cmd + run_cmd 

524 

525 @docker_command(subcommand="cp", use_original_implementation=True) 

526 def cp( 

527 self, 

528 source: str, 

529 destination: str, 

530 source_container: str | None = None, 

531 destination_container: str | None = None, 

532 archive: bool = False, 

533 follow_link: bool = False, 

534 stream: bool | None = None, 

535 ) -> Iterable[tuple[str, bytes]] | SubprocessOutput: 

536 parameters: dict = locals() 

537 

538 cp_cmd: list = ["cp"] 

539 

540 remove_parameters = [ 

541 "stream", 

542 "source", 

543 "destination", 

544 "source_container", 

545 "destination_container", 

546 "self", 

547 ] 

548 

549 cp_cmd += parameters_to_options(parameters, exclude=remove_parameters) 

550 

551 if source_container: 

552 source = f"{source_container}:{source}" 

553 

554 if destination_container: 

555 destination = f"{destination_container}:{destination}" 

556 

557 cp_cmd += [f"{source}"] 

558 cp_cmd += [f"{destination}"] 

559 

560 return self.docker_compose_cmd + cp_cmd 

561 

562 # ==================== NEW: Convenience Methods ==================== 

563 

564 def is_service_running(self, service: str) -> bool: 

565 """ 

566 Check if a service is running. 

567 

568 Args: 

569 service: Service name to check 

570 

571 Returns: 

572 True if service is running, False otherwise 

573 """ 

574 try: 

575 output = self.ps(service=[service], format="json", stream=False) 

576 if not output.stdout: 

577 return False 

578 

579 import json 

580 

581 for line in output.stdout: 

582 status = json.loads(line) 

583 if status.get("State") == "running": 

584 return True 

585 return False 

586 except Exception: 

587 return False 

588 

589 def get_service_status(self, service: str) -> dict | None: 

590 """ 

591 Get detailed status for a service. 

592 

593 Args: 

594 service: Service name 

595 

596 Returns: 

597 Dict with status info or None if not found 

598 """ 

599 try: 

600 output = self.ps(service=[service], format="json", all=True, stream=False) 

601 if output.stdout: 

602 import json 

603 

604 return json.loads(output.stdout[0]) 

605 return None 

606 except Exception: 

607 return None 

608 

609 def get_all_services_status(self) -> list[dict]: 

610 """ 

611 Get status for all services in compose file. 

612 

613 Returns: 

614 List of status dicts 

615 """ 

616 statuses = [] 

617 try: 

618 output = self.ps(format="json", all=True, stream=False) 

619 import json 

620 

621 for line in output.stdout: 

622 statuses.append(json.loads(line)) 

623 except Exception: 

624 pass 

625 return statuses 

626 

627 def wait_for_service(self, service: str, timeout: int = 30, check_interval: float = 0.5) -> bool: 

628 """ 

629 Wait for a service to be running. 

630 

631 Args: 

632 service: Service name 

633 timeout: Max seconds to wait 

634 check_interval: Seconds between checks 

635 

636 Returns: 

637 True if service started, False if timeout 

638 """ 

639 import time 

640 

641 start_time = time.time() 

642 

643 while time.time() - start_time < timeout: 

644 if self.is_service_running(service): 

645 return True 

646 time.sleep(check_interval) 

647 

648 return False 

649 

650 def exec_capture(self, service: str, command: str, **kwargs) -> SubprocessOutput: 

651 """ 

652 Execute command and capture output (stream=False wrapper). 

653 

654 Args: 

655 service: Service name 

656 command: Command to execute 

657 **kwargs: Additional exec arguments 

658 

659 Returns: 

660 SubprocessOutput with stdout/stderr/exit_code 

661 """ 

662 kwargs["stream"] = False 

663 return self.exec(service=service, command=command, **kwargs) 

664 

665 def exec_stream(self, service: str, command: str, **kwargs) -> Iterable[tuple[str, bytes]]: 

666 """ 

667 Execute command and stream output (stream=True wrapper). 

668 

669 Args: 

670 service: Service name 

671 command: Command to execute 

672 **kwargs: Additional exec arguments 

673 

674 Returns: 

675 Iterator of (source, line) tuples 

676 """ 

677 kwargs["stream"] = True 

678 return self.exec(service=service, command=command, **kwargs) 

679 

680 # Context Manager Support 

681 

682 def __enter__(self) -> "DockerComposeWrapper": 

683 """ 

684 Enter context manager - returns self for use in 'with' statement. 

685 

686 Returns: 

687 Self for method chaining 

688 

689 Example: 

690 with DockerComposeWrapper(path).with_auto_cleanup() as compose: 

691 compose.up(detach=True, stream=False) 

692 # Work with compose... 

693 # Auto-cleanup on exit 

694 """ 

695 return self 

696 

697 def __exit__(self, exc_type, exc_val, exc_tb) -> bool: 

698 """ 

699 Exit context manager - performs cleanup if services are registered. 

700 

701 Args: 

702 exc_type: Exception type (if any) 

703 exc_val: Exception value (if any) 

704 exc_tb: Exception traceback (if any) 

705 

706 Returns: 

707 False (does not suppress exceptions) 

708 

709 Note: 

710 If services were registered via with_auto_cleanup(), this will 

711 call down() to stop and remove containers. Cleanup errors are 

712 silently ignored (best-effort cleanup). 

713 """ 

714 if self._context_services is not None: 

715 try: 

716 # Best effort cleanup - don't fail if cleanup fails 

717 self.down(remove_orphans=True, stream=False) 

718 except Exception: 

719 pass # Silently ignore cleanup errors 

720 

721 # Don't suppress exceptions - return False 

722 return False 

723 

724 def with_auto_cleanup(self, services: list[str] | None = None) -> "DockerComposeWrapper": 

725 """ 

726 Register services for automatic cleanup when context exits. 

727 

728 Args: 

729 services: List of service names to cleanup. If None, all services 

730 in the compose file will be cleaned up. 

731 

732 Returns: 

733 Self for method chaining 

734 

735 Example: 

736 # Cleanup all services 

737 with DockerComposeWrapper(path).with_auto_cleanup() as compose: 

738 compose.up(detach=True, stream=False) 

739 # Auto-cleanup on exit 

740 

741 # Cleanup specific services 

742 with DockerComposeWrapper(path).with_auto_cleanup(['redis', 'db']) as compose: 

743 compose.up(services=['redis', 'db'], detach=True, stream=False) 

744 # Auto-cleanup on exit 

745 

746 Note: 

747 Must be used with context manager (with statement). 

748 Cleanup happens even if an exception occurs. 

749 """ 

750 self._context_services = services if services is not None else [] 

751 return self