Coverage for src / mysingle / auth / security / jwt.py: 21%
157 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 23:57 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 23:57 +0900
1"""
2JWT Token Management - Unified System
4Kong Gateway와 통합된 JWT 토큰 관리 시스템입니다.
5- 사용자 액세스/리프레시 토큰 생성 및 검증
6- 이메일 인증/비밀번호 재설정 토큰 생성 및 검증
7- 서비스 간 통신용 토큰 생성
10Usage Example:
11 from mysingle.auth.security.jwt import get_jwt_manager
13 # 사용자 로그인 토큰 생성 (access / refresh)
14 jwt_manager = get_jwt_manager()
15 access = jwt_manager.create_user_token(
16 user_id="507f1f77bcf86cd799439011",
17 email="user@example.com",
18 token_type="access",
19 )
20 refresh = jwt_manager.create_user_token(
21 user_id="507f1f77bcf86cd799439011",
22 email="user@example.com",
23 token_type="refresh",
24 )
26 # 토큰 검증/디코딩
27 payload = jwt_manager.decode_token(access)
28"""
30from datetime import UTC, datetime, timedelta
31from typing import Any, Literal, Optional
33import jwt
35from ...core.config import settings
36from ...core.logging import get_structured_logger
38logger = get_structured_logger(__name__)
41class JWTManager:
42 """
43 JWT Token 관리 클래스
45 Kong Gateway와 호환되는 JWT 토큰을 생성하고 검증합니다.
46 """
48 def __init__(self, app_settings: Optional[Any] = None):
49 """
50 JWTManager 초기화
52 Args:
53 settings: CommonSettings 인스턴스 (None이면 자동 생성)
54 """
55 # 전역 settings 싱글톤을 기본으로 사용합니다.
56 self.settings = app_settings or settings
58 # JWT 설정
59 self.algorithm = "HS256"
60 # 만료 시간 정책 (기본값)
61 self.access_token_expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
62 self.refresh_token_expire_days = settings.REFRESH_TOKEN_EXPIRE_DAYS
63 self.service_token_expire_minutes = settings.SERVICE_TOKEN_EXPIRE_MINUTES
64 self.verify_token_expire_hours = settings.VERIFY_TOKEN_EXPIRE_MINUTES // 60
65 self.reset_token_expire_hours = settings.RESET_TOKEN_EXPIRE_MINUTES // 60
66 # Kong Consumer Keys
67 self.frontend_consumer_key = "frontend-key"
68 # 서비스명은 '-service' 접미를 기준으로 정규화합니다.
69 self.service_consumer_keys = {
70 "iam-service": "iam-service-key",
71 "journey-orchestrator-service": "journey-orchestrator-service-key",
72 "strategy-service": "strategy-service-key",
73 "backtest-service": "backtest-service-key",
74 "optimization-service": "optimization-service-key",
75 "dashboard-service": "dashboard-service-key",
76 "notification-service": "notification-service-key",
77 "market-data-service": "market-data-service-key",
78 "genai-service": "genai-service-key",
79 "ml-service": "ml-service-key",
80 }
82 def create_user_token(
83 self,
84 user_id: str,
85 email: str,
86 *,
87 token_type: Literal["access", "refresh"] = "access",
88 is_verified: bool = False,
89 is_superuser: bool = False,
90 is_active: bool = True,
91 audience: str = "quant-users",
92 expires_delta: Optional[timedelta] = None,
93 ) -> str:
94 """
95 사용자 로그인용 JWT 토큰 생성
97 Args:
98 user_id: 사용자 ID (MongoDB ObjectId string)
99 email: 사용자 이메일
100 token_type: 토큰 용도 (access|refresh)
101 is_verified: 이메일 인증 여부
102 is_superuser: 관리자 여부
103 is_active: 활성 사용자 여부
104 audience: aud 클레임 값(기본: "quant-users")
105 expires_delta: 만료 시간 (None이면 정책에 따름)
107 Returns:
108 str: JWT 토큰
109 """
110 if expires_delta is None:
111 if token_type == "access":
112 expires_delta = timedelta(minutes=self.access_token_expire_minutes)
113 else:
114 expires_delta = timedelta(days=self.refresh_token_expire_days)
116 now = datetime.now(UTC)
117 expire = now + expires_delta
119 payload = {
120 # JWT 표준 클레임
121 "iss": self.frontend_consumer_key, # Issuer (Kong Consumer Key)
122 "sub": user_id, # Subject (User ID)
123 "exp": expire, # Expiration Time
124 "iat": now, # Issued At
125 "aud": audience,
126 "typ": token_type,
127 # 커스텀 클레임 (사용자 정보)
128 "email": email,
129 "is_verified": is_verified,
130 "is_superuser": is_superuser,
131 "is_active": is_active,
132 }
134 # JWT Secret (환경변수에서 가져옴)
135 secret = self._get_jwt_secret_for_consumer(self.frontend_consumer_key)
137 try:
138 token = jwt.encode(payload, secret, algorithm=self.algorithm)
139 logger.debug(
140 f"Created user token for user_id={user_id}, expires_at={expire}"
141 )
142 return token
143 except Exception as e:
144 logger.error(f"Failed to create user token: {e}")
145 raise
147 def create_service_token(
148 self,
149 service_name: str,
150 expires_delta: Optional[timedelta] = None,
151 ) -> str:
152 """
153 서비스 간 통신용 JWT 토큰 생성
155 Args:
156 service_name: 서비스 이름 (strategy-service, market-data-service 등)
157 expires_delta: 만료 시간 (None이면 5분)
159 Returns:
160 str: JWT 토큰
161 """
162 normalized = (
163 service_name
164 if service_name.endswith("-service")
165 else f"{service_name}-service"
166 )
167 if normalized not in self.service_consumer_keys:
168 raise ValueError(
169 f"Unknown service: {service_name}. "
170 f"Valid services: {list(self.service_consumer_keys.keys())}"
171 )
173 if expires_delta is None:
174 expires_delta = timedelta(minutes=self.service_token_expire_minutes)
176 now = datetime.now(UTC)
177 expire = now + expires_delta
179 consumer_key = self.service_consumer_keys[normalized]
181 payload = {
182 # JWT 표준 클레임
183 "iss": consumer_key, # Issuer (Service Consumer Key)
184 "sub": "service-account", # Subject (Service Account)
185 "exp": expire, # Expiration Time
186 "iat": now, # Issued At
187 "aud": "internal",
188 "typ": "service",
189 # 커스텀 클레임 (서비스 정보)
190 "service": normalized,
191 }
193 # JWT Secret (서비스별 Secret)
194 secret = self._get_jwt_secret_for_consumer(consumer_key)
196 try:
197 token = jwt.encode(payload, secret, algorithm=self.algorithm)
198 logger.debug(
199 f"Created service token for {service_name}, expires_at={expire}"
200 )
201 return token
202 except Exception as e:
203 logger.error(f"Failed to create service token: {e}")
204 raise
206 def create_verification_token(self, user_id: str, email: str) -> str:
207 """
208 이메일 인증 토큰 생성
210 iss는 iam-service consumer로 설정합니다.
211 aud = "users:verify", typ = "verify"
212 """
213 now = datetime.now(UTC)
214 expire = now + timedelta(hours=self.verify_token_expire_hours)
216 consumer_key = self.service_consumer_keys["iam-service"]
217 secret = self._get_jwt_secret_for_consumer(consumer_key)
219 payload = {
220 "iss": consumer_key,
221 "sub": user_id,
222 "email": email,
223 "aud": "users:verify",
224 "typ": "verify",
225 "iat": now,
226 "exp": expire,
227 }
229 try:
230 return jwt.encode(payload, secret, algorithm=self.algorithm)
231 except Exception as e:
232 logger.error(f"Failed to create verification token: {e}")
233 raise
235 def create_reset_password_token(
236 self, user_id: str, password_fingerprint: str
237 ) -> str:
238 """
239 비밀번호 재설정 토큰 생성
241 iss는 iam-service consumer로 설정합니다.
242 aud = "users:reset", typ = "reset"
243 """
244 now = datetime.now(UTC)
245 expire = now + timedelta(hours=self.reset_token_expire_hours)
247 consumer_key = self.service_consumer_keys["iam-service"]
248 secret = self._get_jwt_secret_for_consumer(consumer_key)
250 payload = {
251 "iss": consumer_key,
252 "sub": user_id,
253 "password_fgpt": password_fingerprint,
254 "aud": "users:reset",
255 "typ": "reset",
256 "iat": now,
257 "exp": expire,
258 }
260 try:
261 return jwt.encode(payload, secret, algorithm=self.algorithm)
262 except Exception as e:
263 logger.error(f"Failed to create reset password token: {e}")
264 raise
266 def create_email_token(self, email: str) -> str:
267 """
268 이메일 인증용 토큰 생성
270 iss는 iam-service consumer로 설정합니다.
271 aud = "emails", typ = "email"
272 """
273 now = datetime.now(UTC)
274 expire = now + timedelta(hours=self.verify_token_expire_hours)
276 consumer_key = self.service_consumer_keys["iam-service"]
277 secret = self._get_jwt_secret_for_consumer(consumer_key)
279 payload = {
280 "iss": consumer_key,
281 "sub": email,
282 "aud": "emails",
283 "typ": "email",
284 "iat": now,
285 "exp": expire,
286 }
288 try:
289 return jwt.encode(payload, secret, algorithm=self.algorithm)
290 except Exception as e:
291 logger.error(f"Failed to create email token: {e}")
292 raise
294 def decode_token(
295 self,
296 token: str,
297 verify: bool = True,
298 ) -> dict[str, Any]:
299 """
300 JWT 토큰 디코딩 및 검증
302 Args:
303 token: JWT 토큰
304 verify: 서명 검증 여부 (False는 디버깅용)
306 Returns:
307 dict: JWT Payload
309 Raises:
310 jwt.ExpiredSignatureError: 토큰 만료
311 jwt.InvalidTokenError: 유효하지 않은 토큰
312 """
313 try:
314 if verify:
315 # iss 클레임으로 Consumer 식별
316 unverified_payload = jwt.decode(
317 token, options={"verify_signature": False}
318 )
319 consumer_key = unverified_payload.get("iss")
321 if not consumer_key:
322 raise jwt.InvalidTokenError("Missing 'iss' claim")
324 secret = self._get_jwt_secret_for_consumer(consumer_key)
326 # 서명 검증 (aud 검증은 비활성화: 프런트/서비스 토큰의 aud 다양성 허용)
327 payload = jwt.decode(
328 token,
329 secret,
330 algorithms=[self.algorithm],
331 options={"verify_exp": True, "verify_aud": False},
332 )
333 else:
334 # 검증 없이 디코딩 (디버깅용)
335 payload = jwt.decode(token, options={"verify_signature": False})
337 return dict(payload)
339 except jwt.ExpiredSignatureError:
340 logger.warning("Token has expired")
341 raise
342 except jwt.InvalidTokenError as e:
343 logger.error(f"Invalid token: {e}")
344 raise
346 def verify_token(self, token: str) -> bool:
347 """
348 토큰 유효성 간단 검증
350 Args:
351 token: JWT 토큰
353 Returns:
354 bool: 유효하면 True, 아니면 False
355 """
356 try:
357 self.decode_token(token, verify=True)
358 return True
359 except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
360 return False
362 def get_token_expiry(self, token: str) -> Optional[datetime]:
363 """
364 토큰 만료 시간 조회
366 Args:
367 token: JWT 토큰
369 Returns:
370 Optional[datetime]: 만료 시간 (UTC)
371 """
372 try:
373 payload = self.decode_token(token, verify=False)
374 exp_timestamp = payload.get("exp")
375 if exp_timestamp:
376 return datetime.fromtimestamp(exp_timestamp, tz=UTC)
377 return None
378 except Exception as e:
379 logger.error(f"Failed to get token expiry: {e}")
380 return None
382 def is_token_expired(self, token: str) -> bool:
383 """
384 토큰 만료 여부 확인
386 Args:
387 token: JWT 토큰
389 Returns:
390 bool: 만료되었으면 True
391 """
392 expiry = self.get_token_expiry(token)
393 if expiry is None:
394 return True
395 return datetime.now(UTC) > expiry
397 def _get_jwt_secret_for_consumer(self, consumer_key: str) -> str:
398 """
399 Consumer Key에 해당하는 JWT Secret 조회
401 Args:
402 consumer_key: Kong Consumer Key
404 Returns:
405 str: JWT Secret
407 Raises:
408 ValueError: Secret이 설정되지 않은 경우
409 """
410 # 환경변수 이름 매핑
411 secret_env_map = {
412 "frontend-key": "KONG_JWT_SECRET_FRONTEND",
413 "iam-service-key": "KONG_JWT_SECRET_IAM",
414 "strategy-service-key": "KONG_JWT_SECRET_STRATEGY",
415 "backtest-service-key": "KONG_JWT_SECRET_BACKTEST",
416 "indicator-service": "KONG_JWT_SECRET_INDICATOR",
417 "optimization-service-key": "KONG_JWT_SECRET_OPTIMIZATION",
418 "dashboard-service-key": "KONG_JWT_SECRET_DASHBOARD",
419 "notification-service-key": "KONG_JWT_SECRET_NOTIFICATION",
420 "market-data-service-key": "KONG_JWT_SECRET_MARKET_DATA",
421 "genai-service-key": "KONG_JWT_SECRET_GENAI",
422 "ml-service-key": "KONG_JWT_SECRET_ML",
423 }
425 env_var = secret_env_map.get(consumer_key)
426 if not env_var:
427 raise ValueError(f"Unknown consumer key: {consumer_key}")
429 secret = getattr(self.settings, env_var, None)
430 if not secret:
431 # SECRET_KEY는 사용하지 않습니다. 구성 누락은 즉시 오류 처리합니다.
432 raise ValueError(
433 f"JWT secret not found for consumer '{consumer_key}'. Missing env: {env_var}"
434 )
436 return str(secret)
438 def extract_user_id_from_token(self, token: str) -> Optional[str]:
439 """
440 토큰에서 User ID 추출 (편의 함수)
442 Args:
443 token: JWT 토큰
445 Returns:
446 Optional[str]: User ID
447 """
448 try:
449 payload = self.decode_token(token, verify=False)
450 return payload.get("sub")
451 except Exception:
452 return None
454 def extract_user_email_from_token(self, token: str) -> Optional[str]:
455 """
456 토큰에서 이메일 추출 (편의 함수)
458 Args:
459 token: JWT 토큰
461 Returns:
462 Optional[str]: 이메일
463 """
464 try:
465 payload = self.decode_token(token, verify=False)
466 return payload.get("email")
467 except Exception:
468 return None
471# =============================================================================
472# Singleton Instance & Convenience Functions
473# =============================================================================
475# 전역 JWTManager 인스턴스 (싱글톤 패턴)
476_jwt_manager_instance: Optional[JWTManager] = None
479def get_jwt_manager() -> JWTManager:
480 """
481 JWTManager 싱글톤 인스턴스 반환
483 Returns:
484 JWTManager: JWTManager 인스턴스
485 """
486 global _jwt_manager_instance
487 if _jwt_manager_instance is None: 487 ↛ 489line 487 didn't jump to line 489 because the condition on line 487 was always true
488 _jwt_manager_instance = JWTManager()
489 return _jwt_manager_instance
492def create_access_token(
493 user_id: str,
494 email: str,
495 is_verified: bool = False,
496 is_superuser: bool = False,
497) -> str:
498 """
499 사용자 액세스 토큰 생성 (편의 함수)
501 Args:
502 user_id: 사용자 ID
503 email: 이메일
504 is_verified: 이메일 인증 여부
505 is_superuser: 관리자 여부
507 Returns:
508 str: JWT 토큰
509 """
510 jwt_manager = get_jwt_manager()
511 return jwt_manager.create_user_token(
512 user_id=user_id, email=email, is_verified=is_verified, is_superuser=is_superuser
513 )
516def verify_access_token(token: str) -> bool:
517 """
518 액세스 토큰 검증 (편의 함수)
520 Args:
521 token: JWT 토큰
523 Returns:
524 bool: 유효하면 True
525 """
526 jwt_manager = get_jwt_manager()
527 return jwt_manager.verify_token(token)
530def decode_access_token(token: str) -> dict[str, Any]:
531 """
532 액세스 토큰 디코딩 (편의 함수)
534 Args:
535 token: JWT 토큰
537 Returns:
538 dict: JWT Payload
539 """
540 jwt_manager = get_jwt_manager()
541 return jwt_manager.decode_token(token)