Coverage for src / mysingle / auth / deps / core.py: 0%

65 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1from typing import Any, Dict, Optional 

2 

3from fastapi import HTTPException, Request, status 

4 

5from ...core.logging import get_structured_logger 

6from ..models import User 

7from .kong import ( 

8 get_kong_correlation_id, 

9 get_kong_request_id, 

10 get_kong_user_id, 

11) 

12 

13logger = get_structured_logger(__name__) 

14 

15 

16def get_current_user(request: Request) -> User: 

17 """ 

18 현재 인증된 사용자 반환 (Kong Gateway + AuthMiddleware 통합) 

19 """ 

20 user: Optional[User] = getattr(request.state, "user", None) 

21 

22 if not user: 

23 logger.warning("No user found in request.state - authentication failed") 

24 # 인증 실패는 서버 에러(500)가 아닌 401을 반환해야 함 

25 raise HTTPException( 

26 status_code=status.HTTP_401_UNAUTHORIZED, 

27 detail="Not authenticated", 

28 ) 

29 

30 if not isinstance(user, User): 

31 logger.error(f"Invalid user type in request.state: {type(user)}") 

32 raise HTTPException( 

33 status_code=status.HTTP_401_UNAUTHORIZED, 

34 detail="Invalid authentication context", 

35 ) 

36 

37 # Kong Gateway 보안 검증 (헤더가 있으면 교차 확인) 

38 # Kong의 pre-function 플러그인이 JWT의 sub 클레임을 X-User-Id로 추출하여 전달 

39 kong_user_id = get_kong_user_id(request) 

40 if kong_user_id: 

41 # ObjectId 형식인지 확인 (24자 hex 문자열) 

42 is_object_id = len(kong_user_id) == 24 and all( 

43 c in "0123456789abcdef" for c in kong_user_id.lower() 

44 ) 

45 

46 if is_object_id: 

47 # Kong에서 추출한 사용자 ID와 JWT 토큰에서 추출한 사용자 ID 일치 확인 

48 if str(user.id) != kong_user_id: 

49 logger.error(f"User ID mismatch: Kong={kong_user_id}, User={user.id}") 

50 # 보안상 불일치는 인증 실패로 간주하여 401 반환 

51 raise HTTPException( 

52 status_code=status.HTTP_401_UNAUTHORIZED, 

53 detail="Authentication user mismatch", 

54 ) 

55 logger.debug(f"Kong user ID validation passed: {kong_user_id}") 

56 else: 

57 # Legacy: Consumer 이름인 경우 (구 Kong 설정 호환성) 

58 logger.debug( 

59 f"Kong X-User-Id is Consumer name (legacy): {kong_user_id}, authenticated user: {user.id}" 

60 ) 

61 

62 return user 

63 

64 

65def get_current_active_user(request: Request) -> User: 

66 """활성 사용자 (is_active) 보장""" 

67 user = get_current_user(request) 

68 if not user.is_active: 

69 logger.warning(f"Inactive user attempted access: {user.id}") 

70 # 비활성 사용자는 인가 실패로 403 반환 

71 raise HTTPException( 

72 status_code=status.HTTP_403_FORBIDDEN, 

73 detail="User is inactive", 

74 ) 

75 return user 

76 

77 

78def get_current_active_verified_user(request: Request) -> User: 

79 """활성 + 이메일 검증 사용자 보장""" 

80 user = get_current_active_user(request) 

81 if not user.is_verified: 

82 logger.warning(f"Unverified user attempted access: {user.id}") 

83 raise HTTPException( 

84 status_code=status.HTTP_403_FORBIDDEN, 

85 detail="Email verification required", 

86 ) 

87 return user 

88 

89 

90def get_current_active_superuser(request: Request) -> User: 

91 """슈퍼유저 보장""" 

92 user = get_current_active_verified_user(request) 

93 if not user.is_superuser: 

94 logger.warning(f"Non-superuser attempted admin access: {user.id}") 

95 raise HTTPException( 

96 status_code=status.HTTP_403_FORBIDDEN, 

97 detail="Superuser privileges required", 

98 ) 

99 return user 

100 

101 

102def get_current_user_optional(request: Request) -> Optional[User]: 

103 """선택적 인증: 없으면 None (타입 보장)""" 

104 user = getattr(request.state, "user", None) 

105 return user if isinstance(user, User) else None 

106 

107 

108def is_user_authenticated(request: Request) -> bool: 

109 """사용자 인증 여부""" 

110 return isinstance(getattr(request.state, "user", None), User) 

111 

112 

113def get_user_id(request: Request) -> Optional[str]: 

114 """사용자 ID 반환""" 

115 user = getattr(request.state, "user", None) 

116 return str(user.id) if user else None 

117 

118 

119def get_user_email(request: Request) -> Optional[str]: 

120 """사용자 이메일 반환""" 

121 user = get_current_user_optional(request) 

122 return user.email if user else None 

123 

124 

125def get_user_display_name(request: Request) -> Optional[str]: 

126 """표시 이름 반환: full_name → email 앞부분 → id prefix""" 

127 user: Optional[User] = getattr(request.state, "user", None) 

128 if not user or not isinstance(user, User): 

129 return None 

130 

131 if hasattr(user, "full_name") and user.full_name: 

132 return str(user.full_name) 

133 elif user.email: 

134 return str(user.email).split("@")[0] 

135 else: 

136 return f"User {str(user.id)[:8]}" 

137 

138 

139def get_request_security_context(request: Request) -> Dict[str, Any]: 

140 """요청 보안 컨텍스트 반환 (Kong 트레이싱 일부 포함)""" 

141 user = get_current_user_optional(request) 

142 return { 

143 "authenticated": user is not None, 

144 "user_id": str(user.id) if user else None, 

145 "user_email": user.email if user else None, 

146 "is_active": user.is_active if user else False, 

147 "is_verified": user.is_verified if user else False, 

148 "is_superuser": user.is_superuser if user else False, 

149 "client_ip": request.client.host if request.client else None, 

150 "user_agent": request.headers.get("user-agent"), 

151 "endpoint": f"{request.method} {request.url.path}", 

152 "correlation_id": get_kong_correlation_id(request), 

153 "request_id": get_kong_request_id(request), 

154 }