Coverage for frappe_manager / site_manager / modules / bench_supervisor.py: 17%

204 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1""" 

2BenchSupervisor - Supervisor Process Management Module 

3 

4This module handles Supervisor process management for bench services. 

5Extracted from the monolithic Bench class for better separation of concerns. 

6""" 

7 

8import time 

9import json 

10import multiprocessing 

11from jinja2 import Template 

12from frappe_manager.utils.helpers import get_template_path 

13from frappe_manager.docker import DockerClient, DockerException 

14from frappe_manager.logger.contextual import ContextualLogger 

15from frappe_manager.output_manager import OutputHandler 

16from frappe_manager.output_manager.rich_output import RichOutputHandler 

17from frappe_manager.site_manager.bench_config import BenchConfig 

18from frappe_manager.site_manager.exceptions import BenchOperationException 

19 

# Bench directory path used inside the container; it is passed to the supervisor
# template as "bench_dir" and used to build the New Relic agent log path.
CONTAINER_BENCH_DIR = "/workspace/frappe-bench"

# File name of the bench-wide site config, looked up under the bench's "sites" directory.
COMMON_SITE_CONFIG_FILE = "common_site_config.json"

22 

23 

class BenchSupervisor:
    """Manages Supervisor process and worker configuration.

    Responsibilities visible in this class:

    * probing and restarting supervisord inside a compose service through
      ``docker_client.compose.exec``;
    * rendering ``supervisor.conf.tmpl`` and splitting the result into one
      ``*.fm.supervisor.conf`` file per section;
    * writing the ``fm-web-server.sh`` gunicorn wrapper and, when enabled in
      the bench config, ``newrelic.ini``.
    """

    def __init__(
        self,
        logger: ContextualLogger,
        docker_client: DockerClient,
        config: BenchConfig,
        bench_name: str,
        output_handler: OutputHandler | None = None,
    ):
        """
        Initialize BenchSupervisor.

        Args:
            logger: Contextual logger; a child logger tagged ``component="supervisor"`` is stored
            docker_client: Docker client used for compose operations
            config: Bench configuration (read for the New Relic settings)
            bench_name: Name of the bench
            output_handler: Optional output handler; a RichOutputHandler is created when omitted
        """
        self.logger = logger.child(component="supervisor")
        self.docker_client = docker_client
        self.config = config
        self.bench_name = bench_name
        self.output = output_handler or RichOutputHandler()

    @staticmethod
    def _wait_until(check, timeout: float, interval: float) -> bool:
        """
        Call ``check()`` repeatedly until it succeeds or ``timeout`` seconds have elapsed.

        At least one attempt is always made, and the final sleep is shortened so the
        total wait does not overshoot the deadline.

        Args:
            check: Zero-argument callable returning True on success
            timeout: Maximum time to wait in seconds
            interval: Pause between attempts in seconds

        Returns:
            True as soon as ``check()`` returns a truthy value, False once the deadline passes
        """
        deadline = time.monotonic() + timeout
        while True:
            if check():
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(interval, remaining))

    def is_supervisord_running(self, interval: int = 2, timeout: int = 30) -> bool:
        """
        Check if supervisord is running.

        Runs ``supervisorctl status all`` in the ``frappe`` service until it answers
        or the time budget is exhausted.

        Args:
            interval: Check interval in seconds
            timeout: Maximum time to wait in seconds

        Returns:
            True if supervisord is running, False otherwise
        """
        status_command = "supervisorctl -c /opt/user/supervisord.conf status all"

        def supervisord_responds() -> bool:
            try:
                self.docker_client.compose.exec("frappe", status_command, user="frappe", stream=False)
            except DockerException as e:
                # A failed command whose output still mentions "frappe-bench" is treated as
                # "running". NOTE(review): presumably supervisorctl exits non-zero while some
                # program is not RUNNING yet still lists the programs - confirm.
                return any("frappe-bench" in s for s in e.output.combined)
            return True

        return self._wait_until(supervisord_responds, timeout=timeout, interval=interval)

    def restart_supervisor_service(
        self,
        service: str,
        docker_client_obj: DockerClient | None = None,
        timeout: int = 30,
        interval: int = 1,
        force: bool = False,
    ) -> bool:
        """
        Restart a supervisor service.

        Args:
            service: Service name to restart
            docker_client_obj: Optional Docker client (uses self.docker_client if None)
            timeout: Timeout in seconds (used for socket availability check after restart)
            interval: Check interval in seconds
            force: If True, stop then start processes (hard restart). If False, use restart command (graceful).

        Returns:
            False if the service container is not running (an error is displayed instead of
            raising). True once the restart command succeeded, even if the supervisor socket
            did not show up within ``timeout`` (a warning is displayed in that case).

        Raises:
            BenchOperationException: If the supervisorctl restart command fails
        """
        if docker_client_obj is None:
            docker_client_obj = self.docker_client

        try:
            all_statuses = docker_client_obj.compose.get_all_services_status()
            service_running = any(
                status["Service"] == service and status["State"] == "running" for status in all_statuses
            )
        except DockerException:
            service_running = False

        if not service_running:
            self.output.display_error(text=f"Service [blue]{service}[/blue] not running.")
            return False

        supervisorctl = "supervisorctl -c /opt/user/supervisord.conf"
        if force:
            commands = [f"{supervisorctl} stop all", f"{supervisorctl} start all"]
            action = "force restart"
        else:
            commands = [f"{supervisorctl} restart all"]
            action = "restart"

        try:
            for command in commands:
                docker_client_obj.compose.exec(service=service, user="frappe", command=command, stream=False)
        except DockerException as e:
            raise BenchOperationException(
                self.bench_name,
                message=f"Failed to {action} supervisor for {service} service: {e!s}",
            ) from e

        # Verify supervisor socket was created after restart
        socket_path = f"/fm-sockets/{service}.sock"

        def socket_exists() -> bool:
            try:
                # Use the same client that performed the restart, not unconditionally self.docker_client.
                docker_client_obj.compose.exec(
                    service=service,
                    user="frappe",
                    command=f"test -e {socket_path}",
                    stream=False,
                )
            except DockerException:
                return False
            return True

        if self._wait_until(socket_exists, timeout=timeout, interval=interval):
            return True

        # Socket not found, but don't fail - it might be dev mode where socket isn't used
        self.output.warning(
            f"Supervisor socket {socket_path} not found after restart, but services may still be running",
        )
        return True

    def _run_frappe_command(self, command: str) -> None:
        """
        Run a command in the frappe service.

        Args:
            command: Command to execute

        Raises:
            BenchException: If the command fails (the DockerException is chained as its cause)
        """
        try:
            self.docker_client.compose.exec("frappe", command, user="frappe", stream=False)
        except DockerException as e:
            # Imported lazily, exactly as the original code did.
            from frappe_manager.site_manager.exceptions import BenchException

            raise BenchException("frappe", f"Failed to run {command} in frappe service.") from e

    def _get_gunicorn_workers(self) -> int:
        """
        Compute the default number of gunicorn workers.

        Returns:
            The smaller of the CPU count and one worker per 256 MiB of total RAM (at least 1)
        """
        import psutil

        cpus = multiprocessing.cpu_count()
        ram_gb = psutil.virtual_memory().total / (1024**3)
        ram_based = max(1, int(ram_gb * 1024 / 256))
        return min(cpus, ram_based)

    def _get_gunicorn_threads(self) -> int:
        """Return the default threads per gunicorn worker: the CPU count clamped to the range 2-4."""
        cpus = multiprocessing.cpu_count()
        return max(2, min(cpus, 4))

    def _get_default_max_requests(self, workers: int) -> int:
        """Return the default gunicorn ``--max-requests`` value (currently 1000; ``workers`` is unused)."""
        return 1000

    def _compute_max_requests_jitter(self, max_requests: int) -> int:
        """Return the ``--max-requests-jitter`` value: 10% of ``max_requests``, truncated to an int."""
        return int(max_requests * 0.1)

    def _can_enable_multi_queue_consumption(self, bench_path) -> bool:
        """Return whether multi-queue consumption is enabled (currently always True; ``bench_path`` is unused)."""
        return True

    def _load_common_site_config(self, config_path) -> dict:
        """
        Read the common site config as a dict, falling back to ``{}`` on any problem.

        A missing file is silently treated as empty. An unreadable file, invalid JSON
        or a non-object JSON document triggers a warning instead of being ignored
        without a trace, and the defaults are used.

        Args:
            config_path: Path object pointing at the JSON config file

        Returns:
            The parsed config mapping, or an empty dict
        """
        if not config_path.exists():
            return {}

        try:
            loaded = json.loads(config_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            self.output.warning(f"Could not read {config_path} ({e}); using default supervisor settings")
            return {}

        if not isinstance(loaded, dict):
            self.output.warning(f"{config_path} does not contain a JSON object; using default supervisor settings")
            return {}
        return loaded

    def generate_supervisor_config(self, bench_path, user="frappe", skip_redis=True) -> tuple[str, dict]:
        """
        Render the supervisor configuration for a bench.

        Settings come from ``<bench_path>/workspace/frappe-bench/sites/common_site_config.json``
        when present, with computed defaults otherwise.

        Args:
            bench_path: Host path of the bench
            user: Value passed to the template as ``user``
            skip_redis: Accepted for interface compatibility; not used by this method

        Returns:
            Tuple of (rendered ``supervisor.conf.tmpl`` text, template context dict)
        """
        from pathlib import Path

        host_bench_dir = Path(bench_path).resolve() / "workspace" / "frappe-bench"
        config = self._load_common_site_config(host_bench_dir / "sites" / COMMON_SITE_CONFIG_FILE)

        # Only compute the worker default (which imports psutil) when the config does not set it.
        if "gunicorn_workers" in config:
            web_worker_count = config["gunicorn_workers"]
        else:
            web_worker_count = self._get_gunicorn_workers()
        max_requests = config.get("gunicorn_max_requests", self._get_default_max_requests(web_worker_count))
        # gthread worker class: each worker handles multiple concurrent requests via threads.
        # Frappe is IO-bound (DB/Redis heavy) and its concurrency_limiter explicitly reads
        # --threads from the gunicorn master cmdline, so gthread is the intended worker type.
        # Default is 2-4 threads per worker depending on CPU count; overridable via
        # common_site_config.json.
        gunicorn_threads = config.get("gunicorn_threads", self._get_gunicorn_threads())

        context = {
            "bench_dir": CONTAINER_BENCH_DIR,
            "sites_dir": f"{CONTAINER_BENCH_DIR}/sites",
            "user": user,
            "use_rq": True,
            "http_timeout": config.get("http_timeout", 120),
            "node": "/workspace/frappe-bench/.fnm/aliases/default/bin/node",
            "webserver_port": config.get("webserver_port", 80),
            "gunicorn_workers": web_worker_count,
            "gunicorn_max_requests": max_requests,
            "gunicorn_max_requests_jitter": self._compute_max_requests_jitter(max_requests),
            "gunicorn_threads": gunicorn_threads,
            "bench_name": "frappe-bench",
            "background_workers": config.get("background_workers") or 1,
            "bench_cmd": "/opt/user/.bin/bench",
            "workers": config.get("workers", {}),
            "multi_queue_consumption": self._can_enable_multi_queue_consumption(bench_path),
            "supervisor_startretries": 10,
        }

        template_path = get_template_path("supervisor.conf.tmpl")
        return Template(template_path.read_text()).render(**context), context

    def setup_supervisor(self, bench_path, force: bool = False, use_run: bool = False) -> None:
        """
        Generate the per-section supervisor config files for a bench.

        Does nothing when ``*.fm.supervisor.conf`` files already exist in the bench's
        ``config`` directory, unless ``force`` is set.

        Args:
            bench_path: Host path of the bench
            force: Regenerate the files even if they already exist
            use_run: Accepted for interface compatibility; not used by this method

        Raises:
            BenchOperationException: If the supervisor configuration cannot be generated
        """
        import configparser
        from pathlib import Path

        bench_dir = Path(bench_path).resolve() / "workspace" / "frappe-bench"
        config_dir = bench_dir / "config"

        self.output.change_head("Checking supervisor configuration")

        fm_confs_exist = config_dir.exists() and any(config_dir.glob("*.fm.supervisor.conf"))
        if fm_confs_exist and not force:
            return

        self.output.change_head("Configuring supervisor configs")
        try:
            rendered, context = self.generate_supervisor_config(bench_path)
        except Exception as e:
            raise BenchOperationException(self.bench_name, f"Failed to configure supervisor: {e}") from e

        config_dir.mkdir(parents=True, exist_ok=True)
        parsed = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
        parsed.read_string(rendered)
        self._write_split_configs(parsed, config_dir)
        self._write_gunicorn_wrapper(config_dir, context)
        if self.config.newrelic_enabled and self.config.newrelic_license_key:
            self._write_newrelic_config(config_dir)
        self.output.print("Configured supervisor configs")

    def setup_newrelic(self, bench_path) -> None:
        """
        Regenerate the gunicorn wrapper and, when New Relic is enabled, ``newrelic.ini``.

        Args:
            bench_path: Host path of the bench

        Raises:
            BenchOperationException: If the bench config cannot be read
        """
        from pathlib import Path

        bench_dir = Path(bench_path).resolve() / "workspace" / "frappe-bench"
        config_dir = bench_dir / "config"
        config_dir.mkdir(parents=True, exist_ok=True)

        self.output.change_head("Configuring supervisor configs")

        try:
            _, context = self.generate_supervisor_config(bench_path)
        except Exception as e:
            raise BenchOperationException(
                self.bench_name, f"Failed to read bench config for NewRelic setup: {e}"
            ) from e

        self._write_gunicorn_wrapper(config_dir, context)

        if self.config.newrelic_enabled and self.config.newrelic_license_key:
            self._write_newrelic_config(config_dir)

        self.output.print("Configured supervisor configs")

    def _write_split_configs(self, config, config_dir) -> None:
        """
        Write every non-group section of ``config`` to its own file in ``config_dir``.

        The file name is the part of the section name after ``-node-`` (if present) or
        ``-frappe-``; sections whose name contains ``worker`` get the
        ``.workers.fm.supervisor.conf`` suffix, all others ``.fm.supervisor.conf``.

        Args:
            config: Parsed ConfigParser holding the full supervisor configuration
            config_dir: Target directory (``str`` or ``Path``)
        """
        import configparser
        import io
        from pathlib import Path

        target_dir = Path(config_dir)

        for section_name in config.sections():
            # "group:" sections are not written out.
            if section_name.startswith("group:"):
                continue

            section_config = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
            section_config.add_section(section_name)
            for key, value in config.items(section_name):
                section_config.set(section_name, key, value)

            delimiter = "-node-" if "-node-" in section_name else "-frappe-"
            file_name_prefix = section_name.split(delimiter)[-1]
            suffix = ".workers.fm.supervisor.conf" if "worker" in section_name else ".fm.supervisor.conf"
            file_name = file_name_prefix + suffix

            buf = io.StringIO()
            section_config.write(buf)
            (target_dir / file_name).write_text(buf.getvalue())
            self.logger.info(f"Split supervisor conf {section_name} => {file_name}")

    def _write_newrelic_config(self, config_dir) -> None:
        """
        Write ``newrelic.ini`` into ``config_dir`` using the license key from the bench config.

        Args:
            config_dir: Target directory (``str`` or ``Path``)
        """
        import configparser
        import io
        from pathlib import Path

        # Section -> ordered option/value pairs, written in exactly this order.
        settings = {
            "newrelic": {
                "license_key": self.config.newrelic_license_key or "",
                "app_name": f"Frappe - {self.bench_name}",
                "monitor_mode": "true",
                "high_security": "false",
                "log_file": f"{CONTAINER_BENCH_DIR}/logs/newrelic-agent.log",
                "log_level": "info",
                "startup_timeout": "0.0",
                "shutdown_timeout": "2.5",
                "distributed_tracing.enabled": "true",
                "span_events.max_samples_stored": "2000",
                "datastore_tracer.instance_reporting.enabled": "true",
                "datastore_tracer.database_name_reporting.enabled": "true",
            },
            "transaction_tracer": {
                "enabled": "true",
                "transaction_threshold": "apdex_f",
                "record_sql": "obfuscated",
                "stack_trace_threshold": "0.5",
                "explain_enabled": "true",
                "explain_threshold": "0.5",
                "attributes.enabled": "true",
                "attributes.include": "request.headers.x-frappe-request-id request.uri request.method",
                "attributes.exclude": "request.headers.authorization request.headers.cookie",
            },
            "error_collector": {
                "enabled": "true",
                "ignore_status_codes": "100-102 200-208 226 300-308 404",
                "expected_classes": (
                    "frappe.exceptions.ValidationError "
                    "frappe.exceptions.PermissionError "
                    "frappe.exceptions.DoesNotExistError"
                ),
                "attributes.enabled": "true",
                "max_event_samples_stored": "100",
            },
        }

        cfg = configparser.RawConfigParser()
        for section, options in settings.items():
            cfg.add_section(section)
            for option, value in options.items():
                cfg.set(section, option, value)

        buf = io.StringIO()
        cfg.write(buf)
        (Path(config_dir) / "newrelic.ini").write_text(buf.getvalue())
        self.logger.info("Generated newrelic.ini")

    def _write_gunicorn_wrapper(self, config_dir, context: dict) -> None:
        """
        Render ``fm-web-server.sh.tmpl`` and write it as an executable ``fm-web-server.sh``.

        Args:
            config_dir: Target directory (``str`` or ``Path``)
            context: Template context as returned by ``generate_supervisor_config``; the
                webserver port, gunicorn worker/thread/max-request settings, HTTP timeout
                and bench dir are read from it
        """
        from pathlib import Path

        gunicorn_args = " ".join(
            [
                f"-b 0.0.0.0:{context['webserver_port']}",
                f"-w {context['gunicorn_workers']}",
                "--worker-class=gthread",
                f"--threads {context['gunicorn_threads']}",
                f"--max-requests {context['gunicorn_max_requests']}",
                f"--max-requests-jitter {context['gunicorn_max_requests_jitter']}",
                f"-t {context['http_timeout']}",
                "--graceful-timeout 30",
                "frappe.app:application --preload",
            ]
        )

        template_path = get_template_path("fm-web-server.sh.tmpl")
        script = Template(template_path.read_text()).render(
            bench_dir=context["bench_dir"],
            gunicorn_args=gunicorn_args,
            bench_name=self.bench_name,
        )

        wrapper_path = Path(config_dir) / "fm-web-server.sh"
        wrapper_path.write_text(script)
        wrapper_path.chmod(0o755)
        self.logger.info("Generated fm-web-server.sh wrapper")