# Coverage for src/mysingle/auth/authenticate.py: 0% (97 statements)
# Report generated by coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Literal, Union

from fastapi import HTTPException
from fastapi.responses import Response
from pydantic import SecretStr

from ..core.config import settings
from ..core.logging import get_structured_logger
from .cache import get_user_cache
from .models import User
from .schemas.auth import TokenResponse
from .security.cookie import delete_cookie, set_auth_cookies
from .security.jwt import get_jwt_manager
from .user_manager import UserManager
# Structured logger for this module.
logger = get_structured_logger(__name__)

# A secret may be supplied either as a plain string or as a pydantic SecretStr.
SecretType = Union[str, SecretStr]

# Shared manager used to load users when the user cache is refreshed.
user_manager = UserManager()
class Authentication:
    """Issue, refresh, validate and revoke JWT credentials.

    Tokens are created through the JWT manager and handed to the client
    according to ``settings.TOKEN_TRANSPORT_TYPE``: cookies are set for
    ``"cookie"`` and ``"hybrid"``, and a ``TokenResponse`` is returned for
    ``"bearer"`` and ``"hybrid"``.
    """

    # Audience stamped on every issued token and required when refreshing.
    _AUDIENCE = "quant-users"

    # Strong references to in-flight background tasks. The event loop keeps
    # only weak references to tasks, so an otherwise unreferenced task may be
    # garbage-collected before it finishes.
    _background_tasks: set[asyncio.Task[None]] = set()

    def __init__(self) -> None:
        self.logger = get_structured_logger(__name__)
        self.jwt_manager = get_jwt_manager()
        self.transport_type = settings.TOKEN_TRANSPORT_TYPE

    def login(
        self,
        user: User,
        response: Response,
    ) -> TokenResponse | None:
        """Issue an access/refresh token pair for ``user``.

        Args:
            user: The user logging in; must be active and verified.
            response: Response on which auth cookies are set when the
                configured transport is ``"cookie"`` or ``"hybrid"``.

        Returns:
            A ``TokenResponse`` for the ``"bearer"`` and ``"hybrid"``
            transports, otherwise ``None`` (tokens live only in cookies).

        Raises:
            HTTPException: 400 if ``user`` is ``None``, inactive or
                unverified.
        """
        if user is None:
            raise HTTPException(status_code=400, detail="Invalid user")
        if not user.is_active:
            raise HTTPException(status_code=400, detail="Inactive user")
        if not user.is_verified:
            raise HTTPException(status_code=400, detail="Unverified user")

        access_token, refresh_token = self._issue_token_pair(
            user_id=str(user.id),
            email=user.email,
            is_verified=user.is_verified,
            is_superuser=user.is_superuser,
            is_active=user.is_active,
        )

        # Store the freshly authenticated user in the cache. This is
        # best-effort: a cache failure must never fail the login.
        async def _cache_user() -> None:
            await get_user_cache().set_user(user)

        self._run_best_effort(_cache_user, "User cache update after login")

        return self._deliver_tokens(
            response, access_token, refresh_token, self.transport_type
        )

    def refresh_token(
        self,
        refresh_token: str,
        response: Response,
        transport_type: Literal["cookie", "bearer", "hybrid"] | None = None,
    ) -> TokenResponse | None:
        """Exchange a refresh token for a new access/refresh token pair.

        Args:
            refresh_token: The encoded refresh token presented by the client.
            response: Response on which auth cookies are set when the
                effective transport is ``"cookie"`` or ``"hybrid"``.
            transport_type: Overrides the instance's transport for this call;
                ``None`` means "use the instance setting".

        Returns:
            A ``TokenResponse`` for the ``"bearer"`` and ``"hybrid"``
            transports, otherwise ``None``.

        Raises:
            HTTPException: 401 if the token cannot be decoded, has no
                ``sub`` claim, is not of type ``"refresh"``, or carries the
                wrong audience.
        """
        try:
            payload = self.jwt_manager.decode_token(refresh_token)
        except Exception as e:
            self.logger.error(f"Failed to decode refresh token: {e}")
            raise HTTPException(
                status_code=401, detail="Invalid refresh token"
            ) from e

        user_id = payload.get("sub")
        email = payload.get("email", "")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token payload")

        # Only refresh tokens minted for our audience may be exchanged.
        token_typ = payload.get("typ")
        token_aud = payload.get("aud")
        if token_typ != "refresh" or token_aud != self._AUDIENCE:
            raise HTTPException(status_code=401, detail="Invalid refresh token type")

        # NOTE(review): the new tokens copy the flags embedded in the old
        # refresh token instead of re-reading the user, and ``is_active``
        # defaults to True when the claim is absent. A user deactivated after
        # login can therefore keep refreshing. Re-validating needs an async
        # lookup, which this synchronous method cannot perform.
        access_token, new_refresh_token = self._issue_token_pair(
            user_id=user_id,
            email=email,
            is_verified=payload.get("is_verified", False),
            is_superuser=payload.get("is_superuser", False),
            is_active=payload.get("is_active", True),
        )

        # Re-read the user and update the cache entry. This is best-effort:
        # a lookup or cache failure must never fail the refresh.
        async def _refresh_cached_user() -> None:
            # Imported lazily so a missing/broken beanie install only
            # disables the cache refresh rather than the whole module.
            from beanie import PydanticObjectId

            user = await user_manager.get(PydanticObjectId(user_id))
            if user:
                await get_user_cache().set_user(user)

        self._run_best_effort(
            _refresh_cached_user, "User cache update after token refresh"
        )

        # The per-call override wins; otherwise follow the instance setting.
        effective_transport = transport_type or self.transport_type
        return self._deliver_tokens(
            response, access_token, new_refresh_token, effective_transport
        )

    def validate_token(self, token: str) -> dict[str, Any]:
        """Decode ``token`` and return its payload.

        Raises:
            HTTPException: 401 if the JWT manager fails to decode the token.
        """
        try:
            return self.jwt_manager.decode_token(token)
        except Exception as e:
            self.logger.error(f"Failed to validate token: {e}")
            raise HTTPException(status_code=401, detail="Invalid token") from e

    def logout(self, response: Response) -> None:
        """Log out by deleting the access and refresh token cookies."""
        delete_cookie(response, key="access_token")
        delete_cookie(response, key="refresh_token")

    def _issue_token_pair(
        self,
        *,
        user_id: Any,
        email: Any,
        is_verified: Any,
        is_superuser: Any,
        is_active: Any,
    ) -> tuple[Any, Any]:
        """Create an (access, refresh) token pair carrying identical claims."""
        claims = {
            "user_id": user_id,
            "email": email,
            "is_verified": is_verified,
            "is_superuser": is_superuser,
            "is_active": is_active,
            "audience": self._AUDIENCE,
        }
        access_token = self.jwt_manager.create_user_token(
            token_type="access", **claims
        )
        refresh_token = self.jwt_manager.create_user_token(
            token_type="refresh", **claims
        )
        return access_token, refresh_token

    def _deliver_tokens(
        self,
        response: Response,
        access_token: Any,
        refresh_token: Any,
        transport_type: Any,
    ) -> TokenResponse | None:
        """Set cookies and/or return the tokens according to ``transport_type``."""
        # Built before touching cookies so that a rejected token payload
        # fails the request before any cookie is written.
        token_response = TokenResponse(
            access_token=access_token,
            refresh_token=refresh_token,
            token_type="bearer",
        )

        if transport_type in ("cookie", "hybrid"):
            set_auth_cookies(
                response,
                access_token=access_token,
                refresh_token=refresh_token,
            )

        if transport_type in ("bearer", "hybrid"):
            return token_response

        # Cookie-only transport: the tokens are delivered via cookies alone.
        return None

    def _run_best_effort(
        self,
        job: Callable[[], Awaitable[None]],
        description: str,
    ) -> None:
        """Run the coroutine produced by ``job`` without ever raising.

        If an event loop is running in the current thread the work is
        scheduled on it as a background task and this method returns
        immediately. Otherwise the work is run to completion on a temporary
        loop via ``asyncio.run``. Any failure is logged and suppressed.

        Args:
            job: Zero-argument callable returning the awaitable to run. It
                is only invoked once the work actually starts, so no
                coroutine object is ever left un-awaited.
            description: Human-readable label used in failure log messages.
        """

        async def _guarded() -> None:
            try:
                await job()
            except Exception as exc:
                self.logger.warning(f"{description} failed: {exc}")

        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread.
                loop = None

            if loop is None:
                # NOTE(review): this runs on a fresh event loop; resources
                # bound to another loop may refuse to work here, in which
                # case the failure is logged by ``_guarded``.
                asyncio.run(_guarded())
                return

            task = loop.create_task(_guarded())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
        except Exception as exc:
            self.logger.warning(f"{description} could not be scheduled: {exc}")
# Module-level shared Authentication instance, created at import time.
authenticator = Authentication()