Coverage for src/mysingle/auth/deps/core.py: 0%
65 statements
coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1from typing import Any, Dict, Optional
3from fastapi import HTTPException, Request, status
5from ...core.logging import get_structured_logger
6from ..models import User
7from .kong import (
8 get_kong_correlation_id,
9 get_kong_request_id,
10 get_kong_user_id,
11)
13logger = get_structured_logger(__name__)
def get_current_user(request: Request) -> User:
    """Return the authenticated user attached to the request.

    Reads ``request.state.user`` and, when Kong supplies a user ID for the
    request, cross-checks that ID against the user's own ID.

    Args:
        request: Incoming request whose ``state`` may carry a ``user``.

    Returns:
        The ``User`` instance found on ``request.state``.

    Raises:
        HTTPException: 401 when no user is present, when the stored object is
            not a ``User``, or when an ObjectId-shaped Kong user ID does not
            match ``user.id``.
    """
    user: Optional[User] = getattr(request.state, "user", None)

    if not user:
        logger.warning("No user found in request.state - authentication failed")
        # A missing user is an authentication failure (401), not a server error (500).
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
        )

    if not isinstance(user, User):
        logger.error(f"Invalid user type in request.state: {type(user)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication context",
        )

    # Kong Gateway security check: cross-verify only when Kong supplied an ID.
    # Per the original author's note, Kong's pre-function plugin forwards the
    # JWT ``sub`` claim as X-User-Id.
    kong_user_id = get_kong_user_id(request)
    if not kong_user_id:
        return user

    # An ObjectId string is 24 hex characters. Hex is case-insensitive, so
    # normalise once and use the same form for the shape check AND the
    # comparison; otherwise an upper-case ID passes the shape check but is
    # then rejected as a mismatch for the correct user.
    normalized_kong_id = kong_user_id.lower()
    is_object_id = len(normalized_kong_id) == 24 and all(
        c in "0123456789abcdef" for c in normalized_kong_id
    )

    if not is_object_id:
        # Legacy: the value is a Consumer name (older Kong configuration), so
        # there is nothing to compare against the user ID.
        logger.debug(
            f"Kong X-User-Id is Consumer name (legacy): {kong_user_id}, authenticated user: {user.id}"
        )
        return user

    # The ID Kong extracted must match the ID of the authenticated user.
    if str(user.id).lower() != normalized_kong_id:
        logger.error(f"User ID mismatch: Kong={kong_user_id}, User={user.id}")
        # For security, a mismatch is treated as an authentication failure (401).
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication user mismatch",
        )

    logger.debug(f"Kong user ID validation passed: {kong_user_id}")
    return user
def get_current_active_user(request: Request) -> User:
    """Return the authenticated user, requiring ``is_active`` to be truthy.

    Raises:
        HTTPException: 403 when the user is inactive. Errors raised by
            ``get_current_user`` propagate unchanged.
    """
    user = get_current_user(request)
    if user.is_active:
        return user

    logger.warning(f"Inactive user attempted access: {user.id}")
    # An inactive account is an authorization failure, hence 403.
    raise HTTPException(
        detail="User is inactive",
        status_code=status.HTTP_403_FORBIDDEN,
    )
def get_current_active_verified_user(request: Request) -> User:
    """Return the active user, requiring ``is_verified`` to be truthy.

    Raises:
        HTTPException: 403 when the user is not verified. Errors raised by
            ``get_current_active_user`` propagate unchanged.
    """
    user = get_current_active_user(request)
    if user.is_verified:
        return user

    logger.warning(f"Unverified user attempted access: {user.id}")
    raise HTTPException(
        detail="Email verification required",
        status_code=status.HTTP_403_FORBIDDEN,
    )
def get_current_active_superuser(request: Request) -> User:
    """Return the active, verified user, requiring ``is_superuser`` to be truthy.

    Raises:
        HTTPException: 403 when the user is not a superuser. Errors raised by
            ``get_current_active_verified_user`` propagate unchanged.
    """
    user = get_current_active_verified_user(request)
    if user.is_superuser:
        return user

    logger.warning(f"Non-superuser attempted admin access: {user.id}")
    raise HTTPException(
        detail="Superuser privileges required",
        status_code=status.HTTP_403_FORBIDDEN,
    )
def get_current_user_optional(request: Request) -> Optional[User]:
    """Optional authentication: return the ``User`` on ``request.state``, else ``None``.

    Anything stored under ``request.state.user`` that is not a ``User``
    instance is treated as absent, so the return type is guaranteed.
    """
    candidate = getattr(request.state, "user", None)
    if isinstance(candidate, User):
        return candidate
    return None
def is_user_authenticated(request: Request) -> bool:
    """Return True when ``request.state.user`` holds a ``User`` instance."""
    state_user = getattr(request.state, "user", None)
    return isinstance(state_user, User)
def get_user_id(request: Request) -> Optional[str]:
    """Return the authenticated user's ID as a string, or ``None``.

    The user is resolved through ``get_current_user_optional`` so that only a
    real ``User`` instance counts. This keeps the helper consistent with
    ``get_user_email`` and ``is_user_authenticated``: a non-``User`` object on
    ``request.state.user`` yields ``None`` rather than an ``AttributeError``
    or an ID for a request the other helpers consider unauthenticated.

    Args:
        request: Incoming request whose ``state`` may carry a ``user``.

    Returns:
        ``str(user.id)`` when a ``User`` is present, otherwise ``None``.
    """
    user = get_current_user_optional(request)
    return str(user.id) if user else None
def get_user_email(request: Request) -> Optional[str]:
    """Return the email of the request's user, or ``None`` when there is no user."""
    current = get_current_user_optional(request)
    if not current:
        return None
    return current.email
def get_user_display_name(request: Request) -> Optional[str]:
    """Return a display name for the request's user, or ``None`` without a user.

    Preference order: ``full_name``, then the part of the email before ``@``,
    then ``"User "`` followed by the first eight characters of the ID.
    """
    user: Optional[User] = getattr(request.state, "user", None)
    if not (user and isinstance(user, User)):
        return None

    # ``full_name`` may be missing on the model, so read it defensively.
    full_name = getattr(user, "full_name", None)
    if full_name:
        return str(full_name)

    email = user.email
    if email:
        local_part, _, _ = str(email).partition("@")
        return local_part

    return f"User {str(user.id)[:8]}"
def get_request_security_context(request: Request) -> Dict[str, Any]:
    """Return a security context dict for the request (includes Kong tracing IDs).

    User-derived fields fall back to ``None`` / ``False`` when no user is
    attached; the remaining fields come from the request and the Kong helpers.
    """
    user = get_current_user_optional(request)

    context: Dict[str, Any] = {"authenticated": user is not None}
    if user:
        context["user_id"] = str(user.id)
        context["user_email"] = user.email
        context["is_active"] = user.is_active
        context["is_verified"] = user.is_verified
        context["is_superuser"] = user.is_superuser
    else:
        context["user_id"] = None
        context["user_email"] = None
        context["is_active"] = False
        context["is_verified"] = False
        context["is_superuser"] = False

    # ``request.client`` can be falsy, in which case no IP is reported.
    context["client_ip"] = request.client.host if request.client else None
    context["user_agent"] = request.headers.get("user-agent")
    context["endpoint"] = f"{request.method} {request.url.path}"
    context["correlation_id"] = get_kong_correlation_id(request)
    context["request_id"] = get_kong_request_id(request)
    return context