Coverage for src / mysingle / auth / security / jwt.py: 21%

157 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-01 23:57 +0900

1""" 

2JWT Token Management - Unified System 

3 

4Kong Gateway와 통합된 JWT 토큰 관리 시스템입니다. 

5- 사용자 액세스/리프레시 토큰 생성 및 검증 

6- 이메일 인증/비밀번호 재설정 토큰 생성 및 검증 

7- 서비스 간 통신용 토큰 생성 

8 

9 

10Usage Example: 

11 from mysingle.auth.security.jwt import get_jwt_manager 

12 

13 # 사용자 로그인 토큰 생성 (access / refresh) 

14 jwt_manager = get_jwt_manager() 

15 access = jwt_manager.create_user_token( 

16 user_id="507f1f77bcf86cd799439011", 

17 email="user@example.com", 

18 token_type="access", 

19 ) 

20 refresh = jwt_manager.create_user_token( 

21 user_id="507f1f77bcf86cd799439011", 

22 email="user@example.com", 

23 token_type="refresh", 

24 ) 

25 

26 # 토큰 검증/디코딩 

27 payload = jwt_manager.decode_token(access) 

28""" 

29 

30from datetime import UTC, datetime, timedelta 

31from typing import Any, Literal, Optional 

32 

33import jwt 

34 

35from ...core.config import settings 

36from ...core.logging import get_structured_logger 

37 

38logger = get_structured_logger(__name__) 

39 

40 

41class JWTManager: 

42 """ 

43 JWT Token 관리 클래스 

44 

45 Kong Gateway와 호환되는 JWT 토큰을 생성하고 검증합니다. 

46 """ 

47 

48 def __init__(self, app_settings: Optional[Any] = None): 

49 """ 

50 JWTManager 초기화 

51 

52 Args: 

53 settings: CommonSettings 인스턴스 (None이면 자동 생성) 

54 """ 

55 # 전역 settings 싱글톤을 기본으로 사용합니다. 

56 self.settings = app_settings or settings 

57 

58 # JWT 설정 

59 self.algorithm = "HS256" 

60 # 만료 시간 정책 (기본값) 

61 self.access_token_expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES 

62 self.refresh_token_expire_days = settings.REFRESH_TOKEN_EXPIRE_DAYS 

63 self.service_token_expire_minutes = settings.SERVICE_TOKEN_EXPIRE_MINUTES 

64 self.verify_token_expire_hours = settings.VERIFY_TOKEN_EXPIRE_MINUTES // 60 

65 self.reset_token_expire_hours = settings.RESET_TOKEN_EXPIRE_MINUTES // 60 

66 # Kong Consumer Keys 

67 self.frontend_consumer_key = "frontend-key" 

68 # 서비스명은 '-service' 접미를 기준으로 정규화합니다. 

69 self.service_consumer_keys = { 

70 "iam-service": "iam-service-key", 

71 "journey-orchestrator-service": "journey-orchestrator-service-key", 

72 "strategy-service": "strategy-service-key", 

73 "backtest-service": "backtest-service-key", 

74 "optimization-service": "optimization-service-key", 

75 "dashboard-service": "dashboard-service-key", 

76 "notification-service": "notification-service-key", 

77 "market-data-service": "market-data-service-key", 

78 "genai-service": "genai-service-key", 

79 "ml-service": "ml-service-key", 

80 } 

81 

82 def create_user_token( 

83 self, 

84 user_id: str, 

85 email: str, 

86 *, 

87 token_type: Literal["access", "refresh"] = "access", 

88 is_verified: bool = False, 

89 is_superuser: bool = False, 

90 is_active: bool = True, 

91 audience: str = "quant-users", 

92 expires_delta: Optional[timedelta] = None, 

93 ) -> str: 

94 """ 

95 사용자 로그인용 JWT 토큰 생성 

96 

97 Args: 

98 user_id: 사용자 ID (MongoDB ObjectId string) 

99 email: 사용자 이메일 

100 token_type: 토큰 용도 (access|refresh) 

101 is_verified: 이메일 인증 여부 

102 is_superuser: 관리자 여부 

103 is_active: 활성 사용자 여부 

104 audience: aud 클레임 값(기본: "quant-users") 

105 expires_delta: 만료 시간 (None이면 정책에 따름) 

106 

107 Returns: 

108 str: JWT 토큰 

109 """ 

110 if expires_delta is None: 

111 if token_type == "access": 

112 expires_delta = timedelta(minutes=self.access_token_expire_minutes) 

113 else: 

114 expires_delta = timedelta(days=self.refresh_token_expire_days) 

115 

116 now = datetime.now(UTC) 

117 expire = now + expires_delta 

118 

