Coverage for src / mysingle / auth / middleware.py: 0%
221 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
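"""
Authentication Middleware v2 - Request-Based Authentication with Kong Gateway Integration

Refactored middleware for the new Request-based authentication system.
The former gateway_deps dependency has been removed and replaced with built-in authentication logic.

Features:
- Injects the user directly into Request.state.user (fully compatible with deps_new.py)
- Automatically selects the authentication method per service type (IAM vs NON_IAM)
- Supports Kong Gateway header-based authentication
- Automatically excludes public paths
- Supports authentication bypass in test environments (MYSINGLE_AUTH_BYPASS=true)
- High performance and error handling
"""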
import os
from typing import Optional

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp

from ..core.logging import get_structured_logger
from ..core.service_types import ServiceConfig, ServiceType
from .cache import get_user_cache
from .exceptions import AuthorizationFailed, InvalidToken, UserInactive, UserNotExists
from .models import User

# Module-level logger used by the authentication middleware defined in this module.
logger = get_structured_logger(__name__)
class AuthMiddleware(BaseHTTPMiddleware):
    """Authentication middleware for an MSA deployment integrated with Kong Gateway.

    Features:
    - Selects the authentication method from the service type (IAM vs NON_IAM)
    - Supports Kong Gateway header-based authentication
    - Skips authentication for public paths
    - Injects the authenticated user into ``request.state``
    """

    def __init__(self, app: ASGIApp, service_config: ServiceConfig):
        """Store the service configuration and derive the per-instance auth settings.

        Args:
            app: The downstream ASGI application.
            service_config: Service settings. This class reads ``public_paths``,
                ``service_type``, ``enable_auth`` and ``service_name`` from it.
        """
        super().__init__(app)
        self.service_config = service_config
        self.public_paths = self._prepare_public_paths()
        # User cache (hybrid: Redis + in-memory)
        self.user_cache = get_user_cache()
        # Test bypass flag (only active in non-production)
        self.auth_bypass = self._check_auth_bypass()

    @staticmethod
    def _is_true(value: str) -> bool:
        """Return True when *value* equals ``"true"``, ignoring letter case."""
        return value.lower() == "true"

    def _check_auth_bypass(self) -> bool:
        """Decide whether the test-only authentication bypass is active.

        Reads ``MYSINGLE_AUTH_BYPASS`` (default ``"false"``) and ``ENVIRONMENT``
        (default ``"development"``). A bypass request is ignored, with a
        warning, when the environment is ``"production"``.

        Returns:
            True only when the bypass flag is ``"true"`` and the environment is
            not ``"production"``.
        """
        bypass_enabled = self._is_true(os.getenv("MYSINGLE_AUTH_BYPASS", "false"))
        env = os.getenv("ENVIRONMENT", "development").lower()

        if not bypass_enabled:
            return False

        if env == "production":
            logger.warning(
                "⚠️ MYSINGLE_AUTH_BYPASS is set in production environment - IGNORING for security"
            )
            return False

        logger.info(
            f"🧪 Authentication bypass enabled for testing (Environment: {env})"
        )
        return True

    def _prepare_public_paths(self) -> list[str]:
        """Build the list of path prefixes that never require authentication.

        Returns:
            The built-in defaults, plus the auth/OAuth2 endpoints when the
            service type is ``ServiceType.IAM_SERVICE``, followed by the paths
            configured on ``service_config.public_paths``.
        """
        public_paths = [
            "/health",
            "/metrics",
            "/docs",
            "/redoc",
            "/openapi.json",
            "/favicon.ico",
        ]

        # The IAM service additionally exposes its authentication endpoints.
        if self.service_config.service_type == ServiceType.IAM_SERVICE:
            public_paths.extend(
                [
                    "/api/v1/auth/login",
                    "/api/v1/auth/register",
                    "/api/v1/auth/verify-email",
                    "/api/v1/auth/reset-password",
                    "/api/v1/oauth2/google/authorize",
                    "/api/v1/oauth2/google/callback",
                    "/api/v1/oauth2/kakao/authorize",
                    "/api/v1/oauth2/kakao/callback",
                    "/api/v1/oauth2/naver/authorize",
                    "/api/v1/oauth2/naver/callback",
                ]
            )

        # Service-specific public paths; unpacking accepts a list, tuple or set.
        service_public_paths = self.service_config.public_paths or []
        return [*public_paths, *service_public_paths]

    def _is_public_path(self, path: str) -> bool:
        """Return True when *path* starts with any configured public path."""
        # NOTE(review): this is a raw prefix match, so "/health" also matches
        # "/healthcheck-anything". Confirm no protected route shares a prefix
        # with a public one before relying on it.
        return path.startswith(tuple(self.public_paths))

    def _extract_token(self, request: Request) -> Optional[str]:
        """Return the access token from the request, or None when absent.

        The ``Authorization: Bearer <token>`` header is preferred; the
        ``access_token`` cookie is the fallback (for browser calls).
        """
        authorization = request.headers.get("Authorization", "")
        token: Optional[str] = None
        if authorization.startswith("Bearer "):
            # Strip only the leading scheme, never later occurrences of the text.
            token = authorization.removeprefix("Bearer ").strip()

        if not token:
            try:
                token = request.cookies.get("access_token")
            except Exception:
                # Cookie parsing is best-effort; treat a failure as "no token".
                token = None

        return token

    async def _user_from_claims(self, decoded_token, user_id) -> Optional[User]:
        """Build a minimal user from JWT claims when no stored user was found.

        Args:
            decoded_token: Decoded JWT payload; only ``.get()`` is used on it.
            user_id: The ``sub`` claim of the token.

        Returns:
            The constructed user, or None when the claims mark it inactive.
        """
        try:
            from beanie import PydanticObjectId
        except Exception:
            PydanticObjectId = None  # fall back to the raw string id

        user_obj_id = PydanticObjectId(user_id) if PydanticObjectId else user_id
        user = User(
            id=user_obj_id,  # type: ignore[arg-type]
            email=decoded_token.get("email") or "unknown@token.local",
            hashed_password="",
            is_verified=bool(decoded_token.get("is_verified", False)),
            is_active=bool(decoded_token.get("is_active", True)),
            is_superuser=bool(decoded_token.get("is_superuser", False)),
        )

        # Reject tokens whose claims mark the user as inactive.
        if not user.is_active:
            logger.warning(
                f"Inactive user (from token claims) attempted access: {user_id}"
            )
            return None

        logger.debug(
            "Authenticated via JWT claims fallback: %s (ID: %s)",
            user.email,
            user.id,
        )

        # Best-effort: also cache the minimal user context.
        try:
            await self.user_cache.set_user(user)
        except Exception as e:
            logger.debug(f"Failed to cache user from claims: {e}")

        return user

    async def _authenticate_iam_service(self, request: Request) -> Optional[User]:
        """Authenticate by decoding the JWT directly (IAM service path).

        Returns:
            The authenticated user, or None when there is no token, the token
            has no ``sub`` claim, the user is inactive, or any error occurs
            (errors are logged at debug level and never propagated).
        """
        try:
            token = self._extract_token(request)
            if not token:
                return None

            # The JWT module is imported lazily and may be unavailable.
            try:
                from .security.jwt import get_jwt_manager
            except ImportError:
                logger.warning("JWT security module not available")
                return None

            decoded_token = get_jwt_manager().decode_token(token)
            user_id = decoded_token.get("sub")
            if not user_id:
                return None

            # Cache first; on a miss the helper queries the DB and caches the result.
            user = await self._get_user_with_cache(user_id)
            if user:
                if not user.is_active:
                    logger.warning(f"Inactive user attempted access: {user_id}")
                    return None
                return user

            # No stored user record: allow access with a minimal user context
            # built from the already-decoded JWT claims.
            return await self._user_from_claims(decoded_token, user_id)

        except Exception as e:
            logger.debug(f"IAM service authentication failed: {e}")
            return None

    async def _authenticate_non_iam_service(self, request: Request) -> Optional[User]:
        """Authenticate from the ``X-User-*`` headers forwarded by Kong Gateway.

        Returns:
            The cached or header-derived user, or None when ``X-User-Id`` is
            missing or malformed, the user is inactive, Beanie is unavailable,
            or any error occurs (errors are logged and never propagated).
        """
        # NOTE(review): these headers are trusted as-is; this presumably relies
        # on the gateway stripping client-supplied X-User-* headers - confirm.
        try:
            headers = request.headers
            x_user_id = headers.get("X-User-Id")
            if not x_user_id:
                logger.debug("No X-User-Id header found in request")
                return None

            # Prefer a cached user when one exists (reused on the gateway path too).
            cached_user = await self.user_cache.get_user(str(x_user_id))
            if cached_user:
                if not cached_user.is_active:
                    logger.warning(
                        f"Inactive user from cache via gateway headers: {x_user_id}"
                    )
                    return None
                logger.debug(
                    f"User authenticated via cache (gateway): {cached_user.email} (ID: {cached_user.id})"
                )
                return cached_user

            # Build the User object from the gateway headers.
            try:
                from beanie import PydanticObjectId
            except ImportError:
                logger.warning("Beanie not available for user ID conversion")
                return None

            # Validate and convert the header value.
            try:
                user_object_id = PydanticObjectId(x_user_id)
            except Exception as e:
                logger.warning(
                    f"Invalid user ID format in X-User-Id header: {x_user_id} ({e})"
                )
                return None

            user = User(
                id=user_object_id,
                email=headers.get("X-User-Email") or "unknown@gateway.local",
                hashed_password="",  # not needed for gateway authentication
                is_verified=self._is_true(headers.get("X-User-Verified", "false")),
                is_active=self._is_true(headers.get("X-User-Active", "false")),
                is_superuser=self._is_true(headers.get("X-User-Superuser", "false")),
            )

            # Only active users are allowed.
            if not user.is_active:
                logger.warning(f"Inactive user from gateway headers: {user_object_id}")
                return None

            logger.debug(
                f"User authenticated via gateway headers: {user.email} (ID: {user.id})"
            )

            # Best-effort: cache gateway-derived users as well (default TTL).
            try:
                await self.user_cache.set_user(user)
            except Exception as e:
                logger.debug(f"Failed to set user in cache (gateway): {e}")
            return user

        except Exception as e:
            logger.debug(f"NON_IAM service authentication failed: {e}")
            return None

    async def _authenticate_user(self, request: Request) -> Optional[User]:
        """Run the primary authentication method for the service type, then the other.

        IAM services try direct JWT validation first and fall back to gateway
        headers; every other service type does the reverse.

        Returns:
            The authenticated user, or None when both methods fail.
        """
        if self.service_config.service_type == ServiceType.IAM_SERVICE:
            # IAM service: direct JWT validation first.
            user = await self._authenticate_iam_service(request)
            if user:
                logger.debug(f"IAM service: User authenticated via JWT: {user.email}")
                return user

            # Fallback: gateway headers (development/test environments).
            logger.debug("IAM service: Falling back to gateway headers")
            return await self._authenticate_non_iam_service(request)

        # NON_IAM service: gateway headers first.
        user = await self._authenticate_non_iam_service(request)
        if user:
            logger.debug(
                f"NON_IAM service: User authenticated via gateway: {user.email}"
            )
            return user

        # Fallback: direct token (testing without the gateway in development).
        logger.debug("NON_IAM service: Falling back to direct JWT validation")
        return await self._authenticate_iam_service(request)

    async def _get_user_with_cache(self, user_id: str) -> Optional[User]:
        """Look a user up in the cache, falling back to the DB on a miss.

        A user found in the DB is written back to the cache (best-effort).

        Returns:
            The user, or None when it is not found or any error occurs
            (errors are logged at debug level and never propagated).
        """
        try:
            # 1) Cache lookup
            cached = await self.user_cache.get_user(str(user_id))
            if cached:
                logger.debug(f"Cache HIT for user_id={user_id}")
                return cached

            logger.debug(f"Cache MISS for user_id={user_id} - querying DB")

            # 2) DB lookup (imports are deferred to call time)
            from beanie import PydanticObjectId

            from .user_manager import UserManager

            user = await UserManager().get(PydanticObjectId(user_id))

            # 3) Store in the cache on success
            if user:
                try:
                    await self.user_cache.set_user(user)
                except Exception as e:
                    logger.debug(f"Failed to set user in cache: {e}")
            return user

        except Exception as e:
            logger.debug(f"_get_user_with_cache error: {e}")
            return None

    def _create_test_user(self) -> User:
        """Create the synthetic user injected when authentication bypass is on.

        Environment variables read:
        - MYSINGLE_AUTH_BYPASS_ADMIN: "true" selects the admin (superuser) identity
        - TEST_USER_EMAIL / TEST_USER_FULLNAME: regular test user
          (defaults: test_user@test.com / Test User)
        - TEST_ADMIN_EMAIL / TEST_ADMIN_FULLNAME: admin test user
          (defaults: test_admin@test.com / Test Admin)
        """
        # Default to the regular test user; use the admin only when explicitly configured.
        is_superuser = self._is_true(os.getenv("MYSINGLE_AUTH_BYPASS_ADMIN", "false"))

        if is_superuser:
            email = os.getenv("TEST_ADMIN_EMAIL", "test_admin@test.com")
            fullname = os.getenv("TEST_ADMIN_FULLNAME", "Test Admin")
        else:
            email = os.getenv("TEST_USER_EMAIL", "test_user@test.com")
            fullname = os.getenv("TEST_USER_FULLNAME", "Test User")

        try:
            from beanie import PydanticObjectId

            test_id = PydanticObjectId("000000000000000000000001")
        except ImportError:
            # Without Beanie, fall back to the plain string id.
            test_id = "000000000000000000000001"  # type: ignore

        return User(
            id=test_id,  # type: ignore
            email=email,
            full_name=fullname,
            hashed_password="",
            is_verified=True,
            is_active=True,
            is_superuser=is_superuser,
        )

    def _create_error_response(self, error: Exception) -> JSONResponse:
        """Map an authentication error to a JSON error response.

        Returns:
            401 for ``UserNotExists`` and ``InvalidToken``, 403 for
            ``UserInactive`` and ``AuthorizationFailed``, and 500 (after logging
            the error) for anything else.
        """
        if isinstance(error, UserNotExists):
            return JSONResponse(
                status_code=401,
                content={
                    "detail": "Authentication required",
                    "error_type": "UserNotExists",
                    "message": "Valid authentication credentials required",
                },
            )

        if isinstance(error, InvalidToken):
            # Avoid directly accessing attributes that may not exist on the exception
            return JSONResponse(
                status_code=401,
                content={
                    "detail": "Invalid authentication token",
                    "error_type": "InvalidToken",
                    "message": getattr(error, "reason", "Token validation failed"),
                },
            )

        if isinstance(error, UserInactive):
            return JSONResponse(
                status_code=403,
                content={
                    "detail": "User account is inactive",
                    "error_type": "UserInactive",
                    "message": "Account has been deactivated",
                },
            )

        if isinstance(error, AuthorizationFailed):
            return JSONResponse(
                status_code=403,
                content={
                    "detail": "Insufficient permissions",
                    "error_type": "AuthorizationFailed",
                    "message": str(error),
                },
            )

        logger.error(f"Unexpected authentication error: {error}")
        return JSONResponse(
            status_code=500,
            content={
                "detail": "Internal authentication error",
                "error_type": "InternalError",
                "message": "An unexpected error occurred during authentication",
            },
        )

    def _attach_user(self, request: Request, user: User) -> None:
        """Record *user* and the authentication outcome on ``request.state``."""
        request.state.user = user
        request.state.authenticated = True
        request.state.service_type = self.service_config.service_type

    async def dispatch(self, request: Request, call_next):
        """Authenticate the request and inject the user into ``request.state``.

        Public paths, services with ``enable_auth`` off, and the test bypass
        skip real authentication. Otherwise a failed authentication is turned
        into a JSON error response and the downstream handler is not called.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request downstream.

        Returns:
            The downstream response (with ``X-Authenticated-User`` added when a
            user is attached), or a JSON error response.
        """
        path = request.url.path
        method = request.method

        # Public paths skip authentication.
        if self._is_public_path(path):
            logger.debug(f"Skipping authentication for public path: {method} {path}")
            return await call_next(request)

        # Skip when authentication is disabled for this service.
        if not self.service_config.enable_auth:
            logger.debug(
                f"Authentication disabled for service: {self.service_config.service_name}"
            )
            return await call_next(request)

        # Test-environment authentication bypass.
        if self.auth_bypass:
            logger.debug(f"🧪 Auth bypass: injecting test user for {method} {path}")
            self._attach_user(request, self._create_test_user())
            return await call_next(request)

        try:
            user = await self._authenticate_user(request)

            if not user:
                # Protected endpoint reached without user information.
                logger.warning(
                    f"❌ Authentication required for protected endpoint: {method} {path}"
                )
                raise UserNotExists(
                    identifier="user", identifier_type="authenticated user"
                )

            # Re-check the active flag; the authenticators check it too, but it
            # is verified again here for safety.
            if not user.is_active:
                logger.warning(f"Inactive user blocked: {user.id} at {method} {path}")
                raise UserInactive(user_id=str(user.id))

            # Store the user on request.state (compatible with deps_new.py).
            self._attach_user(request, user)

            logger.debug(
                f"✅ User authenticated: {user.email} "
                f"(ID: {user.id}, Verified: {user.is_verified}, "
                f"Superuser: {user.is_superuser}) for {method} {path}"
            )

        except (UserNotExists, InvalidToken, UserInactive, AuthorizationFailed) as e:
            logger.warning(
                f"🔒 Authentication failed for {method} {path}: {type(e).__name__} - {e}"
            )
            return self._create_error_response(e)

        except Exception as e:
            logger.error(
                f"💥 Unexpected authentication error for {method} {path}: {e}",
                exc_info=True,
            )
            return self._create_error_response(e)

        # Call the next middleware/handler outside the try block so that
        # downstream errors are not reported as authentication failures.
        response = await call_next(request)

        # Add user info to the response headers (for debugging; removing this
        # in production is recommended).
        if getattr(request.state, "user", None):
            response.headers["X-Authenticated-User"] = str(request.state.user.id)

        return response