Coverage for src / mysingle / auth / middleware.py: 0%

221 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2Authentication Middleware v2 - Request-Based Authentication with Kong Gateway Integration 

3 

4새로운 Request 기반 인증 시스템을 위한 리팩토링된 미들웨어입니다. 

5기존 gateway_deps 의존성을 제거하고 내장 인증 로직으로 대체했습니다. 

6 

7Features: 

8- Request.state.user 직접 주입 (deps_new.py와 완전 호환) 

9- 서비스 타입별 자동 인증 방식 선택 (IAM vs NON_IAM) 

10- Kong Gateway 헤더 기반 인증 지원 

11- 공개 경로 자동 제외 

12- 테스트 환경 인증 우회 지원 (MYSINGLE_AUTH_BYPASS=true) 

13- 높은 성능 및 에러 처리 

14""" 

15 

16import os 

17from typing import Optional 

18 

19from fastapi import Request 

20from fastapi.responses import JSONResponse 

21from starlette.middleware.base import BaseHTTPMiddleware 

22from starlette.types import ASGIApp 

23 

24from ..core.logging import get_structured_logger 

25from ..core.service_types import ServiceConfig, ServiceType 

26from .cache import get_user_cache 

27from .exceptions import AuthorizationFailed, InvalidToken, UserInactive, UserNotExists 

28from .models import User 

29 

30logger = get_structured_logger(__name__) 

31 

32 

33class AuthMiddleware(BaseHTTPMiddleware): 

34 """ 

35 MSA 환경에서 Kong Gateway와 연동되는 인증 미들웨어 

36 

37 Features: 

38 - 서비스 타입별 자동 인증 방식 선택 (IAM vs NON_IAM) 

39 - Kong Gateway 헤더 기반 인증 지원 

40 - 공개 경로 자동 제외 

41 - Request.state에 사용자 정보 주입 