119 payload = { 

120 # JWT 표준 클레임 

121 "iss": self.frontend_consumer_key, # Issuer (Kong Consumer Key) 

122 "sub": user_id, # Subject (User ID) 

123 "exp": expire, # Expiration Time 

124 "iat": now, # Issued At 

125 "aud": audience, 

126 "typ": token_type, 

127 # 커스텀 클레임 (사용자 정보) 

128 "email": email, 

129 "is_verified": is_verified, 

130 "is_superuser": is_superuser, 

131 "is_active": is_active, 

132 } 

133 

134 # JWT Secret (환경변수에서 가져옴) 

135 secret = self._get_jwt_secret_for_consumer(self.frontend_consumer_key) 

136 

137 try: 

138 token = jwt.encode(payload, secret, algorithm=self.algorithm) 

139 logger.debug( 

140 f"Created user token for user_id={user_id}, expires_at={expire}" 

141 ) 

142 return token 

143 except Exception as e: 

144 logger.error(f"Failed to create user token: {e}") 

145 raise 

146 

147 def create_service_token( 

148 self, 

149 service_name: str, 

150 expires_delta: Optional[timedelta] = None, 

151 ) -> str: 

152 """ 

153 서비스 간 통신용 JWT 토큰 생성 

154 

155 Args: 

156 service_name: 서비스 이름 (strategy-service, market-data-service 등) 

157 expires_delta: 만료 시간 (None이면 5분) 

158 

159 Returns: 

160 str: JWT 토큰 

161 """ 

162 normalized = ( 

163 service_name 

164 if service_name.endswith("-service") 

165 else f"{service_name}-service" 

166 ) 

167 if normalized not in self.service_consumer_keys: 

168 raise ValueError( 

169 f"Unknown service: {service_name}. " 

170 f"Valid services: {list(self.service_consumer_keys.keys())}" 

171 ) 

172 

173 if expires_delta is None: 

174 expires_delta = timedelta(minutes=self.service_token_expire_minutes) 

175 

176 now = datetime.now(UTC) 

177 expire = now + expires_delta 

178 

179 consumer_key = self.service_consumer_keys[normalized] 

180 

181 payload = { 

182 # JWT 표준 클레임 

183 "iss": consumer_key, # Issuer (Service Consumer Key) 

184 "sub": "service-account", # Subject (Service Account) 

185 "exp": expire, # Expiration Time 

186 "iat": now, # Issued At 

187 "aud": "internal", 

188 "typ": "service", 

189 # 커스텀 클레임 (서비스 정보) 

190 "service": normalized, 

191 } 

192 

193 # JWT Secret (서비스별 Secret) 

194 secret = self._get_jwt_secret_for_consumer(consumer_key) 

195 

196 try: 

197 token = jwt.encode(payload, secret, algorithm=self.algorithm) 

198 logger.debug( 

199 f"Created service token for {service_name}, expires_at={expire}" 

200 ) 

201 return token 

202 except Exception as e: 

203 logger.error(f"Failed to create service token: {e}") 

204 raise 

205 

206 def create_verification_token(self, user_id: str, email: str) -> str: 

207 """ 

208 이메일 인증 토큰 생성 

209 

210 iss는 iam-service consumer로 설정합니다. 

211 aud = "users:verify", typ = "verify" 

212 """ 

213 now = datetime.now(UTC) 

214 expire = now + timedelta(hours=self.verify_token_expire_hours) 

215 

216 consumer_key = self.service_consumer_keys["iam-service"] 

217 secret = self._get_jwt_secret_for_consumer(consumer_key) 

218 

219 payload = { 

220 "iss": consumer_key, 

221 "sub": user_id, 

222 "email": email, 

223 "aud": "users:verify", 

224 "typ": "verify", 

225 "iat": now, 

226 "exp": expire, 

227 } 

228 

229 try: 

230 return jwt.encode(payload, secret, algorithm=self.algorithm) 

231 except Exception as e: 

232 logger.error(f"Failed to create verification token: {e}") 

233 raise 

234 

235 def create_reset_password_token( 

236 self, user_id: str, password_fingerprint: str 

237 ) -> str: 

238 """ 

239 비밀번호 재설정 토큰 생성 

240 

241 iss는 iam-service consumer로 설정합니다. 

242 aud = "users:reset", typ = "reset" 

243 """ 

244 now = datetime.now(UTC) 

245 expire = now + timedelta(hours=self.reset_token_expire_hours) 

246 

247 consumer_key = self.service_consumer_keys["iam-service"] 

248 secret = self._get_jwt_secret_for_consumer(consumer_key) 

249 

250 payload = { 

251 "iss": consumer_key, 

252 "sub": user_id, 

253 "password_fgpt": password_fingerprint, 

254 "aud": "users:reset", 

255 "typ": "reset", 

256 "iat": now, 

257 "exp": expire, 

258 } 

