Coverage for frappe_manager / services_manager / database_service_manager.py: 22%
194 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import json
2import time
3from pathlib import Path
4from typing import Any, Protocol
6from pydantic import BaseModel
8from frappe_manager.docker import ComposeFile, DockerClient, DockerException
9from frappe_manager.docker.subprocess_output import SubprocessOutput
10from frappe_manager.output_manager import OutputHandler
11from frappe_manager.output_manager.rich_output import RichOutputHandler
12from frappe_manager.services_manager.services_exceptions import (
13 DatabaseServiceDBCreateFailed,
14 DatabaseServiceDBExportFailed,
15 DatabaseServiceDBImportFailed,
16 DatabaseServiceDBNotFoundError,
17 DatabaseServiceDBRemoveFailError,
18 DatabaseServiceException,
19 DatabaseServicePasswordNotFound,
20 DatabaseServiceQueryAccessDenied,
21 DatabaseServiceStartTimeout,
22 DatabaseServiceUserRemoveFailError,
23)
24from frappe_manager.site_manager.exceptions import BenchException
27# TODO this class will be used for validation for main config
28class DatabaseServerServiceInfo(BaseModel):
29 host: str
30 user: str
31 port: int
32 password: str
33 name: str | None = None
35 @classmethod
36 def import_from_compose_file(
37 cls,
38 compose_service_name: str,
39 compose_file_manager: ComposeFile,
40 raise_exception: bool = True,
41 ):
42 """
43 Provides info about a database server
44 """
45 compose_service_envs = compose_file_manager.get_envs(container=compose_service_name)
47 info: dict[str, Any] = {}
48 info["user"] = "root"
49 # this also being considered as servicename
50 info["host"] = compose_service_name
51 info["port"] = 3306
53 # TODO use fm main config here
54 # secrets or password ?
55 if "MYSQL_ROOT_PASSWORD_FILE" in compose_service_envs:
56 password_path: Path = compose_file_manager.get_secret_file_path("db_root_password")
57 info["password"] = password_path.read_text()
58 elif "MYSQL_ROOT_PASSWORD" in compose_service_envs:
59 info["password"] = compose_service_envs["MYSQL_ROOT_PASSWORD"]
60 elif raise_exception:
61 raise DatabaseServicePasswordNotFound(compose_service_name)
63 return cls(**info)
65 @classmethod
66 def import_from_bench(cls, bench_name: str, bench_path: Path, raise_exception=False):
67 """
68 Provides info about a database server
69 """
71 site_config_file: Path = bench_path / "workspace" / "frappe-bench" / "sites" / bench_name / "site_config.json"
72 common_site_config_file: Path = bench_path / "workspace" / "frappe-bench" / "sites" / "common_site_config.json"
74 info: dict[str, Any] = {}
76 info["password"] = None
78 if common_site_config_file.exists():
79 with open(common_site_config_file) as f:
80 common_site_config = json.load(f)
81 if common_site_config:
82 info["host"] = common_site_config.get("db_host")
83 info["port"] = common_site_config.get("db_port")
85 if site_config_file.exists():
86 with open(site_config_file) as f:
87 site_config = json.load(f)
88 if site_config:
89 info["host"] = site_config.get("db_host")
90 info["name"] = site_config["db_name"]
91 info["user"] = site_config["db_name"]
92 info["password"] = site_config["db_password"]
94 if raise_exception and not info["password"]:
95 raise BenchException(
96 bench_name,
97 f"Password for the db user doesn't exits in either {common_site_config_file.name},{site_config_file.name}",
98 )
100 return cls(**info)
103class DatabaseServiceManager(Protocol):
104 database_server_info: DatabaseServerServiceInfo
105 compose_file_manager: ComposeFile
106 docker_client: DockerClient
108 def __init__(
109 self,
110 database_server_info: DatabaseServerServiceInfo,
111 compose_file_manager: ComposeFile,
112 docker_client: DockerClient,
113 ) -> None: ...
115 def remove_user(self, db_user: str, db_user_host: str = "%", remove_all_host: bool = False): ...
117 def add_user(self, db_user: str, db_pass: str, db_user_host: str = "%", force: bool = False, timeout=25): ...
119 def grant_user_privilages(self, db_user: str, db_name: str): ...
121 def check_user_exists(self, db_user: str): ...
123 def check_db_exists(self, db_name: str): ...
125 def remove_db(self, db_name: str): ...
127 def wait_till_db_start(self, interval: int = 5, timeout: int = 30) -> bool: ...
129 def db_import(self, db_name: str, host_db_file_path: Path, force: bool = False): ...
132class MariaDBManager(DatabaseServiceManager):
133 def __init__(
134 self,
135 database_server_info: DatabaseServerServiceInfo,
136 compose_file_manager: ComposeFile,
137 docker_client: DockerClient,
138 run_on_compose_service: str | None = None,
139 output_handler: OutputHandler | None = None,
140 ) -> None:
141 """
142 Database manager
143 """
144 self.database_server_info: DatabaseServerServiceInfo = database_server_info
145 self.compose_file_manager: ComposeFile = compose_file_manager
146 self.docker_client: DockerClient = docker_client
147 self.output = output_handler or RichOutputHandler()
149 if not run_on_compose_service:
150 self.run_on_compose_service: str = self.database_server_info.host
151 else:
152 self.run_on_compose_service: str = run_on_compose_service
154 self.base_command = f"/usr/bin/mariadb -u{self.database_server_info.user} -p'{self.database_server_info.password}' -P{self.database_server_info.port} -h{self.database_server_info.host} "
155 self.base_query = "-e "
157 def _is_service_running(self, service: str) -> bool:
158 """Check if a service is running."""
159 all_statuses = self.docker_client.compose.get_all_services_status()
160 containers = self.compose_file_manager.get_container_names()
161 service_container = containers.get(service)
163 for status in all_statuses:
164 if status.get("Name") == service_container:
165 return status.get("State") == "running"
166 return False
168 def _compose_exec_or_run(
169 self,
170 command: str,
171 stream: bool = False,
172 user: str | None = None,
173 rm: bool = False,
174 entrypoint: str | None = None,
175 ):
176 """
177 Executes a command using compose.exec if the service is running,
178 otherwise uses compose.run.
179 """
180 if self._is_service_running(self.run_on_compose_service):
181 return self.docker_client.compose.exec(self.run_on_compose_service, command=command, stream=stream)
182 return self.docker_client.compose.run(
183 self.run_on_compose_service,
184 # command=command,
185 stream=stream,
186 user=user,
187 rm=rm,
188 entrypoint=command,
189 )
191 def db_run_query(
192 self,
193 query: str,
194 raise_exception_obj: DatabaseServiceException | None = None,
195 capture_output: bool = False,
196 ):
197 base_command = self.base_command
199 if capture_output:
200 base_command += "--batch --skip-column-names "
202 db_query = base_command + self.base_query + query
204 try:
205 output = self._compose_exec_or_run(
206 db_query,
207 stream=not capture_output,
208 user="frappe",
209 rm=True,
210 )
211 if capture_output:
212 return output
213 self.output.live_lines(output)
214 except DockerException as e:
215 if raise_exception_obj:
216 raise raise_exception_obj
217 raise e
219 def wait_till_db_start(self, interval: int = 5, timeout: int = 30) -> bool:
220 for i in range(timeout):
221 if not self.is_db_running():
222 time.sleep(interval)
223 else:
224 return True
225 total_timeout = interval * timeout
226 raise DatabaseServiceStartTimeout(total_timeout, self.run_on_compose_service)
228 def is_db_running(self) -> bool:
229 db_started_command = f"mysqladmin -P{self.database_server_info.port} -h{self.database_server_info.host} -u'{self.database_server_info.user}' -p'{self.database_server_info.password}' ping"
230 try:
231 output = self._compose_exec_or_run(
232 db_started_command,
233 stream=False,
234 user="frappe",
235 rm=True,
236 entrypoint=None,
237 )
238 return "mysqld is alive" in " ".join(output.stdout)
239 except DockerException as e:
240 return False
242 def get_db_users(self) -> dict[str, str]:
243 show_db_user_command = "'SELECT User, Host FROM mysql.user;'"
244 exception = DatabaseServiceException(self.database_server_info.host, "Failed to determine mysql users.")
245 output: SubprocessOutput = self.db_run_query(
246 show_db_user_command,
247 raise_exception_obj=exception,
248 capture_output=True,
249 )
250 user_list: dict[str, str] = {}
251 for line in output.stdout:
252 username, host = line.split("\t")
253 user_list[username] = host
254 return user_list
256 def check_user_exists(self, username: str, host: str | None = None) -> bool:
257 user_list = self.get_db_users()
258 if username not in user_list:
259 return False
260 if not host:
261 return True
262 if not user_list[username] == host:
263 return False
264 return True
266 def get_all_databases(self) -> list[str]:
267 db_exits_commmand = "'SHOW DATABASES;'"
268 db_exits_exception = DatabaseServiceException(
269 self.database_server_info.host,
270 "Failed to get list of all databases.",
271 )
272 try:
273 output: SubprocessOutput = self.db_run_query(db_exits_commmand, capture_output=True)
274 return output.stdout
275 except DockerException as e:
276 if "access denied" in " ".join(e.output.combined).lower():
277 raise DatabaseServiceQueryAccessDenied(db_exits_commmand)
278 raise db_exits_exception
280 def check_db_exists(self, db_name: str):
281 databases = self.get_all_databases()
282 return db_name in databases
284 def remove_user(self, db_user: str, db_user_host: str = "%", remove_all_host: bool = False):
285 users = {db_user: db_user_host}
287 if remove_all_host:
288 users = self.get_db_users()
290 for user, host in users.items():
291 if db_user == user:
292 remove_db_user_command = f"'DROP USER `{user}`@`{host}`;'"
293 remove_db_user_exception = DatabaseServiceUserRemoveFailError(user, host)
294 self.db_run_query(remove_db_user_command, remove_db_user_exception)
296 def remove_db(self, db_name: str):
297 remove_db_command = f"'DROP DATABASE `{db_name}`;'"
298 remove_db_exception = DatabaseServiceDBRemoveFailError(db_name, self.database_server_info.host)
299 self.db_run_query(remove_db_command, remove_db_exception)
301 def grant_user_privilages(self, db_user: str, db_name: str):
302 grant_user_command = f"'GRANT ALL PRIVILEGES ON `{db_name}`.* TO `{db_user}`@`%`;'"
303 grant_user_exception = DatabaseServiceException(
304 self.database_server_info.host,
305 f"Failed to grant prvilages for user {db_user} on {db_name}.",
306 )
307 self.db_run_query(grant_user_command, grant_user_exception)
309 def add_user(self, db_user: str, db_pass: str, db_user_host: str = "%", force: bool = False, timeout=25):
310 if self.check_user_exists(db_user, db_user_host):
311 if force:
312 self.remove_user(db_user, db_user_host)
313 else:
314 raise DatabaseServiceException(
315 self.run_on_compose_service,
316 f"User {db_user} for {db_user_host} already exists.",
317 )
319 add_user_command = f"'CREATE USER `{db_user}`@`%` IDENTIFIED BY \"{db_pass}\";'"
320 add_user_exception = DatabaseServiceException(self.database_server_info.host, f"Failed to add user {db_user}.")
321 self.db_run_query(add_user_command, add_user_exception)
323 def db_export(self, db_name: str, export_file_path: str | Path):
324 if not self.check_db_exists(db_name):
325 raise DatabaseServiceDBNotFoundError(db_name, self.run_on_compose_service)
327 if isinstance(export_file_path, Path):
328 export_file_path = str(export_file_path.absolute())
330 db_export_command = f"mysqldump -u'{self.database_server_info.user}' -p'{self.database_server_info.password}' -h'{self.database_server_info.host}' -P{self.database_server_info.port} {db_name} --result-file={export_file_path}"
332 try:
333 output = self._compose_exec_or_run(
334 db_export_command,
335 stream=False,
336 user="frappe",
337 rm=True,
338 entrypoint=db_export_command,
339 )
340 except DockerException:
341 raise DatabaseServiceDBExportFailed(self.run_on_compose_service, db_name)
343 def db_create(self, db_name):
344 create_db_command = f"'CREATE DATABASE IF NOT EXISTS `{db_name}`';"
345 create_db_exception = DatabaseServiceDBCreateFailed(self.run_on_compose_service, db_name)
346 self.db_run_query(create_db_command, create_db_exception)
348 def db_import(self, db_name: str, host_db_file_path: Path, force: bool = False):
349 if not self.check_db_exists(db_name):
350 if force:
351 self.db_create(db_name)
352 else:
353 raise DatabaseServiceDBNotFoundError(db_name, self.run_on_compose_service)
355 container_db_file_name = host_db_file_path.name
356 source = str(host_db_file_path.absolute())
358 destination = f"{self.run_on_compose_service}:/tmp/{container_db_file_name}"
359 db_import_command = self.base_command + f" {db_name} -e 'source /tmp/{container_db_file_name}'"
361 try:
362 output = self.docker_client.compose.cp(source, destination, stream=False)
363 output = self._compose_exec_or_run(
364 db_import_command,
365 stream=False,
366 user="frappe",
367 rm=True,
368 entrypoint=None,
369 )
370 except DockerException:
371 raise DatabaseServiceDBImportFailed(self.run_on_compose_service, source)