Coverage for frappe_manager / commands / self / stop.py: 25%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import typer 

2from typer_examples import example 

3 

4from frappe_manager import CLI_BENCHES_DIRECTORY 

5from frappe_manager.output_manager import get_global_output_handler, spinner 

6from frappe_manager.services_manager import ServicesEnum 

7from frappe_manager.services_manager.services import ServicesManager 

8from frappe_manager.site_manager.bench_service import BenchService 

9 

10 

@example(
    "Stop everything (all benches + global services)",
    "",
    detail="Stops all running benches and all global services (global-db, global-nginx-proxy).",
)
@example(
    "Stop only global services",
    "--global-only",
    detail="Stops only global services, leaves benches running.",
)
@example(
    "Stop only benches",
    "--benches-only",
    detail="Stops only benches, leaves global services running.",
)
def stop(
    ctx: typer.Context,
    global_only: bool = typer.Option(False, "--global-only", help="Stop only global services"),
    benches_only: bool = typer.Option(False, "--benches-only", help="Stop only benches"),
):
    """
    Stop everything managed by FM.

    Stops all running benches and global services (global-db, global-nginx-proxy).
    Use --global-only or --benches-only to stop only a subset.
    """
    # NOTE: the docstring above is left unchanged on purpose: Typer normally
    # renders a command's docstring as its --help text, so it is user-facing.
    #
    # Inputs read from the Typer context object:
    #   ctx.obj["services"] - the ServicesManager used to query/stop global services
    #   ctx.obj["verbose"]  - forwarded unchanged to BenchService
    # Raises typer.Exit(code=1) when both --global-only and --benches-only are given.
    services_manager: ServicesManager = ctx.obj["services"]
    verbose = ctx.obj["verbose"]
    output = get_global_output_handler()

    if global_only and benches_only:
        output.print("[red]Error:[/red] --global-only and --benches-only are mutually exclusive. Choose one.")
        raise typer.Exit(code=1)

    # Each "only" flag disables the *other* half; with no flag both halves run.
    stop_benches = not global_only
    stop_global = not benches_only

    # Benches are stopped BEFORE the global services. Benches presumably depend
    # on global-db / global-nginx-proxy at runtime (BenchService is built on top
    # of the services manager), so taking the dependents down first avoids
    # leaving running benches without their backing services, and a failure
    # while stopping a global service can no longer prevent benches from being
    # stopped.
    if stop_benches:
        bench_service = BenchService(CLI_BENCHES_DIRECTORY, services_manager, verbose=verbose, output_handler=output)
        bench_names = bench_service.get_bench_names()

        if not bench_names:
            output.print("No benches found")

        for bench_name in bench_names:
            # Best effort per bench: one bench failing to load or stop is
            # reported as a warning and must not block the remaining benches.
            try:
                bench = bench_service.get_bench(bench_name, workers_check=False, admin_tools_check=False)
                if bench.running:
                    with spinner(output, f"Stopping {bench_name}"):
                        bench.stop()
                    output.print(f"Stopped bench {bench_name}")
                else:
                    output.print(f"Skipping already stopped bench {bench_name}")
            except Exception as e:
                output.warning(f"Failed to stop {bench_name}: {e}")

    if stop_global:
        with spinner(output, "Stopping global services"):
            for service in ServicesEnum:
                # ServicesEnum.all is skipped so that only the individual
                # services are queried and stopped one by one.
                if service == ServicesEnum.all:
                    continue

                if not services_manager.is_service_running(service.value):
                    output.print(f"Skipping already stopped service {service.value}")
                    continue

                services_manager.stop_service(services=[service.value])
                output.print(f"Stopped service {service.value}")