42 """ 

43 

44 def __init__(self, app: ASGIApp, service_config: ServiceConfig): 

45 super().__init__(app) 

46 self.service_config = service_config 

47 self.public_paths = self._prepare_public_paths() 

48 # User Cache (Hybrid: Redis + In-Memory) 

49 self.user_cache = get_user_cache() 

50 # Test bypass flag (only active in non-production) 

51 self.auth_bypass = self._check_auth_bypass() 

52 

53 def _check_auth_bypass(self) -> bool: 

54 """Check if authentication bypass is enabled for testing""" 

55 bypass_enabled = os.getenv("MYSINGLE_AUTH_BYPASS", "false").lower() == "true" 

56 env = os.getenv("ENVIRONMENT", "development").lower() 

57 

58 if bypass_enabled and env == "production": 

59 logger.warning( 

60 "⚠️ MYSINGLE_AUTH_BYPASS is set in production environment - IGNORING for security" 

61 ) 

62 return False 

63 

64 if bypass_enabled: 

65 logger.info( 

66 f"🧪 Authentication bypass enabled for testing (Environment: {env})" 

67 ) 

68 

69 return bypass_enabled 

70 

71 def _prepare_public_paths(self) -> list[str]: 

72 """공개 경로 목록 준비""" 

73 default_public_paths = [ 

74 "/health", 

75 "/metrics", 

76 "/docs", 

77 "/redoc", 

78 "/openapi.json", 

79 "/favicon.ico", 

80 ] 

81 

82 # 서비스별 공개 경로 추가 

83 service_public_paths = self.service_config.public_paths or [] 

84 

85 # IAM 서비스는 인증 관련 경로도 공개 

86 if self.service_config.service_type == ServiceType.IAM_SERVICE: 

87 auth_public_paths = [ 

88 "/api/v1/auth/login", 

89 "/api/v1/auth/register", 

90 "/api/v1/auth/verify-email", 

91 "/api/v1/auth/reset-password", 

92 "/api/v1/oauth2/google/authorize", 

93 "/api/v1/oauth2/google/callback", 

94 "/api/v1/oauth2/kakao/authorize", 

95 "/api/v1/oauth2/kakao/callback", 

96 "/api/v1/oauth2/naver/authorize", 

97 "/api/v1/oauth2/naver/callback", 

98 ] 

99 default_public_paths.extend(auth_public_paths) 

100 

101 return default_public_paths + service_public_paths 

102 

103 def _is_public_path(self, path: str) -> bool: 

104 """요청 경로가 공개 경로인지 확인""" 

105 return any(path.startswith(public_path) for public_path in self.public_paths) 

106 

107 async def _authenticate_iam_service(self, request: Request) -> Optional[User]: 

108 """IAM 서비스용 직접 JWT 토큰 검증""" 

109 try: 

110 # Authorization 헤더에서 Bearer 토큰 추출 

111 authorization = request.headers.get("Authorization", "") 

112 token: Optional[str] = None 

113 if authorization.startswith("Bearer "): 

114 token = authorization.replace("Bearer ", "").strip() 

115 

116 # Authorization이 없으면 쿠키에서 access_token 검색 (브라우저 호출 대비) 

117 if not token: 

118 try: 

119 token = request.cookies.get("access_token") 

120 except Exception: 

121 token = None 

122 

123 if not token: 

124 return None 

125 

126 # JWT 토큰 직접 검증 

127 try: 

128 from .security.jwt import get_jwt_manager 

129 except ImportError: 

130 logger.warning("JWT security module not available") 

131 return None 

132 

133 jwt_manager = get_jwt_manager() 

134 decoded_token = jwt_manager.decode_token(token) 

135 user_id = decoded_token.get("sub") 

136 if not user_id: 

137 return None 

138 

139 # 캐시 우선 조회 -> 미스 시 DB 조회 후 캐시 저장 

140 user = await self._get_user_with_cache(user_id) 

141 

142 if user and not user.is_active: 

143 logger.warning(f"Inactive user attempted access: {user_id}") 

144 return None 

145 

146 # DB에 사용자 레코드가 없더라도, JWT 클레임으로 최소 사용자 컨텍스트를 구성해 허용 

147 # (게이트웨이에서 이미 서명 검증을 통과했고, 여기서도 검증됨) 

148 if not user: 

149 try: 

150 from beanie import PydanticObjectId 

151 except Exception: 

152 PydanticObjectId = None # 타입 회피 

153 

154 user_obj_id = ( 

155 PydanticObjectId(decoded_token.get("sub")) 

156 if PydanticObjectId 

157 else decoded_token.get("sub") 

158 ) 

159 user = User( 

160 id=user_obj_id, # type: ignore[arg-type] 

161 email=decoded_token.get("email") or "unknown@token.local", 

162 hashed_password="", 

163 is_verified=bool(decoded_token.get("is_verified", False)), 

164 is_active=bool(decoded_token.get("is_active", True)), 

165 is_superuser=bool(decoded_token.get("is_superuser", False)), 

166 ) 

167 

168 # 비활성 토큰은 거부 

169 if not user.is_active: 

170 logger.warning( 

171 f"Inactive user (from token claims) attempted access: {user_id}" 

172 ) 

173 return None 

174 

175 logger.debug( 

176 "Authenticated via JWT claims fallback: %s (ID: %s)", 

177 user.email, 

178 user.id, 

179 ) 

180 

181 # 캐시에도 적재 시도 (최소 컨텍스트) 

182 try: 

183 await self.user_cache.set_user(user) 

184 except Exception as e: 

185 logger.debug(f"Failed to cache user from claims: {e}") 

186 

187 return user 

188 

189 except Exception as e: 

190 logger.debug(f"IAM service authentication failed: {e}") 

191 return None 

192 

193 async def _authenticate_non_iam_service(self, request: Request) -> Optional[User]: 

194 """NON_IAM 서비스용 Kong Gateway 헤더 기반 인증""" 

195 try: 

196 # Kong Gateway에서 전달하는 헤더들 

197 x_user_id = request.headers.get("X-User-Id") 

198 x_user_email = request.headers.get("X-User-Email") 

199 x_user_verified = request.headers.get("X-User-Verified", "false") 

200 x_user_active = request.headers.get("X-User-Active", "false") 

201 x_user_superuser = request.headers.get("X-User-Superuser", "false") 

202 

203 if not x_user_id: 

204 logger.debug("No X-User-Id header found in request") 

205 return None 

206 

207 # 캐시에 사용자 정보가 있으면 우선 사용 (게이트웨이 경로에서도 재사용) 

208 cached_user = await self.user_cache.get_user(str(x_user_id)) 

209 if cached_user: 

210 if not cached_user.is_active: 

211 logger.warning( 

212 f"Inactive user from cache via gateway headers: {x_user_id}" 

213 ) 

214 return None 

215 logger.debug( 

216 f"User authenticated via cache (gateway): {cached_user.email} (ID: {cached_user.id})" 

217 ) 

218 return cached_user 

219 

220 # Gateway 헤더로부터 User 객체 구성 

221 try: 

222 from beanie import PydanticObjectId 

223 except ImportError: 

224 logger.warning("Beanie not available for user ID conversion") 

225 return None 

226 

227 # 헤더 값 검증 및 변환 

228 try: 

229 user_object_id = PydanticObjectId(x_user_id) 

230 except Exception as e: 

231 logger.warning( 

232 f"Invalid user ID format in X-User-Id header: {x_user_id} ({e})" 

233 ) 

234 return None 

235 

236 # User 객체 생성 (Gateway에서 이미 검증된 정보) 

237 user = User( 

238 id=user_object_id, 

239 email=x_user_email or "unknown@gateway.local", 

240 hashed_password="", # Gateway 인증에서는 불필요 

241 is_verified=x_user_verified.lower() == "true", 

242 is_active=x_user_active.lower() == "true", 

243 is_superuser=x_user_superuser.lower() == "true", 

244 ) 

245 

246 # 활성 사용자만 허용 

247 if not user.is_active: 

248 logger.warning(f"Inactive user from gateway headers: {user_object_id}") 

249 return None 

250 

251 logger.debug( 

252 f"User authenticated via gateway headers: {user.email} (ID: {user.id})" 

253 ) 

254 

255 # 게이트웨이 기반 사용자도 단기 캐시 (TTL 기본값) 

256 try: 

257 await self.user_cache.set_user(user) 

258 except Exception as e: 

259 logger.debug(f"Failed to set user in cache (gateway): {e}") 

260 return user 

261 

262 except Exception as e: 

263 logger.debug(f"NON_IAM service authentication failed: {e}") 

264 return None 

265 

266 async def _authenticate_user(self, request: Request) -> Optional[User]: 

267 """서비스 타입에 따른 인증 수행""" 

268 if self.service_config.service_type == ServiceType.IAM_SERVICE: 

269 # IAM 서비스: 직접 JWT 검증 우선 

270 user = await self._authenticate_iam_service(request) 

271 if user: 

272 logger.debug(f"IAM service: User authenticated via JWT: {user.email}") 

273 return user 

274 

275 # Fallback: Gateway 헤더 (개발/테스트 환경) 

276 logger.debug("IAM service: Falling back to gateway headers") 

277 return await self._authenticate_non_iam_service(request) 

278 

279 else: 

280 # NON_IAM 서비스: Gateway 헤더 우선 

281 user = await self._authenticate_non_iam_service(request) 

282 if user: 

283 logger.debug( 

284 f"NON_IAM service: User authenticated via gateway: {user.email}" 

285 ) 

286 return user 

287 

288 # Fallback: 직접 토큰 (개발 환경에서 Gateway 없이 테스트할 때) 

289 logger.debug("NON_IAM service: Falling back to direct JWT validation") 

290 return await self._authenticate_iam_service(request) 

291 

292 async def _get_user_with_cache(self, user_id: str) -> Optional[User]: 

293 """캐시 우선으로 사용자 조회, 미스 시 DB 조회 후 캐시 저장""" 

294 try: 

295 # 1) 캐시 조회 

296 cached = await self.user_cache.get_user(str(user_id)) 

297 if cached: 

298 logger.debug(f"Cache HIT for user_id={user_id}") 

299 return cached 

300 

301 logger.debug(f"Cache MISS for user_id={user_id} - querying DB") 

302 

303 # 2) DB 조회 

304 from beanie import PydanticObjectId 

305 

306 from .user_manager import UserManager 

307 

308 user_manager = UserManager() 

309 user = await user_manager.get(PydanticObjectId(user_id)) 

310 

311 # 3) 캐시에 저장 (성공 시) 

312 if user: 

313 try: 

314 await self.user_cache.set_user(user) 

315 except Exception as e: 

316 logger.debug(f"Failed to set user in cache: {e}") 

317 return user 

318 

319 except Exception as e: 

320 logger.debug(f"_get_user_with_cache error: {e}") 

321 return None 

322 

323 def _create_test_user(self) -> User: 

324 """Create a test user for authentication bypass 

