Coverage for frappe_manager / site_manager / modules / bench_workers.py: 24%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1""" 

2Bench Workers Module 

3 

4Provides worker management for the bench including: 

5- Worker compose file generation and management (BenchWorkers) 

6- Worker coordination and lifecycle (BenchWorkerCoordinator) 

7- Supervisor configuration backup and restore 

8- Worker startup checks and service restarts 

9""" 

10 

11from copy import deepcopy 

12from typing import TYPE_CHECKING 

13 

14from frappe_manager import SiteServicesEnum 

15from frappe_manager.docker import ComposeFile, DockerClient, DockerException 

16from frappe_manager.migration_manager.backup_manager import BackupManager 

17from frappe_manager.output_manager import OutputHandler 

18from frappe_manager.output_manager.rich_output import RichOutputHandler 

19from frappe_manager.site_manager.exceptions import ( 

20 BenchOperationException, 

21 BenchWorkersSupervisorConfigurtionNotFoundError, 

22) 

23from frappe_manager.utils.helpers import get_container_name_prefix, get_current_fm_version 

24from frappe_manager.utils.site import is_default_worker 

25 

26if TYPE_CHECKING: 

27 from frappe_manager.site_manager.site import Bench 

28 

29 

class BenchWorkers:
    """
    Manages worker compose file generation and configuration.

    Responsibilities:
    - Generate worker compose files based on supervisor configuration
    - Manage worker service definitions
    - Handle custom and default workers
    - Clean up worker compose files when no longer needed
    """

    # File-name pieces wrapped around a worker name in its supervisor config,
    # i.e. ``frappe-bench-frappe-<worker>.workers.fm.supervisor.conf``.
    _WORKER_CONF_PREFIX = "frappe-bench-frappe-"
    _WORKER_CONF_SUFFIX = ".workers.fm.supervisor.conf"

    def __init__(self, bench: "Bench", verbose: bool = True, output_handler: "OutputHandler | None" = None):
        """
        Initialize BenchWorkers.

        Args:
            bench: The Bench instance
            verbose: Whether to show verbose output (accepted for interface
                compatibility; not read by this class)
            output_handler: Handler for output operations; a RichOutputHandler
                is created when omitted
        """
        self.bench = bench
        self.compose_path = self.bench.path / "docker-compose.workers.yml"
        self.config_dir = self.bench.path / "workspace" / "frappe-bench" / "config"
        self.supervisor_config_path = self.config_dir / "supervisor.conf"
        self.output = output_handler or RichOutputHandler()
        self.compose_file_manager = ComposeFile(self.compose_path, template_name="docker-compose.workers.tmpl")
        self.docker_client = DockerClient(compose_file_path=self.compose_path, output=self.output)

    @staticmethod
    def _service_name_from_conf(file_name: str) -> str:
        """Return the worker service name encoded in a worker supervisor config file name."""
        # Strip only the leading prefix and trailing suffix so that a worker
        # whose own name contains either token is left intact.
        return file_name.removeprefix(BenchWorkers._WORKER_CONF_PREFIX).removesuffix(
            BenchWorkers._WORKER_CONF_SUFFIX
        )

    def get_expected_workers(
        self,
        include_default_workers: bool = True,
        include_custom_workers: bool = True,
    ) -> list[str]:
        """
        Get list of expected workers from supervisor configuration.

        Worker names are derived from the ``*.workers.fm.supervisor.conf`` files
        found directly inside the bench config directory.

        Args:
            include_default_workers: Whether to include default workers (short, long)
            include_custom_workers: Whether to include custom workers

        Returns:
            Sorted list of worker service names

        Raises:
            BenchWorkersSupervisorConfigurtionNotFoundError: If the config
                directory is missing or contains no worker configs
        """
        self.output.change_head("Checking workers info")

        # A missing config directory means there are no worker configs either;
        # report it with the documented domain error rather than letting
        # iterdir() raise a bare FileNotFoundError.
        if not self.config_dir.is_dir():
            raise BenchWorkersSupervisorConfigurtionNotFoundError(self.bench.name, str(self.config_dir))

        worker_conf_names = [
            path.name
            for path in self.config_dir.iterdir()
            if path.is_file() and path.name.endswith(self._WORKER_CONF_SUFFIX)
        ]

        if not worker_conf_names:
            raise BenchWorkersSupervisorConfigurtionNotFoundError(self.bench.name, str(self.config_dir))

        workers_expected_service_names = []
        for conf_name in worker_conf_names:
            worker_name = self._service_name_from_conf(conf_name)
            # Default and custom workers are filtered by their own flag.
            wanted = include_default_workers if is_default_worker(worker_name) else include_custom_workers
            if wanted:
                workers_expected_service_names.append(worker_name)

        return sorted(workers_expected_service_names)

    def is_new_workers_added(self, include_default_workers: bool = False) -> bool:
        """
        Check whether the existing workers compose file already matches the expected workers.

        Note that, despite the method name, True means the configuration is
        *unchanged*: the services in the current compose file equal the
        expected workers and every worker declared under ``workers`` in the
        common bench config has a matching ``<name>-worker`` service.

        Args:
            include_default_workers: Whether to include default workers in comparison

        Returns:
            True if workers configuration matches expected, False otherwise
            (including when only the template is loaded, i.e. no real compose
            file content is available to compare against)
        """
        if self.compose_file_manager.is_template_loaded:
            return False

        # sorted() leaves the list owned by the compose file manager untouched.
        prev_workers = sorted(self.compose_file_manager.get_services_list())
        expected_workers = self.get_expected_workers(include_default_workers=include_default_workers)

        # get custom workers from common_site_config.json
        common_site_config_data = self.bench.get_common_bench_config()

        if "workers" in common_site_config_data:
            for worker in common_site_config_data["workers"]:
                if f"{worker}-worker" not in prev_workers:
                    return False

        return prev_workers == expected_workers

    def generate_compose(self, include_default_workers: bool = True, include_custom_workers: bool = True) -> bool:
        """
        Generate worker compose file from template.

        The template's placeholder ``worker-name`` service is cloned once per
        expected worker. When no worker is selected, any existing workers
        compose project is brought down and its compose file removed.

        Args:
            include_default_workers: Whether to include default workers
            include_custom_workers: Whether to include custom workers

        Returns:
            True if workers were configured and need starting, False otherwise

        Raises:
            BenchWorkersSupervisorConfigurtionNotFoundError: Propagated from
                get_expected_workers when no worker configs exist
        """
        self.output.change_head("Generating workers compose configuration")

        if not self.compose_path.exists():
            self.output.print("Workers compose not present. Generating new configuration..")
        else:
            self.output.print("Workers configuration changed. Recreating compose..")

        self.compose_file_manager.yml = self.compose_file_manager.load_template()

        # Detach the placeholder service; it only serves as the per-worker blueprint.
        services = self.compose_file_manager.yml["services"]
        template_worker_config = services["worker-name"]
        del services["worker-name"]

        workers_expected_service_names = self.get_expected_workers(
            include_default_workers=include_default_workers,
            include_custom_workers=include_custom_workers,
        )

        if workers_expected_service_names:
            import os

            for worker in workers_expected_service_names:
                # deepcopy so each service gets its own environment mapping.
                worker_config = deepcopy(template_worker_config)
                worker_config["environment"]["USERID"] = os.getuid()
                worker_config["environment"]["USERGROUP"] = os.getgid()
                worker_config["environment"]["WORKER_NAME"] = worker
                self.compose_file_manager.yml["services"][worker] = worker_config

            self.compose_file_manager.with_prefix(
                get_container_name_prefix(self.bench.name),
                "site-network",
            ).with_version(get_current_fm_version()).with_restart(self.bench.bench_config.restart_policy.value).commit()

            self.output.print(f"{' '.join(workers_expected_service_names)} configurations generated")
            return True

        if self.compose_file_manager.exists():
            self.output.print("No workers found, cleaning up existing configuration")
            self.docker_client.compose.down(remove_orphans=True, volumes=False, timeout=5, stream=True)
            self.compose_file_manager.compose_path.unlink()

        return False

