Coverage for src / mysingle / core / db.py: 0%
55 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""Database utilities for MongoDB and Redis."""
3import os
4from typing import Any
5from urllib.parse import parse_qsl, quote_plus, urlencode, urlsplit, urlunsplit
7from beanie import Document, init_beanie
8from motor.motor_asyncio import AsyncIOMotorClient
10from .config import settings
13async def init_mongo(
14 models: list[type[Document]],
15 service_name: str,
16 mongodb_url: str | None = None,
17 **query_params: Any,
18) -> AsyncIOMotorClient:
19 """Initialize MongoDB with given models and return the client.
21 Parameters
22 ----------
23 models:
24 Beanie Document 모델 리스트.
25 service_name:
26 사용할 데이터베이스 이름(서비스명).
27 mongodb_url:
28 완전히 구성된 MongoDB URL.
29 - 제공되면 이 URL을 우선 사용하고, **query_params 로 온 값은 쿼리스트링에 병합.
30 - 제공되지 않으면 환경/설정 기반으로 URL을 생성.
31 **query_params:
32 URL 의 ? 뒤에 붙을 쿼리 파라미터들.
33 - production/staging: 기본값으로 retryWrites, w, appName 이 들어가며,
34 kwargs 로 덮어쓸 수 있음.
35 - 그 외 환경: 기본값으로 authSource 가 들어가며 kwargs 로 덮어쓸 수 있음.
36 """
37 admin_user = settings.MONGODB_USERNAME
38 admin_password = settings.MONGODB_PASSWORD
39 environment = settings.ENVIRONMENT
40 server = settings.MONGODB_SERVER
42 # 서비스 이름 fallback (비어있는 경우 환경변수 사용)
43 db_name = service_name or os.getenv("SERVICE_NAME", "mysingle")
45 # 1) 외부에서 완성된 mongodb_url 이 넘어온 경우 → 최우선 사용
46 if mongodb_url is not None:
47 final_url = add_query_params_to_url(mongodb_url, query_params)
48 else:
49 # 2) 환경에 따라 URL 생성
50 final_url = build_mongodb_url(
51 username=admin_user,
52 password=admin_password,
53 server=server,
54 database=db_name,
55 environment=environment,
56 query_params=query_params,
57 )
59 # Create Motor client
60 client: AsyncIOMotorClient = AsyncIOMotorClient(
61 final_url,
62 uuidRepresentation="standard",
63 )
65 # Initialize Beanie with the models
66 # Motor의 database 타입이 Beanie와 호환되지 않지만 실제로는 작동합니다
67 await init_beanie(
68 database=client.get_default_database(), # type: ignore[arg-type]
69 document_models=models,
70 )
72 return client
75def get_mongodb_url(service_name: str) -> str:
76 """Get MongoDB connection URL based on current settings."""
77 admin_user = settings.MONGODB_USERNAME
78 admin_password = settings.MONGODB_PASSWORD
79 environment = settings.ENVIRONMENT
80 server = settings.MONGODB_SERVER
82 db_name = service_name or os.getenv("SERVICE_NAME", "mysingle")
84 # 여기서는 쿼리 파라미터 커스터마이징 없이 기본값만 사용
85 return build_mongodb_url(
86 username=admin_user,
87 password=admin_password,
88 server=server,
89 database=db_name,
90 environment=environment,
91 query_params=None,
92 )
95def get_database_name(service_name: str) -> str:
96 """Get database name."""
97 return service_name
100def build_mongodb_url(
101 *,
102 username: str | None,
103 password: str | None,
104 server: str,
105 database: str,
106 environment: str,
107 query_params: dict[str, Any] | None = None,
108) -> str:
109 """
110 환경에 따라 mongodb 또는 mongodb+srv URL을 생성하는 헬퍼 함수.
112 - production/staging: mongodb+srv://
113 - 그 외: mongodb://
114 - query_params는 ? 뒤의 쿼리스트링으로 사용
115 """
116 query_params = {k: str(v) for k, v in (query_params or {}).items()}
118 # 프로토콜 결정
119 if environment in ["production", "staging"]:
120 scheme = "mongodb+srv"
121 # Atlas 같은 환경을 기본 타겟으로 기본값 부여 (kwargs로 덮어쓰기 가능)
122 default_params: dict[str, str] = {
123 "retryWrites": "true",
124 "w": "majority",
125 "appName": "mysingle",
126 }
127 else:
128 scheme = "mongodb"
129 # 로컬/개발용 기본값 (authSource는 주로 admin)
130 default_params = {
131 "authSource": "admin",
132 }
134 # kwargs가 기본값을 덮어쓰도록 병합
135 merged_params = (
136 {**default_params, **query_params} if query_params else default_params
137 )
139 # username/password 안전하게 인코딩
140 user = quote_plus(username) if username else ""
141 pwd = quote_plus(password) if password else ""
143 if user and pwd:
144 auth = f"{user}:{pwd}@"
145 elif user and not pwd:
146 # 패스워드 없는 계정이면 이렇게도 가능
147 auth = f"{user}@"
148 else:
149 auth = ""
151 query_string = urlencode(merged_params) if merged_params else ""
153 if query_string:
154 return f"{scheme}://{auth}{server}/{database}?{query_string}"
155 else:
156 return f"{scheme}://{auth}{server}/{database}"
159def add_query_params_to_url(url: str, extra_params: dict[str, Any] | None) -> str:
160 """
161 이미 완성된 mongodb_url에 쿼리 파라미터를 추가/병합.
163 - 기존 쿼리스트링이 있으면 유지하면서 extra_params가 덮어씀
164 - mongodb_url 파라미터를 직접 넘겼을 때도 kwargs를 활용하고 싶다면 유용
165 """
166 if not extra_params:
167 return url
169 extra_params = {k: str(v) for k, v in extra_params.items()}
171 scheme, netloc, path, query, fragment = urlsplit(url)
172 current_params = dict(parse_qsl(query, keep_blank_values=True)) if query else {}
173 current_params.update(extra_params)
175 new_query = urlencode(current_params)
176 return urlunsplit((scheme, netloc, path, new_query, fragment))