325 

326 Uses environment variables for test user configuration: 

327 - TEST_USER_EMAIL: Test user email (default: test_user@test.com) 

328 - TEST_USER_FULLNAME: Test user full name (default: Test User) 

329 - TEST_ADMIN_EMAIL: Test admin email (for superuser bypass) 

330 """ 

331 # Determine if superuser based on auth bypass mode 

332 # Default to regular test user, use admin if explicitly configured 

333 use_admin = os.getenv("MYSINGLE_AUTH_BYPASS_ADMIN", "false").lower() == "true" 

334 

335 if use_admin: 

336 email = os.getenv("TEST_ADMIN_EMAIL", "test_admin@test.com") 

337 fullname = os.getenv("TEST_ADMIN_FULLNAME", "Test Admin") 

338 is_superuser = True 

339 else: 

340 email = os.getenv("TEST_USER_EMAIL", "test_user@test.com") 

341 fullname = os.getenv("TEST_USER_FULLNAME", "Test User") 

342 is_superuser = False 

343 

344 try: 

345 from beanie import PydanticObjectId 

346 

347 test_id = PydanticObjectId("000000000000000000000001") 

348 except ImportError: 

349 test_id = "000000000000000000000001" # type: ignore 

350 

351 return User( 

352 id=test_id, # type: ignore 

353 email=email, 

354 full_name=fullname, 

355 hashed_password="", 

356 is_verified=True, 

357 is_active=True, 

358 is_superuser=is_superuser, 

359 ) 

360 

361 def _create_error_response(self, error: Exception) -> JSONResponse: 

362 """인증 에러 응답 생성""" 

363 if isinstance(error, UserNotExists): 

364 return JSONResponse( 

365 status_code=401, 

366 content={ 

367 "detail": "Authentication required", 

368 "error_type": "UserNotExists", 

369 "message": "Valid authentication credentials required", 

370 }, 

371 ) 

372 elif isinstance(error, InvalidToken): 

373 # Avoid directly accessing attributes that may not exist on the exception 

374 return JSONResponse( 

375 status_code=401, 

376 content={ 

377 "detail": "Invalid authentication token", 

378 "error_type": "InvalidToken", 

379 "message": getattr(error, "reason", "Token validation failed"), 

380 }, 

381 ) 

382 elif isinstance(error, UserInactive): 

383 return JSONResponse( 

384 status_code=403, 

385 content={ 

386 "detail": "User account is inactive", 

387 "error_type": "UserInactive", 

388 "message": "Account has been deactivated", 

389 }, 

390 ) 

391 elif isinstance(error, AuthorizationFailed): 

392 return JSONResponse( 

393 status_code=403, 

394 content={ 

395 "detail": "Insufficient permissions", 

396 "error_type": "AuthorizationFailed", 

397 "message": str(error), 

398 }, 

399 ) 

400 else: 

401 logger.error(f"Unexpected authentication error: {error}") 

402 return JSONResponse( 

403 status_code=500, 

404 content={ 

405 "detail": "Internal authentication error", 

406 "error_type": "InternalError", 

407 "message": "An unexpected error occurred during authentication", 

408 }, 

409 ) 

410 

411 async def dispatch(self, request: Request, call_next): 

412 """미들웨어 메인 로직 - Request.state.user 주입""" 

413 path = request.url.path 

414 method = request.method 

415 

416 # 공개 경로는 인증 건너뛰기 

417 if self._is_public_path(path): 

418 logger.debug(f"Skipping authentication for public path: {method} {path}") 

419 return await call_next(request) 

420 

421 # 인증이 비활성화된 경우 건너뛰기 

422 if not self.service_config.enable_auth: 

423 logger.debug( 

424 f"Authentication disabled for service: {self.service_config.service_name}" 

425 ) 

426 return await call_next(request) 

427 

428 # 테스트 환경 인증 우회 

429 if self.auth_bypass: 

430 logger.debug(f"🧪 Auth bypass: injecting test user for {method} {path}") 

431 test_user = self._create_test_user() 

432 request.state.user = test_user 

433 request.state.authenticated = True 

434 request.state.service_type = self.service_config.service_type 

435 return await call_next(request) 

436 

437 try: 

438 # 사용자 인증 수행 

439 user = await self._authenticate_user(request) 

440 

441 if user: 

442 # 이중 활성화 상태 확인 (인증 과정에서도 확인하지만 보안을 위해 재확인) 

443 if not user.is_active: 

444 logger.warning( 

445 f"Inactive user blocked: {user.id} at {method} {path}" 

446 ) 

447 raise UserInactive(user_id=str(user.id)) 

448 

449 # Request.state에 사용자 정보 저장 (deps_new.py와 호환) 

450 request.state.user = user 

451 request.state.authenticated = True 

452 request.state.service_type = self.service_config.service_type 

453 

454 logger.debug( 

455 f"✅ User authenticated: {user.email} " 

456 f"(ID: {user.id}, Verified: {user.is_verified}, " 

457 f"Superuser: {user.is_superuser}) for {method} {path}" 

458 ) 

459 else: 

460 # 인증 필요한 경로에서 사용자 정보 없음 

461 logger.warning( 

462 f"❌ Authentication required for protected endpoint: {method} {path}" 

463 ) 

464 raise UserNotExists( 

465 identifier="user", identifier_type="authenticated user" 

466 ) 

467 

468 except (UserNotExists, InvalidToken, UserInactive, AuthorizationFailed) as e: 

469 logger.warning( 

470 f"🔒 Authentication failed for {method} {path}: {type(e).__name__} - {e}" 

471 ) 

472 return self._create_error_response(e) 

473 

474 except Exception as e: 

475 logger.error( 

476 f"💥 Unexpected authentication error for {method} {path}: {e}", 

477 exc_info=True, 

478 ) 

479 return self._create_error_response(e) 

480 

481 # 다음 미들웨어/핸들러 호출 

482 response = await call_next(request) 

483 

484 # 응답 헤더에 사용자 정보 추가 (디버깅용, 프로덕션에서는 제거 권장) 

485 if hasattr(request.state, "user") and request.state.user: 

486 response.headers["X-Authenticated-User"] = str(request.state.user.id) 

487 

488 return response