Coverage for frappe_manager / site_manager / modules / bench_workers.py: 24%
158 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1"""
2Bench Workers Module
4Provides worker management for the bench including:
5- Worker compose file generation and management (BenchWorkers)
6- Worker coordination and lifecycle (BenchWorkerCoordinator)
7- Supervisor configuration backup and restore
8- Worker startup checks and service restarts
9"""
11from copy import deepcopy
12from typing import TYPE_CHECKING
14from frappe_manager import SiteServicesEnum
15from frappe_manager.docker import ComposeFile, DockerClient, DockerException
16from frappe_manager.migration_manager.backup_manager import BackupManager
17from frappe_manager.output_manager import OutputHandler
18from frappe_manager.output_manager.rich_output import RichOutputHandler
19from frappe_manager.site_manager.exceptions import (
20 BenchOperationException,
21 BenchWorkersSupervisorConfigurtionNotFoundError,
22)
23from frappe_manager.utils.helpers import get_container_name_prefix, get_current_fm_version
24from frappe_manager.utils.site import is_default_worker
26if TYPE_CHECKING:
27 from frappe_manager.site_manager.site import Bench
30class BenchWorkers:
31 """
32 Manages worker compose file generation and configuration.
34 Responsibilities:
35 - Generate worker compose files based on supervisor configuration
36 - Manage worker service definitions
37 - Handle custom and default workers
38 - Clean up worker compose files when no longer needed
39 """
41 def __init__(self, bench: "Bench", verbose: bool = True, output_handler: OutputHandler | None = None):
42 """
43 Initialize BenchWorkers.
45 Args:
46 bench: The Bench instance
47 verbose: Whether to show verbose output
48 output_handler: Handler for output operations
49 """
50 self.bench = bench
51 self.compose_path = self.bench.path / "docker-compose.workers.yml"
52 self.config_dir = self.bench.path / "workspace" / "frappe-bench" / "config"
53 self.supervisor_config_path = self.config_dir / "supervisor.conf"
54 self.output = output_handler or RichOutputHandler()
55 self.compose_file_manager = ComposeFile(self.compose_path, template_name="docker-compose.workers.tmpl")
56 self.docker_client = DockerClient(compose_file_path=self.compose_path, output=self.output)
58 def get_expected_workers(
59 self,
60 include_default_workers: bool = True,
61 include_custom_workers: bool = True,
62 ) -> list[str]:
63 """
64 Get list of expected workers from supervisor configuration.
66 Args:
67 include_default_workers: Whether to include default workers (short, long)
68 include_custom_workers: Whether to include custom workers
70 Returns:
71 Sorted list of worker service names
73 Raises:
74 BenchWorkersSupervisorConfigurtionNotFoundError: If no worker configs found
75 """
76 self.output.change_head("Checking workers info")
78 workers_supervisor_conf_paths = []
80 for file_path in self.config_dir.iterdir():
81 file_path_abs = str(file_path.absolute())
82 if file_path.is_file():
83 if file_path_abs.endswith(".workers.fm.supervisor.conf"):
84 workers_supervisor_conf_paths.append(file_path)
86 if len(workers_supervisor_conf_paths) == 0:
87 raise BenchWorkersSupervisorConfigurtionNotFoundError(self.bench.name, str(self.config_dir))
89 workers_expected_service_names = []
91 for worker_name in workers_supervisor_conf_paths:
92 worker_name = worker_name.name
93 worker_name = worker_name.replace("frappe-bench-frappe-", "")
94 worker_name = worker_name.replace(".workers.fm.supervisor.conf", "")
96 if is_default_worker(worker_name):
97 if include_default_workers:
98 workers_expected_service_names.append(worker_name)
99 elif include_custom_workers:
100 workers_expected_service_names.append(worker_name)
102 workers_expected_service_names.sort()
104 return workers_expected_service_names
106 def is_new_workers_added(self, include_default_workers: bool = False) -> bool:
107 """
108 Check if worker configuration has changed.
110 Args:
111 include_default_workers: Whether to include default workers in comparison
113 Returns:
114 True if workers configuration matches expected, False otherwise
115 """
116 if not self.compose_file_manager.is_template_loaded:
117 prev_workers = self.compose_file_manager.get_services_list()
118 prev_workers.sort()
119 expected_workers = self.get_expected_workers(include_default_workers=include_default_workers)
121 # get custom workers from common_site_config.json
122 common_site_config_data = self.bench.get_common_bench_config()
124 if "workers" in common_site_config_data:
125 custom_workers: list[str] = common_site_config_data["workers"].keys()
126 for worker in custom_workers:
127 worker = f"{worker}-worker"
128 if worker not in prev_workers:
129 return False
130 return prev_workers == expected_workers
132 return False
134 def generate_compose(self, include_default_workers: bool = True, include_custom_workers: bool = True) -> bool:
135 """
136 Generate worker compose file from template.
138 Args:
139 include_default_workers: Whether to include default workers
140 include_custom_workers: Whether to include custom workers
142 Returns:
143 True if workers were configured and need starting, False otherwise
144 """
145 self.output.change_head("Generating workers compose configuration")
147 if not self.compose_path.exists():
148 self.output.print("Workers compose not present. Generating new configuration..")
149 else:
150 self.output.print("Workers configuration changed. Recreating compose..")
152 self.compose_file_manager.yml = self.compose_file_manager.load_template()
154 template_worker_config = self.compose_file_manager.yml["services"]["worker-name"]
155 del self.compose_file_manager.yml["services"]["worker-name"]
157 workers_expected_service_names = self.get_expected_workers(
158 include_default_workers=include_default_workers,
159 include_custom_workers=include_custom_workers,
160 )
162 if len(workers_expected_service_names) > 0:
163 import os
165 for worker in workers_expected_service_names:
166 worker_config = deepcopy(template_worker_config)
167 worker_config["environment"]["USERID"] = os.getuid()
168 worker_config["environment"]["USERGROUP"] = os.getgid()
169 worker_config["environment"]["WORKER_NAME"] = worker
170 self.compose_file_manager.yml["services"][worker] = worker_config
172 self.compose_file_manager.with_prefix(
173 get_container_name_prefix(self.bench.name),
174 "site-network",
175 ).with_version(get_current_fm_version()).with_restart(self.bench.bench_config.restart_policy.value).commit()
177 self.output.print(f"{' '.join(workers_expected_service_names)} configurations generated")
178 return True
180 if self.compose_file_manager.exists():
181 self.output.print("No workers found, cleaning up existing configuration")
182 self.docker_client.compose.down(remove_orphans=True, volumes=False, timeout=5, stream=True)
183 self.compose_file_manager.compose_path.unlink()
185 return False
188class BenchWorkerCoordinator:
189 """
190 Coordinates worker processes for a bench.
192 Responsibilities:
193 - Sync worker compose files
194 - Backup and restore worker supervisor configs
195 - Ensure workers are running
196 - Restart worker services
197 """
199 def __init__(
200 self,
201 bench_name: str,
202 workers: "BenchWorkers",
203 supervisor,
204 bench_path,
205 restart_supervisor_service_fn,
206 is_running_fn,
207 docker_ops=None,
208 output_handler: OutputHandler | None = None,
209 ):
210 """
211 Initialize BenchWorkerCoordinator module.
213 Args:
214 bench_name: Name of the bench
215 workers: BenchWorkers instance (from workers_manager)
216 supervisor: BenchSupervisor instance for supervisor setup
217 bench_path: Path to bench directory
218 restart_supervisor_service_fn: Callable to restart supervisor service
219 is_running_fn: Callable to check if bench is running
220 docker_ops: BenchDockerOps instance for docker operations
221 output_handler: Handler for output operations
222 """
223 self.bench_name = bench_name
224 self.workers = workers
225 self.supervisor = supervisor
226 self.bench_path = bench_path
227 self.restart_supervisor_service = restart_supervisor_service_fn
228 self.is_running = is_running_fn
229 self.docker_ops = docker_ops
230 self.output = output_handler or RichOutputHandler()
232 def sync_workers_compose(
233 self,
234 force_recreate: bool = False,
235 setup_supervisor: bool = True,
236 include_default_workers: bool = True,
237 include_custom_workers: bool = True,
238 ):
239 """
240 Synchronize workers compose file and optionally setup supervisor.
242 Args:
243 force_recreate: Force recreate containers
244 setup_supervisor: Whether to setup supervisor
245 include_default_workers: Include default workers
246 include_custom_workers: Include custom workers
247 """
248 if setup_supervisor:
249 workers_backup_manager = self.backup_workers_supervisor_conf()
250 try:
251 self.supervisor.setup_supervisor(self.bench_path, force=True)
252 except BenchOperationException as e:
253 self.backup_restore_workers_supervisor(workers_backup_manager)
255 are_workers_not_changed = self.workers.is_new_workers_added(include_default_workers=include_default_workers)
257 if are_workers_not_changed:
258 self.output.print("Workers configuration remains unchanged")
259 return
261 start_required = self.workers.generate_compose(
262 include_default_workers=include_default_workers,
263 include_custom_workers=include_custom_workers,
264 )
266 if start_required:
267 self.workers.docker_client.compose.up(
268 services=[],
269 detach=True,
270 pull="never",
271 force_recreate=force_recreate,
272 )
274 def backup_restore_workers_supervisor(self, backup_manager: BackupManager):
275 """
276 Restore workers supervisor configuration from backup.
278 Args:
279 backup_manager: BackupManager instance containing backups
280 """
281 self.output.print("Rolling back to previous workers configuration")
282 for backup in backup_manager.backups:
283 backup_manager.restore(backup, force=True)
285 def backup_workers_supervisor_conf(self) -> BackupManager:
286 """
287 Backup workers supervisor configuration files.
289 Returns:
290 BackupManager: Manager containing backed up files
291 """
292 backup_workers_manager = BackupManager(name="workers", backup_group_name="workers")
293 backup_workers_manager.backup(self.workers.supervisor_config_path, bench_name=self.bench_name)
295 if self.workers.supervisor_config_path.exists():
296 for file_path in self.workers.config_dir.iterdir():
297 file_path_abs = str(file_path.absolute())
298 if not file_path.is_file():
299 continue
300 if file_path_abs.endswith(".fm.supervisor.conf"):
301 from_path = file_path
302 backup_workers_manager.backup(from_path, bench_name=self.bench_name)
303 file_path.unlink()
304 return backup_workers_manager
306 def regenerate_workers_supervisor_conf(self):
307 """Regenerate workers supervisor configuration by backing up existing config."""
308 self.backup_workers_supervisor_conf()
310 def ensure_workers_running_if_available(self):
311 """Ensure workers are running if compose file exists and bench is running."""
312 if self.workers.compose_file_manager.exists():
313 services = self.workers.compose_file_manager.get_services_list()
314 containers = self.workers.compose_file_manager.get_container_names().values()
315 try:
316 all_statuses = self.workers.docker_client.compose.get_all_services_status()
317 running_statuses = {
318 status["Service"]: status["State"] for status in all_statuses if status.get("Name") in containers
319 }
320 workers_running = all(running_statuses.get(s) == "running" for s in services)
321 except DockerException:
322 workers_running = False
324 if not workers_running:
325 if self.is_running():
326 self.workers.docker_client.compose.up(
327 services=[],
328 detach=True,
329 pull="never",
330 force_recreate=False,
331 )
333 def restart_workers_containers_services(self, use_container_restart: bool = False, force: bool = False):
334 """
335 Restart workers and schedule containers.
337 Args:
338 use_container_restart: If True, restart entire containers. If False, restart supervisor processes.
339 force: If True, use aggressive restart (timeout=0 for container, stop+start for supervisor).
340 """
341 scheduler_service = [SiteServicesEnum.schedule.value]
343 if use_container_restart:
344 self.docker_ops.restart_services(scheduler_service, force=force)
345 else:
346 for service in scheduler_service:
347 self.output.change_head(f"Restarting worker service - {service}")
348 is_restarted = self.restart_supervisor_service(service, force=force)
349 if is_restarted:
350 action = "Stopped and started" if force else "Restarted"
351 self.output.print(f"{action} supervisor processes - {service}")
353 worker_services = self.workers.compose_file_manager.get_services_list()
354 for service in worker_services:
355 self.output.change_head(f"Restarting worker service - {service}")
357 if use_container_restart:
358 timeout = 0 if force else 100
359 self.workers.docker_client.compose.restart(services=[service], timeout=timeout)
360 action = "Force restarted" if force else "Restarted"
361 self.output.print(f"{action} container - {service}")
362 else:
363 is_restarted = self.restart_supervisor_service(
364 service,
365 docker_client_obj=self.workers.docker_client,
366 force=force,
367 )
368 if is_restarted:
369 action = "Stopped and started" if force else "Restarted"
370 self.output.print(f"{action} supervisor processes - {service}")