259 

260 try: 

261 return jwt.encode(payload, secret, algorithm=self.algorithm) 

262 except Exception as e: 

263 logger.error(f"Failed to create reset password token: {e}") 

264 raise 

265 

266 def create_email_token(self, email: str) -> str: 

267 """ 

268 이메일 인증용 토큰 생성 

269 

270 iss는 iam-service consumer로 설정합니다. 

271 aud = "emails", typ = "email" 

272 """ 

273 now = datetime.now(UTC) 

274 expire = now + timedelta(hours=self.verify_token_expire_hours) 

275 

276 consumer_key = self.service_consumer_keys["iam-service"] 

277 secret = self._get_jwt_secret_for_consumer(consumer_key) 

278 

279 payload = { 

280 "iss": consumer_key, 

281 "sub": email, 

282 "aud": "emails", 

283 "typ": "email", 

284 "iat": now, 

285 "exp": expire, 

286 } 

287 

288 try: 

289 return jwt.encode(payload, secret, algorithm=self.algorithm) 

290 except Exception as e: 

291 logger.error(f"Failed to create email token: {e}") 

292 raise 

293 

294 def decode_token( 

295 self, 

296 token: str, 

297 verify: bool = True, 

298 ) -> dict[str, Any]: 

299 """ 

300 JWT 토큰 디코딩 및 검증 

301 

302 Args: 

303 token: JWT 토큰 

304 verify: 서명 검증 여부 (False는 디버깅용) 

305 

306 Returns: 

307 dict: JWT Payload 

308 

309 Raises: 

310 jwt.ExpiredSignatureError: 토큰 만료 

311 jwt.InvalidTokenError: 유효하지 않은 토큰 

312 """ 

313 try: 

314 if verify: 

315 # iss 클레임으로 Consumer 식별 

316 unverified_payload = jwt.decode( 

317 token, options={"verify_signature": False} 

318 ) 

319 consumer_key = unverified_payload.get("iss") 

320 

321 if not consumer_key: 

322 raise jwt.InvalidTokenError("Missing 'iss' claim") 

323 

324 secret = self._get_jwt_secret_for_consumer(consumer_key) 

325 

326 # 서명 검증 (aud 검증은 비활성화: 프런트/서비스 토큰의 aud 다양성 허용) 

327 payload = jwt.decode( 

328 token, 

329 secret, 

330 algorithms=[self.algorithm], 

331 options={"verify_exp": True, "verify_aud": False}, 

332 ) 

333 else: 

334 # 검증 없이 디코딩 (디버깅용) 

335 payload = jwt.decode(token, options={"verify_signature": False}) 

336 

337 return dict(payload) 

338 

339 except jwt.ExpiredSignatureError: 

340 logger.warning("Token has expired") 

341 raise 

342 except jwt.InvalidTokenError as e: 

343 logger.error(f"Invalid token: {e}") 

344 raise 

345 

346 def verify_token(self, token: str) -> bool: 

347 """ 

348 토큰 유효성 간단 검증 

349 

350 Args: 

351 token: JWT 토큰 

352 

353 Returns: 

354 bool: 유효하면 True, 아니면 False 

355 """ 

356 try: 

357 self.decode_token(token, verify=True) 

358 return True 

359 except (jwt.ExpiredSignatureError, jwt.InvalidTokenError): 

360 return False 

361 

362 def get_token_expiry(self, token: str) -> Optional[datetime]: 

363 """ 

364 토큰 만료 시간 조회 

365 

366 Args: 

367 token: JWT 토큰 

368 

369 Returns: 

370 Optional[datetime]: 만료 시간 (UTC) 

371 """ 

372 try: 

373 payload = self.decode_token(token, verify=False) 

374 exp_timestamp = payload.get("exp") 

375 if exp_timestamp: 

376 return datetime.fromtimestamp(exp_timestamp, tz=UTC) 

377 return None 

378 except Exception as e: 

379 logger.error(f"Failed to get token expiry: {e}") 

380 return None 

381 

382 def is_token_expired(self, token: str) -> bool: 

383 """ 

384 토큰 만료 여부 확인 

385 

386 Args: 

387 token: JWT 토큰 

388 

389 Returns: 

390 bool: 만료되었으면 True 

391 """ 

392 expiry = self.get_token_expiry(token) 

393 if expiry is None: 

394 return True 

395 return datetime.now(UTC) > expiry 

396 

397 def _get_jwt_secret_for_consumer(self, consumer_key: str) -> str: 

398 """ 

399 Consumer Key에 해당하는 JWT Secret 조회 

400 

401 Args: 

402 consumer_key: Kong Consumer Key 

403 

404 Returns: 

405 str: JWT Secret 

406 

407 Raises: 

408 ValueError: Secret이 설정되지 않은 경우 

409 """ 

