Coverage for frappe_manager / services_manager / database_service_manager.py: 22%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import json 

2import time 

3from pathlib import Path 

4from typing import Any, Protocol 

5 

6from pydantic import BaseModel 

7 

8from frappe_manager.docker import ComposeFile, DockerClient, DockerException 

9from frappe_manager.docker.subprocess_output import SubprocessOutput 

10from frappe_manager.output_manager import OutputHandler 

11from frappe_manager.output_manager.rich_output import RichOutputHandler 

12from frappe_manager.services_manager.services_exceptions import ( 

13 DatabaseServiceDBCreateFailed, 

14 DatabaseServiceDBExportFailed, 

15 DatabaseServiceDBImportFailed, 

16 DatabaseServiceDBNotFoundError, 

17 DatabaseServiceDBRemoveFailError, 

18 DatabaseServiceException, 

19 DatabaseServicePasswordNotFound, 

20 DatabaseServiceQueryAccessDenied, 

21 DatabaseServiceStartTimeout, 

22 DatabaseServiceUserRemoveFailError, 

23) 

24from frappe_manager.site_manager.exceptions import BenchException 

25 

26 

27# TODO this class will be used for validation for main config 

class DatabaseServerServiceInfo(BaseModel):
    """Connection details for a database server.

    Instances are built either from a compose service definition (root
    credentials, see ``import_from_compose_file``) or from a bench's site
    configuration files (per-site credentials, see ``import_from_bench``).
    """

    host: str
    user: str
    port: int
    password: str
    name: str | None = None

    @classmethod
    def import_from_compose_file(
        cls,
        compose_service_name: str,
        compose_file_manager: ComposeFile,
        raise_exception: bool = True,
    ):
        """Build root connection info from a compose database service.

        The user is always ``root``, the port ``3306`` and the host is the
        compose service name. The password is taken from the
        ``db_root_password`` secret file when ``MYSQL_ROOT_PASSWORD_FILE`` is
        set on the service, otherwise from ``MYSQL_ROOT_PASSWORD``.

        Args:
            compose_service_name: Name of the database service in the compose file.
            compose_file_manager: Compose file wrapper used to read envs and secrets.
            raise_exception: Raise when no password source is configured.

        Raises:
            DatabaseServicePasswordNotFound: No password env is present and
                ``raise_exception`` is true.
        """
        compose_service_envs = compose_file_manager.get_envs(container=compose_service_name)

        info: dict[str, Any] = {
            "user": "root",
            # The compose service name doubles as the database host name.
            "host": compose_service_name,
            "port": 3306,
        }

        # TODO use fm main config here
        # secrets or password ?
        if "MYSQL_ROOT_PASSWORD_FILE" in compose_service_envs:
            password_path: Path = compose_file_manager.get_secret_file_path("db_root_password")
            # Drop trailing newlines: the password is later embedded in shell
            # commands as -p'<password>', where a stray newline would break it.
            # NOTE(review): the database image is assumed to ignore trailing
            # newlines in *_FILE secrets as well - confirm.
            info["password"] = password_path.read_text().rstrip("\n")
        elif "MYSQL_ROOT_PASSWORD" in compose_service_envs:
            info["password"] = compose_service_envs["MYSQL_ROOT_PASSWORD"]
        elif raise_exception:
            raise DatabaseServicePasswordNotFound(compose_service_name)

        # NOTE(review): with raise_exception=False and no password source the
        # model is built without a password; the required ``password`` field
        # will presumably reject that - confirm the intended behaviour.
        return cls(**info)

    @classmethod
    def import_from_bench(cls, bench_name: str, bench_path: Path, raise_exception=False):
        """Build per-site connection info from a bench's config files.

        ``common_site_config.json`` supplies ``db_host`` and ``db_port``;
        ``sites/<bench_name>/site_config.json`` supplies the database name
        (also used as the user), the password and, when present, a
        site-specific ``db_host`` that takes precedence.

        Args:
            bench_name: Name of the bench; also the site directory name.
            bench_path: Root directory of the bench on the host.
            raise_exception: Raise when no database password could be found.

        Raises:
            BenchException: No password was found and ``raise_exception`` is true.
        """
        sites_dir = bench_path / "workspace" / "frappe-bench" / "sites"
        site_config_file: Path = sites_dir / bench_name / "site_config.json"
        common_site_config_file: Path = sites_dir / "common_site_config.json"

        info: dict[str, Any] = {"password": None}

        if common_site_config_file.exists():
            with open(common_site_config_file) as f:
                common_site_config = json.load(f)
            if common_site_config:
                info["host"] = common_site_config.get("db_host")
                info["port"] = common_site_config.get("db_port")

        if site_config_file.exists():
            with open(site_config_file) as f:
                site_config = json.load(f)
            if site_config:
                # Only override the common host when the site config really
                # defines one; otherwise a missing key would replace a valid
                # host with None and fail model validation.
                site_db_host = site_config.get("db_host")
                if site_db_host is not None:
                    info["host"] = site_db_host
                info["name"] = site_config["db_name"]
                info["user"] = site_config["db_name"]
                info["password"] = site_config["db_password"]

        if raise_exception and not info["password"]:
            raise BenchException(
                bench_name,
                f"Password for the db user doesn't exits in either {common_site_config_file.name},{site_config_file.name}",
            )

        return cls(**info)