186 

187 

class BenchWorkerCoordinator:
    """
    Coordinates worker processes for a bench.

    Responsibilities:
    - Sync worker compose files
    - Backup and restore worker supervisor configs
    - Ensure workers are running
    - Restart worker services
    """

    def __init__(
        self,
        bench_name: str,
        workers: "BenchWorkers",
        supervisor,
        bench_path,
        restart_supervisor_service_fn,
        is_running_fn,
        docker_ops=None,
        output_handler: "OutputHandler | None" = None,
    ):
        """
        Initialize BenchWorkerCoordinator module.

        Args:
            bench_name: Name of the bench
            workers: BenchWorkers instance (from workers_manager)
            supervisor: BenchSupervisor instance for supervisor setup
            bench_path: Path to bench directory
            restart_supervisor_service_fn: Callable to restart supervisor service
            is_running_fn: Callable to check if bench is running
            docker_ops: BenchDockerOps instance for docker operations; required
                only for container-level restarts
            output_handler: Handler for output operations; a RichOutputHandler
                is created when omitted
        """
        self.bench_name = bench_name
        self.workers = workers
        self.supervisor = supervisor
        self.bench_path = bench_path
        self.restart_supervisor_service = restart_supervisor_service_fn
        self.is_running = is_running_fn
        self.docker_ops = docker_ops
        self.output = output_handler or RichOutputHandler()

    def sync_workers_compose(
        self,
        force_recreate: bool = False,
        setup_supervisor: bool = True,
        include_default_workers: bool = True,
        include_custom_workers: bool = True,
    ):
        """
        Synchronize workers compose file and optionally setup supervisor.

        Args:
            force_recreate: Force recreate containers
            setup_supervisor: Whether to setup supervisor
            include_default_workers: Include default workers
            include_custom_workers: Include custom workers
        """
        if setup_supervisor:
            workers_backup_manager = self.backup_workers_supervisor_conf()
            try:
                self.supervisor.setup_supervisor(self.bench_path, force=True)
            except BenchOperationException:
                # Best effort: put the previous configs back and carry on with
                # the comparison below instead of aborting the sync.
                self.backup_restore_workers_supervisor(workers_backup_manager)

        # is_new_workers_added() returns True when nothing changed.
        are_workers_not_changed = self.workers.is_new_workers_added(include_default_workers=include_default_workers)

        if are_workers_not_changed:
            self.output.print("Workers configuration remains unchanged")
            return

        start_required = self.workers.generate_compose(
            include_default_workers=include_default_workers,
            include_custom_workers=include_custom_workers,
        )

        if start_required:
            self.workers.docker_client.compose.up(
                services=[],
                detach=True,
                pull="never",
                force_recreate=force_recreate,
            )

    def backup_restore_workers_supervisor(self, backup_manager: "BackupManager"):
        """
        Restore workers supervisor configuration from backup.

        Args:
            backup_manager: BackupManager instance containing backups
        """
        self.output.print("Rolling back to previous workers configuration")
        for backup in backup_manager.backups:
            backup_manager.restore(backup, force=True)

    def backup_workers_supervisor_conf(self) -> "BackupManager":
        """
        Backup workers supervisor configuration files.

        The main supervisor config is backed up first. When it exists, every
        ``*.fm.supervisor.conf`` file in the config directory is backed up as
        well and then deleted from the directory.

        Returns:
            BackupManager: Manager containing backed up files
        """
        backup_workers_manager = BackupManager(name="workers", backup_group_name="workers")
        # NOTE(review): this runs even when supervisor.conf is absent; confirm
        # BackupManager.backup tolerates a missing source path.
        backup_workers_manager.backup(self.workers.supervisor_config_path, bench_name=self.bench_name)

        if self.workers.supervisor_config_path.exists():
            # Snapshot the listing first: files are unlinked inside the loop and
            # the directory must not be mutated while it is being iterated.
            for file_path in list(self.workers.config_dir.iterdir()):
                if not file_path.is_file():
                    continue
                if file_path.name.endswith(".fm.supervisor.conf"):
                    backup_workers_manager.backup(file_path, bench_name=self.bench_name)
                    file_path.unlink()
        return backup_workers_manager

    def regenerate_workers_supervisor_conf(self):
        """Regenerate workers supervisor configuration by backing up existing config."""
        self.backup_workers_supervisor_conf()

    def ensure_workers_running_if_available(self):
        """Ensure workers are running if compose file exists and bench is running."""
        compose_file = self.workers.compose_file_manager
        if not compose_file.exists():
            return

        services = compose_file.get_services_list()
        containers = compose_file.get_container_names().values()
        try:
            all_statuses = self.workers.docker_client.compose.get_all_services_status()
            # Only statuses belonging to this compose file's containers count.
            running_statuses = {
                status["Service"]: status["State"] for status in all_statuses if status.get("Name") in containers
            }
            workers_running = all(running_statuses.get(s) == "running" for s in services)
        except DockerException:
            # Treat a failed status query as "not running".
            workers_running = False

        if not workers_running and self.is_running():
            self.workers.docker_client.compose.up(
                services=[],
                detach=True,
                pull="never",
                force_recreate=False,
            )

    def restart_workers_containers_services(self, use_container_restart: bool = False, force: bool = False):
        """
        Restart workers and schedule containers.

        Args:
            use_container_restart: If True, restart entire containers. If False, restart supervisor processes.
            force: If True, use aggressive restart (timeout=0 for container, stop+start for supervisor).

        Raises:
            RuntimeError: If a container restart is requested but no docker_ops
                instance was supplied to the coordinator
        """
        # docker_ops is optional at construction time; fail with a clear message
        # before touching any service instead of an AttributeError on None.
        if use_container_restart and self.docker_ops is None:
            raise RuntimeError("docker_ops is required to restart containers (use_container_restart=True)")

        scheduler_service = [SiteServicesEnum.schedule.value]

        if use_container_restart:
            self.docker_ops.restart_services(scheduler_service, force=force)
        else:
            for service in scheduler_service:
                self.output.change_head(f"Restarting worker service - {service}")
                is_restarted = self.restart_supervisor_service(service, force=force)
                if is_restarted:
                    action = "Stopped and started" if force else "Restarted"
                    self.output.print(f"{action} supervisor processes - {service}")

        worker_services = self.workers.compose_file_manager.get_services_list()
        for service in worker_services:
            self.output.change_head(f"Restarting worker service - {service}")

            if use_container_restart:
                timeout = 0 if force else 100
                self.workers.docker_client.compose.restart(services=[service], timeout=timeout)
                action = "Force restarted" if force else "Restarted"
                self.output.print(f"{action} container - {service}")
            else:
                # Worker services live in the workers compose project, so the
                # supervisor restart is pointed at that project's docker client.
                is_restarted = self.restart_supervisor_service(
                    service,
                    docker_client_obj=self.workers.docker_client,
                    force=force,
                )
                if is_restarted:
                    action = "Stopped and started" if force else "Restarted"
                    self.output.print(f"{action} supervisor processes - {service}")

369 action = "Stopped and started" if force else "Restarted" 

370 self.output.print(f"{action} supervisor processes - {service}")