Coverage for frappe_manager / site_manager / modules / bench_supervisor.py: 17%
204 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1"""
2BenchSupervisor - Supervisor Process Management Module
4This module handles Supervisor process management for bench services.
5Extracted from the monolithic Bench class for better separation of concerns.
6"""
8import time
9import json
10import multiprocessing
11from jinja2 import Template
12from frappe_manager.utils.helpers import get_template_path
13from frappe_manager.docker import DockerClient, DockerException
14from frappe_manager.logger.contextual import ContextualLogger
15from frappe_manager.output_manager import OutputHandler
16from frappe_manager.output_manager.rich_output import RichOutputHandler
17from frappe_manager.site_manager.bench_config import BenchConfig
18from frappe_manager.site_manager.exceptions import BenchOperationException
# Bench directory path as seen from inside the container; substituted into the
# rendered supervisor template context, the gunicorn wrapper and newrelic.ini.
CONTAINER_BENCH_DIR: str = "/workspace/frappe-bench"
# Bench-wide config file, read from <bench>/sites/ on the host to tune the
# generated supervisor/gunicorn settings.
COMMON_SITE_CONFIG_FILE: str = "common_site_config.json"
class BenchSupervisor:
    """Manages Supervisor process and worker configuration for a bench.

    Covers three concerns:

    * probing (``is_supervisord_running``) and restarting
      (``restart_supervisor_service``) supervisord inside compose services;
    * rendering ``supervisor.conf.tmpl`` with values from the bench's
      ``common_site_config.json`` and splitting the result into one
      ``*.fm.supervisor.conf`` file per program section;
    * generating the ``fm-web-server.sh`` gunicorn wrapper and, when NewRelic is
      enabled with a license key, ``newrelic.ini``.
    """

    # Base supervisorctl invocation shared by every control command.
    _SUPERVISORCTL = "supervisorctl -c /opt/user/supervisord.conf"

    def __init__(
        self,
        logger: ContextualLogger,
        docker_client: DockerClient,
        config: BenchConfig,
        bench_name: str,
        output_handler: OutputHandler | None = None,
    ):
        """
        Initialize BenchSupervisor.

        Args:
            logger: Contextual logger; a child tagged ``component="supervisor"`` is derived from it
            docker_client: Docker client for compose operations
            config: Bench configuration (NewRelic settings are read from it)
            bench_name: Name of the bench
            output_handler: Optional output handler; defaults to a ``RichOutputHandler``
        """
        self.logger = logger.child(component="supervisor")
        self.docker_client = docker_client
        self.config = config
        self.bench_name = bench_name
        self.output = output_handler or RichOutputHandler()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _poll_until(check, timeout: float, interval: float) -> bool:
        """
        Call ``check()`` until it returns True or ``timeout`` seconds elapse.

        At least one attempt is always made, and no sleep happens once the next
        attempt would start past the deadline, so the total wait is bounded by
        ``timeout`` (plus the duration of the last check) regardless of ``interval``.

        Returns:
            True if ``check()`` succeeded, False if the deadline passed first.
        """
        deadline = time.monotonic() + timeout
        while not check():
            if time.monotonic() + interval > deadline:
                return False
            time.sleep(interval)
        return True

    @staticmethod
    def _host_bench_dir(bench_path):
        """Return the resolved host path ``<bench_path>/workspace/frappe-bench``."""
        from pathlib import Path

        return Path(bench_path).resolve() / "workspace" / "frappe-bench"

    @staticmethod
    def _is_compose_service_running(docker_client_obj, service: str) -> bool:
        """Return True if compose reports ``service`` as ``running``; False on DockerException."""
        try:
            all_statuses = docker_client_obj.compose.get_all_services_status()
            # Iterated inside the try in case the status source is lazy.
            return any(
                status["Service"] == service and status["State"] == "running" for status in all_statuses
            )
        except DockerException:
            return False

    def _newrelic_configured(self) -> bool:
        """True when NewRelic is enabled and a license key is set in the bench config."""
        return bool(self.config.newrelic_enabled and self.config.newrelic_license_key)

    # ------------------------------------------------------- process management

    def is_supervisord_running(self, interval: int = 2, timeout: int = 30) -> bool:
        """
        Check whether supervisord inside the ``frappe`` service responds.

        Runs ``supervisorctl status all`` every ``interval`` seconds until it
        succeeds or ``timeout`` seconds have elapsed (at least one attempt is
        always made). A failed exec still counts as running when its output
        mentions ``frappe-bench`` — presumably supervisorctl answered but exited
        non-zero because some program is not RUNNING (assumption; confirm).

        Args:
            interval: Check interval in seconds
            timeout: Maximum time to wait in seconds

        Returns:
            True if supervisord is running, False otherwise
        """
        status_command = f"{self._SUPERVISORCTL} status all"

        def supervisorctl_responds() -> bool:
            try:
                self.docker_client.compose.exec("frappe", status_command, user="frappe", stream=False)
            except DockerException as e:
                return any("frappe-bench" in s for s in e.output.combined)
            return True

        return self._poll_until(supervisorctl_responds, timeout, interval)

    def restart_supervisor_service(
        self,
        service: str,
        docker_client_obj: DockerClient | None = None,
        timeout: int = 30,
        interval: int = 1,
        force: bool = False,
    ) -> bool:
        """
        Restart all supervisor programs inside a compose service.

        Args:
            service: Compose service whose supervisord programs are restarted
            docker_client_obj: Optional Docker client (uses self.docker_client if None);
                used for the status check, the restart and the socket check
            timeout: Seconds to wait for the supervisor socket after restart
            interval: Seconds between socket checks
            force: If True, ``stop all`` then ``start all`` (hard restart).
                If False, ``restart all`` (graceful).

        Returns:
            False if the service is not running (an error is displayed).
            True once the restart commands succeeded, including when the socket
            never appears (a warning is displayed instead).

        Raises:
            BenchOperationException: If a supervisorctl command fails
        """
        docker_client_obj = docker_client_obj or self.docker_client

        if not self._is_compose_service_running(docker_client_obj, service):
            self.output.display_error(text=f"Service [blue]{service}[/blue] not running.")
            return False

        if force:
            commands = (f"{self._SUPERVISORCTL} stop all", f"{self._SUPERVISORCTL} start all")
            failure_message = f"Failed to force restart supervisor for {service} service"
        else:
            commands = (f"{self._SUPERVISORCTL} restart all",)
            failure_message = f"Failed to restart supervisor for {service} service"

        try:
            for command in commands:
                docker_client_obj.compose.exec(service=service, user="frappe", command=command, stream=False)
        except DockerException as e:
            raise BenchOperationException(self.bench_name, message=f"{failure_message}: {e!s}") from e

        # Verify the supervisor socket was created after restart, using the same
        # client that performed the restart so a caller-supplied client is honoured.
        socket_path = f"/fm-sockets/{service}.sock"

        def socket_exists() -> bool:
            try:
                docker_client_obj.compose.exec(
                    service=service,
                    user="frappe",
                    command=f"test -e {socket_path}",
                    stream=False,
                )
            except DockerException:
                return False
            return True

        if self._poll_until(socket_exists, timeout, interval):
            return True

        # Socket not found, but don't fail - it might be dev mode where socket isn't used
        self.output.warning(
            f"Supervisor socket {socket_path} not found after restart, but services may still be running",
        )
        return True

    def _run_frappe_command(self, command: str) -> None:
        """
        Run a command in the ``frappe`` service as the ``frappe`` user.

        Args:
            command: Command to execute

        Raises:
            BenchException: If the command fails (the DockerException is chained as the cause)
        """
        from frappe_manager.site_manager.exceptions import BenchException

        try:
            self.docker_client.compose.exec("frappe", command, user="frappe", stream=False)
        except DockerException as e:
            raise BenchException("frappe", f"Failed to run {command} in frappe service.") from e

    # ------------------------------------------------------------ tuning values

    def _get_gunicorn_workers(self) -> int:
        """Default worker count: one per CPU, capped at one per 256 MiB of host RAM (at least 1)."""
        import psutil

        cpus = multiprocessing.cpu_count()
        ram_gb = psutil.virtual_memory().total / (1024**3)
        ram_based = max(1, int(ram_gb * 1024 / 256))
        return min(cpus, ram_based)

    def _get_gunicorn_threads(self) -> int:
        """Default threads per worker: the CPU count clamped to the range 2..4."""
        cpus = multiprocessing.cpu_count()
        return max(2, min(cpus, 4))

    def _get_default_max_requests(self, workers: int) -> int:
        """Default ``--max-requests``; currently a constant (``workers`` is unused)."""
        return 1000

    def _compute_max_requests_jitter(self, max_requests: int) -> int:
        """Return 10% of ``max_requests``, truncated to an int."""
        return int(max_requests * 0.1)

    def _can_enable_multi_queue_consumption(self, bench_path) -> bool:
        """Currently always True; ``bench_path`` is unused."""
        return True

    # ---------------------------------------------------------- config rendering

    def _read_common_site_config(self, path) -> dict:
        """
        Load ``common_site_config.json`` from ``path``.

        Returns ``{}`` when the file is missing. When it cannot be read/parsed or
        does not hold a JSON object, a warning is shown and ``{}`` is returned so
        generation continues with defaults.
        """
        if not path.exists():
            return {}
        try:
            data = json.loads(path.read_text())
        except (OSError, ValueError) as e:  # ValueError covers JSON and Unicode decode errors
            self.output.warning(f"Could not read {path}, using default supervisor settings: {e}")
            return {}
        if not isinstance(data, dict):
            self.output.warning(f"{path} does not contain a JSON object, using default supervisor settings")
            return {}
        return data

    def generate_supervisor_config(self, bench_path, user="frappe", skip_redis=True) -> tuple[str, dict]:
        """
        Render ``supervisor.conf.tmpl`` for the bench at ``bench_path``.

        Settings come from ``<bench_path>/workspace/frappe-bench/sites/common_site_config.json``
        on the host when present; missing keys fall back to computed defaults.

        Args:
            bench_path: Host path of the bench (the directory containing ``workspace/``)
            user: User placed into the template context
            skip_redis: Currently unused

        Returns:
            Tuple of (rendered config text, template context dict)
        """
        config = self._read_common_site_config(
            self._host_bench_dir(bench_path) / "sites" / COMMON_SITE_CONFIG_FILE
        )

        web_worker_count = config.get("gunicorn_workers", self._get_gunicorn_workers())
        max_requests = config.get("gunicorn_max_requests", self._get_default_max_requests(web_worker_count))
        # gthread worker class: each worker handles multiple concurrent requests via threads.
        # Frappe is IO-bound (DB/Redis heavy) and its concurrency_limiter explicitly reads
        # --threads from the gunicorn master cmdline, so gthread is the intended worker type.
        # Default is the CPU count clamped to 2..4 threads per worker; overridable via
        # common_site_config.json.
        gunicorn_threads = config.get("gunicorn_threads", self._get_gunicorn_threads())

        context = {
            "bench_dir": CONTAINER_BENCH_DIR,
            "sites_dir": f"{CONTAINER_BENCH_DIR}/sites",
            "user": user,
            "use_rq": True,
            "http_timeout": config.get("http_timeout", 120),
            "node": "/workspace/frappe-bench/.fnm/aliases/default/bin/node",
            "webserver_port": config.get("webserver_port", 80),
            "gunicorn_workers": web_worker_count,
            "gunicorn_max_requests": max_requests,
            "gunicorn_max_requests_jitter": self._compute_max_requests_jitter(max_requests),
            "gunicorn_threads": gunicorn_threads,
            "bench_name": "frappe-bench",
            "background_workers": config.get("background_workers") or 1,
            "bench_cmd": "/opt/user/.bin/bench",
            "workers": config.get("workers", {}),
            "multi_queue_consumption": self._can_enable_multi_queue_consumption(bench_path),
            "supervisor_startretries": 10,
        }

        template_path = get_template_path("supervisor.conf.tmpl")
        return Template(template_path.read_text()).render(**context), context

    def setup_supervisor(self, bench_path, force: bool = False, use_run: bool = False) -> None:
        """
        Write split supervisor configs, the gunicorn wrapper and (if enabled)
        ``newrelic.ini`` into the bench's ``config`` directory.

        Does nothing when ``*.fm.supervisor.conf`` files already exist, unless
        ``force`` is set.

        Args:
            bench_path: Host path of the bench
            force: Regenerate even if fm supervisor configs already exist
            use_run: Currently unused

        Raises:
            BenchOperationException: If the supervisor config cannot be rendered or parsed
        """
        import configparser

        config_dir = self._host_bench_dir(bench_path) / "config"

        self.output.change_head("Checking supervisor configuration")

        fm_confs_exist = config_dir.exists() and any(config_dir.glob("*.fm.supervisor.conf"))
        if fm_confs_exist and not force:
            return

        self.output.change_head("Configuring supervisor configs")
        try:
            rendered, context = self.generate_supervisor_config(bench_path)
        except Exception as e:
            raise BenchOperationException(self.bench_name, f"Failed to configure supervisor: {e}") from e

        config_dir.mkdir(parents=True, exist_ok=True)
        parsed = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
        try:
            parsed.read_string(rendered)
        except configparser.Error as e:
            raise BenchOperationException(self.bench_name, f"Failed to configure supervisor: {e}") from e

        self._write_split_configs(parsed, config_dir)
        self._write_gunicorn_wrapper(config_dir, context)
        if self._newrelic_configured():
            self._write_newrelic_config(config_dir)
        self.output.print("Configured supervisor configs")

    def setup_newrelic(self, bench_path) -> None:
        """
        Regenerate the gunicorn wrapper and, when NewRelic is enabled with a
        license key, ``newrelic.ini``; the split supervisor confs are not touched.

        NOTE(review): an existing ``newrelic.ini`` is left in place when NewRelic
        is disabled.

        Raises:
            BenchOperationException: If the bench config cannot be read/rendered
        """
        config_dir = self._host_bench_dir(bench_path) / "config"
        config_dir.mkdir(parents=True, exist_ok=True)

        self.output.change_head("Configuring supervisor configs")

        try:
            _, context = self.generate_supervisor_config(bench_path)
        except Exception as e:
            raise BenchOperationException(
                self.bench_name, f"Failed to read bench config for NewRelic setup: {e}"
            ) from e

        self._write_gunicorn_wrapper(config_dir, context)

        if self._newrelic_configured():
            self._write_newrelic_config(config_dir)

        self.output.print("Configured supervisor configs")

    # ------------------------------------------------------------- file writers

    def _write_split_configs(self, config, config_dir) -> None:
        """
        Write each non-``group:`` section of ``config`` to its own file in ``config_dir``.

        The file name is the text after the last ``-node-`` delimiter (if the
        section name contains one) or else after the last ``-frappe-`` delimiter
        (the whole section name if neither occurs). It gets the suffix
        ``.workers.fm.supervisor.conf`` when the section name contains ``worker``,
        and ``.fm.supervisor.conf`` otherwise.
        """
        import configparser
        import io
        from pathlib import Path

        for section_name in config.sections():
            if section_name.startswith("group:"):
                continue

            section_config = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
            section_config.add_section(section_name)
            for key, value in config.items(section_name):
                section_config.set(section_name, key, value)

            delimiter = "-node-" if "-node-" in section_name else "-frappe-"
            file_name_prefix = section_name.split(delimiter)[-1]
            suffix = ".workers.fm.supervisor.conf" if "worker" in section_name else ".fm.supervisor.conf"
            file_name = file_name_prefix + suffix

            buf = io.StringIO()
            section_config.write(buf)
            (Path(config_dir) / file_name).write_text(buf.getvalue())
            self.logger.info(f"Split supervisor conf {section_name} => {file_name}")

    def _write_newrelic_config(self, config_dir) -> None:
        """
        Write ``newrelic.ini`` (containing the NewRelic license key) into ``config_dir``.

        The Python agent reads its settings from the ``[newrelic]`` section, with
        ``[newrelic:<environment>]`` overrides. Transaction-tracer and error-collector
        settings are therefore written there as dotted keys
        (``transaction_tracer.*``, ``error_collector.*``). Separate sections with
        those names would not be applied.
        """
        import configparser
        import io
        from pathlib import Path

        settings = {
            # Core agent settings.
            "license_key": self.config.newrelic_license_key or "",
            "app_name": f"Frappe - {self.bench_name}",
            "monitor_mode": "true",
            "high_security": "false",
            "log_file": f"{CONTAINER_BENCH_DIR}/logs/newrelic-agent.log",
            "log_level": "info",
            "startup_timeout": "0.0",
            "shutdown_timeout": "2.5",
            "distributed_tracing.enabled": "true",
            "span_events.max_samples_stored": "2000",
            "datastore_tracer.instance_reporting.enabled": "true",
            "datastore_tracer.database_name_reporting.enabled": "true",
            # Transaction tracer.
            "transaction_tracer.enabled": "true",
            "transaction_tracer.transaction_threshold": "apdex_f",
            "transaction_tracer.record_sql": "obfuscated",
            "transaction_tracer.stack_trace_threshold": "0.5",
            "transaction_tracer.explain_enabled": "true",
            "transaction_tracer.explain_threshold": "0.5",
            "transaction_tracer.attributes.enabled": "true",
            "transaction_tracer.attributes.include": (
                "request.headers.x-frappe-request-id request.uri request.method"
            ),
            "transaction_tracer.attributes.exclude": "request.headers.authorization request.headers.cookie",
            # Error collector.
            "error_collector.enabled": "true",
            "error_collector.ignore_status_codes": "100-102 200-208 226 300-308 404",
            # NOTE(review): agent docs describe class names as ``module:ClassName``;
            # confirm the dotted form below is matched by the agent version in use.
            "error_collector.expected_classes": (
                "frappe.exceptions.ValidationError "
                "frappe.exceptions.PermissionError "
                "frappe.exceptions.DoesNotExistError"
            ),
            "error_collector.attributes.enabled": "true",
            "error_collector.max_event_samples_stored": "100",
        }

        cfg = configparser.RawConfigParser()
        cfg.read_dict({"newrelic": settings})

        buf = io.StringIO()
        cfg.write(buf)
        (Path(config_dir) / "newrelic.ini").write_text(buf.getvalue())
        self.logger.info("Generated newrelic.ini")

    def _write_gunicorn_wrapper(self, config_dir, context: dict) -> None:
        """
        Render ``fm-web-server.sh.tmpl`` into an executable ``fm-web-server.sh`` in ``config_dir``.

        The gunicorn command line is built from ``context`` (as returned by
        ``generate_supervisor_config``), using the ``gthread`` worker class with
        ``--preload``.
        """
        from pathlib import Path

        gunicorn_args = " ".join(
            [
                f"-b 0.0.0.0:{context['webserver_port']}",
                f"-w {context['gunicorn_workers']}",
                "--worker-class=gthread",
                f"--threads {context['gunicorn_threads']}",
                f"--max-requests {context['gunicorn_max_requests']}",
                f"--max-requests-jitter {context['gunicorn_max_requests_jitter']}",
                f"-t {context['http_timeout']}",
                "--graceful-timeout 30",
                "frappe.app:application --preload",
            ]
        )

        template_path = get_template_path("fm-web-server.sh.tmpl")
        script = Template(template_path.read_text()).render(
            bench_dir=context["bench_dir"],
            gunicorn_args=gunicorn_args,
            bench_name=self.bench_name,
        )

        wrapper_path = Path(config_dir) / "fm-web-server.sh"
        wrapper_path.write_text(script)
        wrapper_path.chmod(0o755)
        self.logger.info("Generated fm-web-server.sh wrapper")