Coverage for src / mysingle / auth / authenticate.py: 0%

97 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1from typing import Any, Literal, Union 

2 

3from fastapi import HTTPException 

4from fastapi.responses import Response 

5from pydantic import SecretStr 

6 

7from ..core.config import settings 

8from ..core.logging import get_structured_logger 

9from .cache import get_user_cache 

10from .models import User 

11from .schemas.auth import TokenResponse 

12from .security.cookie import delete_cookie, set_auth_cookies 

13from .security.jwt import get_jwt_manager 

14from .user_manager import UserManager 

15 

# Module-level structured logger, named after this module.
16logger = get_structured_logger(__name__) 

# Type alias: a value that is either a plain string or a pydantic SecretStr.
17SecretType = Union[str, SecretStr] 

# Shared UserManager instance; Authentication.refresh_token uses it to load
# the user whose cache entry should be refreshed.
18user_manager = UserManager() 

19 

20 

class Authentication:
    """Issue, rotate, validate and clear JWT credentials for users.

    Tokens are created and decoded through the manager returned by
    ``get_jwt_manager()``. How tokens reach the client follows
    ``settings.TOKEN_TRANSPORT_TYPE`` (read once at construction time):

    * ``"cookie"``  - tokens are only written to cookies on the response.
    * ``"bearer"``  - tokens are only returned to the caller.
    * ``"hybrid"``  - both of the above.
    """

    # Audience claim written into every issued token and required from
    # refresh tokens presented to ``refresh_token``.
    _AUDIENCE = "quant-users"

    # Strong references to in-flight background cache tasks. The event loop
    # only keeps weak references to tasks, so a task nobody else references
    # may be garbage-collected before it completes. Deliberately shared by all
    # instances (class attribute); entries remove themselves when done.
    _background_tasks: set[Any] = set()

    def __init__(self) -> None:
        """Bind the logger, the JWT manager and the configured transport."""
        self.logger = get_structured_logger(__name__)
        self.jwt_manager = get_jwt_manager()
        self.transport_type = settings.TOKEN_TRANSPORT_TYPE

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _issue_token_pair(
        self,
        *,
        user_id: str,
        email: str,
        is_verified: bool,
        is_superuser: bool,
        is_active: bool,
    ) -> tuple[Any, Any]:
        """Create an ``(access, refresh)`` token pair carrying the same claims."""
        claims = {
            "user_id": user_id,
            "email": email,
            "is_verified": is_verified,
            "is_superuser": is_superuser,
            "is_active": is_active,
            "audience": self._AUDIENCE,
        }
        # Access token first, then refresh token (same order as before).
        access_token = self.jwt_manager.create_user_token(
            token_type="access", **claims
        )
        refresh_token = self.jwt_manager.create_user_token(
            token_type="refresh", **claims
        )
        return access_token, refresh_token

    def _run_best_effort(self, coro_factory: Any, description: str) -> None:
        """Run an async side effect without ever failing the caller.

        Args:
            coro_factory: Zero-argument callable returning an awaitable.
            description: Short label used in the warning logged on failure.

        When an event loop is already running in this thread the work is
        scheduled on it as a task and this method returns immediately. When
        no loop is running, the work is executed to completion with
        ``asyncio.run`` before returning.

        Any exception raised by the work, or while scheduling it, is logged
        as a warning and suppressed: these side effects are optional.

        NOTE(review): in the no-running-loop case the awaitable executes on a
        temporary event loop. Confirm that the user cache / user manager
        clients tolerate being used from a loop other than the server's.
        """
        import asyncio

        async def _guarded() -> None:
            try:
                await coro_factory()
            except Exception as exc:
                self.logger.warning(f"{description} failed: {exc}")

        # get_running_loop() is the supported way to detect a running loop;
        # get_event_loop() is deprecated for this purpose and raises in
        # threads that have no loop.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        try:
            if loop is None:
                asyncio.run(_guarded())
            else:
                task = loop.create_task(_guarded())
                # Keep the task alive until it finishes, then drop it.
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
        except Exception as exc:
            self.logger.warning(f"{description} could not be scheduled: {exc}")

    def _deliver_tokens(
        self,
        response: Response,
        access_token: Any,
        refresh_token: Any,
        transport: Any,
    ) -> TokenResponse | None:
        """Hand the token pair to the client according to ``transport``.

        Cookies are set for ``"cookie"`` and ``"hybrid"``; a ``TokenResponse``
        is returned for ``"bearer"`` and ``"hybrid"``; otherwise ``None``.
        """
        token_response = TokenResponse(
            access_token=access_token,
            refresh_token=refresh_token,
            token_type="bearer",
        )

        if transport in ("cookie", "hybrid"):
            set_auth_cookies(
                response,
                access_token=access_token,
                refresh_token=refresh_token,
            )

        if transport in ("bearer", "hybrid"):
            return token_response

        # Cookie-only transport: the tokens live in the cookies alone.
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def login(
        self,
        user: User,
        response: Response,
    ) -> TokenResponse | None:
        """Issue tokens for ``user`` and deliver them via the configured transport.

        Args:
            user: The user logging in; must be active and verified.
            response: Response object that receives the auth cookies when the
                transport is ``"cookie"`` or ``"hybrid"``.

        Returns:
            A ``TokenResponse`` for ``"bearer"``/``"hybrid"`` transport, or
            ``None`` for cookie-only transport.

        Raises:
            HTTPException: 400 if ``user`` is ``None``, inactive or unverified.
        """
        if user is None:
            raise HTTPException(status_code=400, detail="Invalid user")
        if not user.is_active:
            raise HTTPException(status_code=400, detail="Inactive user")
        if not user.is_verified:
            raise HTTPException(status_code=400, detail="Unverified user")

        access_token, refresh_token = self._issue_token_pair(
            user_id=str(user.id),
            email=user.email,
            is_verified=user.is_verified,
            is_superuser=user.is_superuser,
            is_active=user.is_active,
        )

        # Best-effort: store the freshly authenticated user in the user cache.
        async def _cache_user() -> None:
            await get_user_cache().set_user(user)

        self._run_best_effort(_cache_user, "User cache update after login")

        return self._deliver_tokens(
            response, access_token, refresh_token, self.transport_type
        )

    def refresh_token(
        self,
        refresh_token: str,
        response: Response,
        transport_type: Literal["cookie", "bearer", "hybrid"] | None = None,
    ) -> TokenResponse | None:
        """Exchange a refresh token for a new access/refresh token pair.

        The new tokens copy ``email``, ``is_verified``, ``is_superuser`` and
        ``is_active`` from the presented token's payload; the user record is
        only loaded afterwards, to refresh the cache.

        Args:
            refresh_token: Encoded refresh token presented by the client.
            response: Response object that receives the auth cookies when the
                effective transport is ``"cookie"`` or ``"hybrid"``.
            transport_type: Per-call transport override; falls back to the
                instance's configured transport when ``None``.

        Returns:
            A ``TokenResponse`` for ``"bearer"``/``"hybrid"`` transport, or
            ``None`` for cookie-only transport.

        Raises:
            HTTPException: 401 if the token cannot be decoded, has no ``sub``
                claim, is not of type ``"refresh"``, or has the wrong audience.
        """
        try:
            payload = self.jwt_manager.decode_token(refresh_token)
        except Exception as e:
            self.logger.error(f"Failed to decode refresh token: {e}")
            raise HTTPException(
                status_code=401, detail="Invalid refresh token"
            ) from e

        user_id = payload.get("sub")
        email = payload.get("email", "")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token payload")

        # Only refresh tokens minted for the expected audience may be exchanged.
        if payload.get("typ") != "refresh" or payload.get("aud") != self._AUDIENCE:
            raise HTTPException(status_code=401, detail="Invalid refresh token type")

        access_token, new_refresh_token = self._issue_token_pair(
            user_id=user_id,
            email=email,
            is_verified=payload.get("is_verified", False),
            is_superuser=payload.get("is_superuser", False),
            is_active=payload.get("is_active", True),
        )

        # Best-effort: reload the user and refresh its cache entry.
        async def _refresh_cached_user() -> None:
            from beanie import PydanticObjectId

            user = await user_manager.get(PydanticObjectId(user_id))
            if user:
                await get_user_cache().set_user(user)

        self._run_best_effort(
            _refresh_cached_user, "User cache update after token refresh"
        )

        # The per-call override wins; otherwise use the instance setting.
        effective_transport = transport_type or self.transport_type
        return self._deliver_tokens(
            response, access_token, new_refresh_token, effective_transport
        )

    def validate_token(self, token: str) -> dict[str, Any]:
        """Decode ``token`` and return its payload.

        No claim checks beyond whatever the JWT manager's ``decode_token``
        performs are applied here.

        Raises:
            HTTPException: 401 if decoding fails for any reason.
        """
        try:
            return self.jwt_manager.decode_token(token)
        except Exception as e:
            self.logger.error(f"Failed to validate token: {e}")
            raise HTTPException(status_code=401, detail="Invalid token") from e

    def logout(self, response: Response) -> None:
        """Log out by deleting the access and refresh token cookies."""
        delete_cookie(response, key="access_token")
        delete_cookie(response, key="refresh_token")

198 

199 

# Module-level shared Authentication instance, constructed when this module
# is imported.
200authenticator = Authentication()