101 

102 

class DatabaseServiceManager(Protocol):
    """Interface every database service manager has to provide.

    Implementations hold the connection info of the database server, the
    compose file describing its service and a docker client.
    """

    database_server_info: DatabaseServerServiceInfo
    compose_file_manager: ComposeFile
    docker_client: DockerClient

    def __init__(
        self,
        database_server_info: DatabaseServerServiceInfo,
        compose_file_manager: ComposeFile,
        docker_client: DockerClient,
    ) -> None:
        """Store the server info, compose file manager and docker client."""
        ...

    def remove_user(
        self,
        db_user: str,
        db_user_host: str = "%",
        remove_all_host: bool = False,
    ):
        """Remove ``db_user`` for ``db_user_host``, or for all of its hosts."""
        ...

    def add_user(
        self,
        db_user: str,
        db_pass: str,
        db_user_host: str = "%",
        force: bool = False,
        timeout=25,
    ):
        """Create ``db_user`` identified by ``db_pass``."""
        ...

    def grant_user_privilages(self, db_user: str, db_name: str):
        """Grant ``db_user`` privileges on the database ``db_name``."""
        ...

    def check_user_exists(self, db_user: str):
        """Tell whether the user ``db_user`` exists on the server."""
        ...

    def check_db_exists(self, db_name: str):
        """Tell whether the database ``db_name`` exists on the server."""
        ...

    def remove_db(self, db_name: str):
        """Remove the database ``db_name``."""
        ...

    def wait_till_db_start(self, interval: int = 5, timeout: int = 30) -> bool:
        """Wait for the database server to start."""
        ...

    def db_import(self, db_name: str, host_db_file_path: Path, force: bool = False):
        """Import the dump at ``host_db_file_path`` into ``db_name``."""
        ...

130 

131 