410 # 환경변수 이름 매핑 

411 secret_env_map = { 

412 "frontend-key": "KONG_JWT_SECRET_FRONTEND", 

413 "iam-service-key": "KONG_JWT_SECRET_IAM", 

414 "strategy-service-key": "KONG_JWT_SECRET_STRATEGY", 

415 "backtest-service-key": "KONG_JWT_SECRET_BACKTEST", 

416 "indicator-service": "KONG_JWT_SECRET_INDICATOR", 

417 "optimization-service-key": "KONG_JWT_SECRET_OPTIMIZATION", 

418 "dashboard-service-key": "KONG_JWT_SECRET_DASHBOARD", 

419 "notification-service-key": "KONG_JWT_SECRET_NOTIFICATION", 

420 "market-data-service-key": "KONG_JWT_SECRET_MARKET_DATA", 

421 "genai-service-key": "KONG_JWT_SECRET_GENAI", 

422 "ml-service-key": "KONG_JWT_SECRET_ML", 

423 } 

424 

425 env_var = secret_env_map.get(consumer_key) 

426 if not env_var: 

427 raise ValueError(f"Unknown consumer key: {consumer_key}") 

428 

429 secret = getattr(self.settings, env_var, None) 

430 if not secret: 

431 # SECRET_KEY는 사용하지 않습니다. 구성 누락은 즉시 오류 처리합니다. 

432 raise ValueError( 

433 f"JWT secret not found for consumer '{consumer_key}'. Missing env: {env_var}" 

434 ) 

435 

436 return str(secret) 

437 

438 def extract_user_id_from_token(self, token: str) -> Optional[str]: 

439 """ 

440 토큰에서 User ID 추출 (편의 함수) 

441 

442 Args: 

443 token: JWT 토큰 

444 

445 Returns: 

446 Optional[str]: User ID 

447 """ 

448 try: 

449 payload = self.decode_token(token, verify=False) 

450 return payload.get("sub") 

451 except Exception: 

452 return None 

453 

454 def extract_user_email_from_token(self, token: str) -> Optional[str]: 

455 """ 

456 토큰에서 이메일 추출 (편의 함수) 

457 

458 Args: 

459 token: JWT 토큰 

460 

461 Returns: 

462 Optional[str]: 이메일 

463 """ 

464 try: 

465 payload = self.decode_token(token, verify=False) 

466 return payload.get("email") 

467 except Exception: 

468 return None 

469 

470 

471# ============================================================================= 

472# Singleton Instance & Convenience Functions 

473# ============================================================================= 

474 

475# 전역 JWTManager 인스턴스 (싱글톤 패턴) 

476_jwt_manager_instance: Optional[JWTManager] = None 

477 

478 

479def get_jwt_manager() -> JWTManager: 

480 """ 

481 JWTManager 싱글톤 인스턴스 반환 

482 

483 Returns: 

484 JWTManager: JWTManager 인스턴스 

485 """ 

486 global _jwt_manager_instance 

487 if _jwt_manager_instance is None: 487 ↛ 489line 487 didn't jump to line 489 because the condition on line 487 was always true

488 _jwt_manager_instance = JWTManager() 

489 return _jwt_manager_instance 

490 

491 

492def create_access_token( 

493 user_id: str, 

494 email: str, 

495 is_verified: bool = False, 

496 is_superuser: bool = False, 

497) -> str: 

498 """ 

499 사용자 액세스 토큰 생성 (편의 함수) 

500 

501 Args: 

502 user_id: 사용자 ID 

503 email: 이메일 

504 is_verified: 이메일 인증 여부 

505 is_superuser: 관리자 여부 

506 

507 Returns: 

508 str: JWT 토큰 

509 """ 

510 jwt_manager = get_jwt_manager() 

511 return jwt_manager.create_user_token( 

512 user_id=user_id, email=email, is_verified=is_verified, is_superuser=is_superuser 

513 ) 

514 

515 

516def verify_access_token(token: str) -> bool: 

517 """ 

518 액세스 토큰 검증 (편의 함수) 

519 

520 Args: 

521 token: JWT 토큰 

522 

523 Returns: 

524 bool: 유효하면 True 

525 """ 

526 jwt_manager = get_jwt_manager() 

527 return jwt_manager.verify_token(token) 

528 

529 

530def decode_access_token(token: str) -> dict[str, Any]: 

531 """ 

532 액세스 토큰 디코딩 (편의 함수) 

533 

534 Args: 

535 token: JWT 토큰 

536 

537 Returns: 

538 dict: JWT Payload 

539 """ 

540 jwt_manager = get_jwt_manager() 

541 return jwt_manager.decode_token(token)