class MariaDBManager(DatabaseServiceManager):
    """MariaDB implementation of ``DatabaseServiceManager``.

    Every operation runs a MariaDB client tool (``mariadb``, ``mysqladmin``
    or ``mysqldump``) inside a compose service through the docker client.

    NOTE(review): user names, database names, hosts and passwords are
    interpolated into shell/SQL strings without escaping, so only trusted
    values must be passed to these methods.
    """

    def __init__(
        self,
        database_server_info: DatabaseServerServiceInfo,
        compose_file_manager: ComposeFile,
        docker_client: DockerClient,
        run_on_compose_service: str | None = None,
        output_handler: OutputHandler | None = None,
    ) -> None:
        """Set up the manager.

        Args:
            database_server_info: Credentials and address of the database server.
            compose_file_manager: Compose file that declares the services.
            docker_client: Client used to exec/run/cp against compose services.
            run_on_compose_service: Service in which client commands are
                executed; defaults to the database host itself.
            output_handler: Sink for streamed command output; defaults to a
                ``RichOutputHandler``.
        """
        self.database_server_info: DatabaseServerServiceInfo = database_server_info
        self.compose_file_manager: ComposeFile = compose_file_manager
        self.docker_client: DockerClient = docker_client
        self.output = output_handler or RichOutputHandler()

        self.run_on_compose_service: str = run_on_compose_service or self.database_server_info.host

        # The trailing space lets further options/queries be appended directly.
        self.base_command = f"/usr/bin/mariadb -u{self.database_server_info.user} -p'{self.database_server_info.password}' -P{self.database_server_info.port} -h{self.database_server_info.host} "
        self.base_query = "-e "

    def _is_service_running(self, service: str) -> bool:
        """Check if a service is running."""
        all_statuses = self.docker_client.compose.get_all_services_status()
        containers = self.compose_file_manager.get_container_names()
        service_container = containers.get(service)

        for status in all_statuses:
            if status.get("Name") == service_container:
                return status.get("State") == "running"
        return False

    def _compose_exec_or_run(
        self,
        command: str,
        stream: bool = False,
        user: str | None = None,
        rm: bool = False,
        entrypoint: str | None = None,
    ):
        """
        Executes a command using compose.exec if the service is running,
        otherwise uses compose.run.

        ``user`` and ``rm`` only apply to the ``compose.run`` path. The
        ``entrypoint`` argument is accepted but not used: when falling back
        to ``compose.run`` the command itself is passed as the entrypoint.
        """
        if self._is_service_running(self.run_on_compose_service):
            return self.docker_client.compose.exec(self.run_on_compose_service, command=command, stream=stream)
        return self.docker_client.compose.run(
            self.run_on_compose_service,
            stream=stream,
            user=user,
            rm=rm,
            # The command replaces the service entrypoint instead of being
            # passed as ``command``.
            entrypoint=command,
        )

    def db_run_query(
        self,
        query: str,
        raise_exception_obj: DatabaseServiceException | None = None,
        capture_output: bool = False,
    ):
        """Run ``query`` (an already shell-quoted SQL string) with the mariadb client.

        Args:
            query: Shell-quoted SQL, e.g. ``"'SHOW DATABASES;'"``.
            raise_exception_obj: Exception raised instead of the underlying
                ``DockerException`` when the command fails.
            capture_output: Return the command output (batch mode, no column
                names) instead of streaming it to the output handler.

        Returns:
            The command output when ``capture_output`` is true, else ``None``.

        Raises:
            DatabaseServiceException: ``raise_exception_obj``, when given and
                the command fails.
            DockerException: The command fails and no replacement was given.
        """
        base_command = self.base_command

        if capture_output:
            base_command += "--batch --skip-column-names "

        db_query = base_command + self.base_query + query

        try:
            output = self._compose_exec_or_run(
                db_query,
                stream=not capture_output,
                user="frappe",
                rm=True,
            )
            if capture_output:
                return output
            self.output.live_lines(output)
        except DockerException as e:
            if raise_exception_obj:
                # Keep the docker failure attached as the cause.
                raise raise_exception_obj from e
            raise
        return None

    def wait_till_db_start(self, interval: int = 5, timeout: int = 30) -> bool:
        """Poll until the database answers a ping.

        The server is checked up to ``timeout`` times, sleeping ``interval``
        seconds after every failed check, so the total wait is
        ``interval * timeout`` seconds.

        Returns:
            ``True`` as soon as the database is running.

        Raises:
            DatabaseServiceStartTimeout: The database never became available.
        """
        for _ in range(timeout):
            if self.is_db_running():
                return True
            time.sleep(interval)
        total_timeout = interval * timeout
        raise DatabaseServiceStartTimeout(total_timeout, self.run_on_compose_service)

    def is_db_running(self) -> bool:
        """Return whether ``mysqladmin ping`` reports the server as alive."""
        db_started_command = f"mysqladmin -P{self.database_server_info.port} -h{self.database_server_info.host} -u'{self.database_server_info.user}' -p'{self.database_server_info.password}' ping"
        try:
            output = self._compose_exec_or_run(
                db_started_command,
                stream=False,
                user="frappe",
                rm=True,
                entrypoint=None,
            )
        except DockerException:
            # A failing ping simply means the server is not up (yet).
            return False
        return "mysqld is alive" in " ".join(output.stdout)

    def _get_db_user_host_pairs(self) -> list[tuple[str, str]]:
        """Return every ``(user, host)`` row of ``mysql.user``.

        Unlike ``get_db_users`` this keeps all hosts of a user that is
        defined for more than one host.
        """
        show_db_user_command = "'SELECT User, Host FROM mysql.user;'"
        exception = DatabaseServiceException(self.database_server_info.host, "Failed to determine mysql users.")
        output: SubprocessOutput = self.db_run_query(
            show_db_user_command,
            raise_exception_obj=exception,
            capture_output=True,
        )
        pairs: list[tuple[str, str]] = []
        for line in output.stdout:
            line = line.rstrip("\r\n")
            # Skip blank or malformed rows; batch output is tab separated.
            if "\t" not in line:
                continue
            username, host = line.split("\t", 1)
            pairs.append((username, host))
        return pairs

    def get_db_users(self) -> dict[str, str]:
        """Return a ``{user: host}`` mapping of the server's users.

        A user defined for several hosts appears once, with the host of its
        last row.
        """
        return dict(self._get_db_user_host_pairs())

    def check_user_exists(self, username: str, host: str | None = None) -> bool:
        """Tell whether ``username`` exists, optionally for a specific ``host``."""
        pairs = self._get_db_user_host_pairs()
        if not host:
            return any(user == username for user, _ in pairs)
        # Compare against every host of the user, not just one of them.
        return (username, host) in pairs

    def get_all_databases(self) -> list[str]:
        """Return the output lines of ``SHOW DATABASES``.

        Raises:
            DatabaseServiceQueryAccessDenied: The server denied access.
            DatabaseServiceException: The query failed for another reason.
        """
        show_databases_command = "'SHOW DATABASES;'"
        try:
            output: SubprocessOutput = self.db_run_query(show_databases_command, capture_output=True)
        except DockerException as e:
            if "access denied" in " ".join(e.output.combined).lower():
                raise DatabaseServiceQueryAccessDenied(show_databases_command) from e
            raise DatabaseServiceException(
                self.database_server_info.host,
                "Failed to get list of all databases.",
            ) from e
        return output.stdout

    def check_db_exists(self, db_name: str):
        """Tell whether the database ``db_name`` exists."""
        return db_name in self.get_all_databases()

    def remove_user(self, db_user: str, db_user_host: str = "%", remove_all_host: bool = False):
        """Drop ``db_user``.

        Args:
            db_user: Name of the user to drop.
            db_user_host: Host part of the account to drop.
            remove_all_host: Drop the user for every host it is defined for
                instead of only ``db_user_host``.

        Raises:
            DatabaseServiceUserRemoveFailError: A ``DROP USER`` failed.
        """
        if remove_all_host:
            targets = [(user, host) for user, host in self._get_db_user_host_pairs() if user == db_user]
        else:
            targets = [(db_user, db_user_host)]

        for user, host in targets:
            remove_db_user_command = f"'DROP USER `{user}`@`{host}`;'"
            remove_db_user_exception = DatabaseServiceUserRemoveFailError(user, host)
            self.db_run_query(remove_db_user_command, remove_db_user_exception)

    def remove_db(self, db_name: str):
        """Drop the database ``db_name``.

        Raises:
            DatabaseServiceDBRemoveFailError: The ``DROP DATABASE`` failed.
        """
        remove_db_command = f"'DROP DATABASE `{db_name}`;'"
        remove_db_exception = DatabaseServiceDBRemoveFailError(db_name, self.database_server_info.host)
        self.db_run_query(remove_db_command, remove_db_exception)

    def grant_user_privilages(self, db_user: str, db_name: str, db_user_host: str = "%"):
        """Grant ``db_user``@``db_user_host`` all privileges on ``db_name``.

        Raises:
            DatabaseServiceException: The ``GRANT`` failed.
        """
        grant_user_command = f"'GRANT ALL PRIVILEGES ON `{db_name}`.* TO `{db_user}`@`{db_user_host}`;'"
        grant_user_exception = DatabaseServiceException(
            self.database_server_info.host,
            f"Failed to grant prvilages for user {db_user} on {db_name}.",
        )
        self.db_run_query(grant_user_command, grant_user_exception)

    def add_user(self, db_user: str, db_pass: str, db_user_host: str = "%", force: bool = False, timeout=25):
        """Create ``db_user``@``db_user_host`` identified by ``db_pass``.

        Args:
            db_user: Name of the user to create.
            db_pass: Password of the new user.
            db_user_host: Host part of the account.
            force: Drop an already existing account first instead of failing.
            timeout: Currently unused.

        Raises:
            DatabaseServiceException: The account already exists and ``force``
                is false, or the ``CREATE USER`` failed.
        """
        if self.check_user_exists(db_user, db_user_host):
            if not force:
                raise DatabaseServiceException(
                    self.run_on_compose_service,
                    f"User {db_user} for {db_user_host} already exists.",
                )
            self.remove_user(db_user, db_user_host)

        # Create the account for the host that was checked/removed above.
        add_user_command = f"'CREATE USER `{db_user}`@`{db_user_host}` IDENTIFIED BY \"{db_pass}\";'"
        add_user_exception = DatabaseServiceException(self.database_server_info.host, f"Failed to add user {db_user}.")
        self.db_run_query(add_user_command, add_user_exception)

    def db_export(self, db_name: str, export_file_path: str | Path):
        """Dump ``db_name`` to ``export_file_path`` with ``mysqldump``.

        The path is passed to ``mysqldump --result-file`` as seen by the
        service the command runs in.

        Raises:
            DatabaseServiceDBNotFoundError: The database does not exist.
            DatabaseServiceDBExportFailed: The dump command failed.
        """
        if not self.check_db_exists(db_name):
            raise DatabaseServiceDBNotFoundError(db_name, self.run_on_compose_service)

        if isinstance(export_file_path, Path):
            export_file_path = str(export_file_path.absolute())

        db_export_command = f"mysqldump -u'{self.database_server_info.user}' -p'{self.database_server_info.password}' -h'{self.database_server_info.host}' -P{self.database_server_info.port} {db_name} --result-file={export_file_path}"

        try:
            self._compose_exec_or_run(
                db_export_command,
                stream=False,
                user="frappe",
                rm=True,
                entrypoint=db_export_command,
            )
        except DockerException as e:
            raise DatabaseServiceDBExportFailed(self.run_on_compose_service, db_name) from e

    def db_create(self, db_name):
        """Create the database ``db_name`` if it does not exist yet.

        Raises:
            DatabaseServiceDBCreateFailed: The ``CREATE DATABASE`` failed.
        """
        # The terminating semicolon belongs inside the quoted query.
        create_db_command = f"'CREATE DATABASE IF NOT EXISTS `{db_name}`;'"
        create_db_exception = DatabaseServiceDBCreateFailed(self.run_on_compose_service, db_name)
        self.db_run_query(create_db_command, create_db_exception)

    def db_import(self, db_name: str, host_db_file_path: Path, force: bool = False):
        """Import the SQL file ``host_db_file_path`` into ``db_name``.

        The file is copied to ``/tmp`` of the service the commands run in
        and then sourced by the mariadb client.

        Args:
            db_name: Target database.
            host_db_file_path: SQL file on the host.
            force: Create the database when it does not exist.

        Raises:
            DatabaseServiceDBNotFoundError: The database is missing and
                ``force`` is false.
            DatabaseServiceDBImportFailed: Copying or importing the file failed.
        """
        if not self.check_db_exists(db_name):
            if not force:
                raise DatabaseServiceDBNotFoundError(db_name, self.run_on_compose_service)
            self.db_create(db_name)

        container_db_file_name = host_db_file_path.name
        source = str(host_db_file_path.absolute())

        destination = f"{self.run_on_compose_service}:/tmp/{container_db_file_name}"
        db_import_command = self.base_command + f" {db_name} -e 'source /tmp/{container_db_file_name}'"

        try:
            self.docker_client.compose.cp(source, destination, stream=False)
            self._compose_exec_or_run(
                db_import_command,
                stream=False,
                user="frappe",
                rm=True,
                entrypoint=None,
            )
        except DockerException as e:
            raise DatabaseServiceDBImportFailed(self.run_on_compose_service, source) from e