Coverage for src / mysingle / auth / user_manager.py: 0%
357 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1import uuid
2from datetime import datetime, timezone
3from typing import Any, TypeVar
5import jwt
6from beanie import PydanticObjectId
7from fastapi import Request, Response
8from pydantic import BaseModel
10from mysingle.auth.schemas.oauth2 import BaseOAuthToken
12from ..auth.models import OAuthAccount, User
13from ..auth.schemas.user import UserCreate, UserUpdate
14from ..auth.types import DependencyCallable
15from ..core.config import settings
16from ..core.logging import get_structured_logger
17from ..email.email_gen import (
18 generate_new_account_email,
19 generate_reset_password_email,
20 generate_verification_email,
21)
22from ..email.email_sending import send_email
23from .cache import get_user_cache
24from .exceptions import (
25 InvalidID,
26 InvalidResetPasswordToken,
27 InvalidVerifyToken,
28 UserAlreadyExists,
29 UserAlreadyVerified,
30 UserInactive,
31 UserNotExists,
32)
33from .security.jwt import get_jwt_manager
34from .security.password import PasswordHelper, password_helper
36logger = get_structured_logger(__name__)
37jwt_manager = get_jwt_manager()
39# RESET_PASSWORD_TOKEN_AUDIENCE = "users:reset"
40# VERIFY_USER_TOKEN_AUDIENCE = "users:verify"
41SCHEMA = TypeVar("SCHEMA", bound=BaseModel)
44class UserManager:
45 """
46 사용자 관리 로직.
48 :attribute reset_password_token_secret: 비밀번호 재설정 토큰을
49 인코딩하는 데 사용되는 비밀 키.
51 :attribute reset_password_token_audience: 비밀번호 재설정 토큰의 JWT 대상(audience).
52 :attribute verification_token_secret: 인증 토큰을 인코딩하는 데 사용되는 비밀 키.
53 :attribute verification_token_lifetime_seconds: 인증 토큰의 유효 기간.
54 :attribute verification_token_audience: 인증 토큰의 JWT 대상.
56 :param user_db: 데이터베이스 어댑터 인스턴스.
57 """
59 password_helper: PasswordHelper
61 def __init__(
62 self,
63 ):
64 self.password_helper = password_helper
66 async def read_user_from_token(
67 self,
68 token: str | None,
69 token_audience: list[str] = ["quant-users"],
70 ) -> User | None:
71 if token is None:
72 return None
73 try:
74 data = jwt_manager.decode_token(token)
75 user_id = data.get("sub")
76 if user_id is None:
77 return None
78 except jwt.PyJWTError:
79 return None
81 try:
82 user = await self.get(user_id)
83 return user
84 except (UserNotExists, InvalidID):
85 return None
87 @staticmethod
88 def model_dump(model: BaseModel, *args: Any, **kwargs: Any) -> dict[str, Any]:
89 return model.model_dump(*args, **kwargs)
91 @staticmethod
92 def model_validate(
93 schema: type[BaseModel], obj: Any, *args: Any, **kwargs: Any
94 ) -> BaseModel:
95 return schema.model_validate(obj, *args, **kwargs)
97 async def get(self, id: PydanticObjectId) -> User:
98 """
99 ID로 사용자를 조회합니다.
101 :param id: 조회할 사용자의 ID.
102 :raises UserNotExists: 해당 사용자가 존재하지 않습니다.
103 :return: 사용자 객체.
104 """
105 user = await User.get(id)
107 if user is None:
108 raise UserNotExists()
110 return user
112 async def get_by_email(self, user_email: str) -> User:
113 """
114 이메일로 사용자를 조회합니다.
116 :param user_email: 조회할 사용자의 이메일 주소.
117 :raises UserNotExists: 해당 사용자가 존재하지 않습니다.
118 :return: 사용자 객체.
119 """
120 user = await User.find_one({"email": user_email})
122 if user is None:
123 raise UserNotExists()
125 return user
127 async def get_by_oauth_account(self, oauth: str, account_id: str) -> User:
128 """
129 OAuth 계정으로 사용자 가져오기.
131 :param oauth: OAuth 클라이언트 이름.
132 :param account_id: 외부 OAuth 서비스의 계정 ID.
133 :raises UserNotExists: 사용자가 존재하지 않습니다.
134 :return: 사용자 객체.
135 """
136 user = await User.find_one(
137 {
138 "oauth_accounts.oauth_name": oauth,
139 "oauth_accounts.account_id": account_id,
140 }
141 )
143 if user is None:
144 raise UserNotExists()
146 return user
148 def find_oauth_account(
149 self, user: User, oauth_name: str, account_id: str
150 ) -> OAuthAccount | None:
151 """
152 사용자의 특정 OAuth 계정을 찾습니다.
154 :param user: 검색할 사용자.
155 :param oauth_name: OAuth 클라이언트 이름.
156 :param account_id: OAuth 계정 ID.
157 :return: 찾은 OAuth 계정 또는 None.
158 """
159 for oauth_account in user.oauth_accounts:
160 if (
161 oauth_account.oauth_name == oauth_name
162 and oauth_account.account_id == account_id
163 ):
164 return oauth_account
165 return None
167 async def remove_oauth_account(
168 self, user: User, oauth_name: str, account_id: str
169 ) -> User:
170 """
171 사용자에서 OAuth 계정을 제거합니다.
173 :param user: OAuth 계정을 제거할 사용자.
174 :param oauth_name: OAuth 클라이언트 이름.
175 :param account_id: OAuth 계정 ID.
176 :return: 업데이트된 사용자 객체.
177 :raises UserNotExists: OAuth 계정이 존재하지 않습니다.
178 """
179 oauth_account = self.find_oauth_account(user, oauth_name, account_id)
180 if oauth_account is None:
181 raise UserNotExists(
182 identifier=f"{oauth_name}:{account_id}", identifier_type="OAuth account"
183 )
185 user.oauth_accounts.remove(oauth_account)
186 await user.save()
187 return user
189 async def add_oauth_account(
190 self, user: User, oauth_account_dict: dict[str, Any]
191 ) -> User:
192 """
193 사용자에게 OAuth 계정 추가.
195 :param user: OAuth 계정을 추가할 사용자.
196 :param oauth_account_dict: 추가할 OAuth 계정의 세부 정보.
197 :return: 업데이트된 사용자 객체.
198 """
199 oauth_account = OAuthAccount(**oauth_account_dict)
200 user.oauth_accounts.append(oauth_account)
201 await user.save()
202 return user
204 async def update_oauth_account(
205 self,
206 user: User,
207 existing_oauth_account: OAuthAccount,
208 oauth_account_dict: dict[str, Any],
209 ) -> User:
210 """
211 사용자 OAuth 계정 업데이트.
213 :param user: OAuth 계정을 업데이트할 사용자.
214 :param existing_oauth_account: 업데이트할 기존 OAuth 계정.
215 :param oauth_account_dict: 업데이트할 OAuth 계정의 새 세부 정보.
216 :return: 업데이트된 사용자 객체.
217 """
218 existing_oauth_account.access_token = oauth_account_dict.get(
219 "access_token", existing_oauth_account.access_token
220 )
221 existing_oauth_account.expires_at = oauth_account_dict.get(
222 "expires_at", existing_oauth_account.expires_at
223 )
224 existing_oauth_account.refresh_token = oauth_account_dict.get(
225 "refresh_token", existing_oauth_account.refresh_token
226 )
227 # 프로바이더에서 내려준 표시 정보도 가능하면 최신화
228 # Only update if changed
229 if oauth_account_dict.get("avatar_url") is not None and (
230 existing_oauth_account.avatar_url != oauth_account_dict["avatar_url"]
231 ):
232 existing_oauth_account.avatar_url = oauth_account_dict["avatar_url"]
233 if oauth_account_dict.get("name") is not None and (
234 existing_oauth_account.name != oauth_account_dict["name"]
235 ):
236 existing_oauth_account.name = oauth_account_dict["name"]
237 await user.save()
238 return user
240 async def create(
241 self,
242 obj_in: UserCreate,
243 request: Request | None = None,
244 ) -> User:
245 """
246 데이터베이스에 사용자를 생성합니다.
248 성공 시 on_after_register 핸들러를 트리거합니다.
250 :param user_create: 생성할 UserCreate 모델입니다.
251 :param safe: True인 경우 is_superuser 또는 is_verified와 같은 민감한 값이
252 생성 과정에서 무시됩니다. 기본값은 False입니다.
253 :param request: 작업을 트리거한 선택적 FastAPI 요청입니다.
254 기본값은 None입니다.
255 :raises UserAlreadyExists: 동일한 이메일로 이미 사용자가 존재할 경우 발생합니다.
256 :return: 새로 생성된 사용자입니다.
257 """
258 await self.validate_password(obj_in.password, obj_in)
260 existing_user = await User.find_one({"email": obj_in.email})
261 if existing_user is not None:
262 raise UserAlreadyExists()
264 user_dict = User(
265 email=obj_in.email,
266 full_name=obj_in.full_name,
267 hashed_password=self.password_helper.hash(obj_in.password),
268 )
269 # Remove password if exists (keeping for safety)
270 # password = user_dict.pop("password")
272 created_user = await User.create(user_dict)
274 await self.on_after_register(created_user, request)
276 return created_user
278 async def oauth_callback(
279 self,
280 oauth_name: str,
281 token_data: BaseOAuthToken,
282 profile_id: str,
283 profile_email: str,
284 profile_image: str | None = None,
285 fullname: str | None = None,
286 *,
287 request: Request | None = None,
288 associate_by_email: bool = True,
289 ) -> User | None:
290 """
291 OAuth 연결 성공 후 콜백 처리.
293 지정된 사용자에게 이 새로운 OAuth 계정을 추가하거나 기존 OAuth 계정을 업데이트합니다.
295 :param oauth_name: OAuth 클라이언트 이름.
296 :param access_token: 서비스 공급자에 대한 유효한 액세스 토큰.
297 :param account_id: 서비스 공급자 내 사용자의 models.ID.
298 :param account_email: 서비스 공급자 측 사용자의 이메일.
299 :param expires_at: 액세스 토큰이 만료되는 선택적 타임스탬프.
300 :param refresh_token: 서비스 공급자로부터 새로운 액세스
301 토큰을 얻기 위한 선택적 리프레시 토큰.
302 :param request: 작업을 트리거한 선택적 FastAPI 요청, 기본값은 None
303 :return: 사용자 객체.
304 """
305 oauth_account_dict = {
306 "oauth_name": oauth_name,
307 "avatar_url": profile_image,
308 "name": fullname,
309 "access_token": token_data.access_token,
310 "account_id": profile_id,
311 "account_email": profile_email,
312 "expires_at": token_data.expires_at,
313 "refresh_token": token_data.refresh_token,
314 }
316 # 기존 OAuth 계정이 있는지 확인
317 try:
318 user = await self.get_by_oauth_account(oauth_name, profile_id)
319 except UserNotExists:
320 try:
321 # 기존 이메일 사용자와 연동
322 user = await self.get_by_email(profile_email)
323 if not associate_by_email:
324 raise UserAlreadyExists()
325 user = await self.add_oauth_account(user, oauth_account_dict)
327 # 프로바이더에서 제공한 아바타/이름으로 사용자 정보 보강
328 updated = False
329 if profile_image and user.avatar_url != profile_image:
330 user.avatar_url = profile_image
331 updated = True
332 if fullname and (not user.full_name or user.full_name.strip() == ""):
333 user.full_name = fullname
334 updated = True
335 if updated:
336 await user.save()
337 except UserNotExists:
338 # 신규 사용자 생성
339 password = self.password_helper.generate_secure_password()
340 new_user = UserCreate(
341 email=profile_email,
342 full_name=fullname,
343 password=password,
344 avatar_url=profile_image,
345 is_verified=True,
346 is_active=True,
347 )
348 user = await self.create(new_user)
349 user = await self.add_oauth_account(user, oauth_account_dict)
350 await self.on_after_register_by_oauth(user, password, request)
351 else:
352 # 기존 OAuth 계정 정보 업데이트
353 for existing_oauth_account in user.oauth_accounts:
354 if (
355 existing_oauth_account.account_id == profile_id
356 and existing_oauth_account.oauth_name == oauth_name
357 ):
358 user = await self.update_oauth_account(
359 user, existing_oauth_account, oauth_account_dict
360 )
362 # 프로바이더에서 제공한 아바타/이름으로 사용자 정보 보강
363 updated = False
364 if profile_image and user.avatar_url != profile_image:
365 user.avatar_url = profile_image
366 updated = True
367 if fullname and (not user.full_name or user.full_name.strip() == ""):
368 user.full_name = fullname
369 updated = True
370 if updated:
371 await user.save()
373 return user
375 async def request_verify(self, user: User, request: Request | None = None) -> None:
376 """
377 인증 요청을 시작합니다.
379 성공 시 on_after_request_verify 핸들러를 트리거합니다.
381 :param user: 인증할 사용자.
382 :param request: 작업을 트리거한 선택적 FastAPI 요청, 기본값은 None입니다.
383 :raises UserInactive: 사용자가 비활성 상태입니다.
384 :raises UserAlreadyVerified: 사용자가 이미 인증되었습니다.
385 """
386 if not user.is_active:
387 raise UserInactive()
388 if user.is_verified:
389 raise UserAlreadyVerified()
391 # JWTManager를 통해 이메일 인증 토큰 생성 (aud/typ/iss 포함)
392 token = jwt_manager.create_verification_token(
393 user_id=str(user.id), email=user.email
394 )
395 await self.on_after_request_verify(user, token, request)
397 async def verify(self, token: str, request: Request | None = None) -> User:
398 """
399 인증 요청을 검증합니다.
401 사용자의 is_verified 플래그를 True로 변경합니다.
403 성공 시 on_after_verify 핸들러를 트리거합니다.
405 :param token: request_verify로 생성된 인증 토큰.
406 :param request: 작업을 트리거한 선택적 FastAPI 요청, 기본값은 None.
407 :raises InvalidVerifyToken: 토큰이 유효하지 않거나 만료됨.
408 :raises UserAlreadyVerified: 사용자가 이미 인증됨.
409 :return: 인증된 사용자.
410 """
411 try:
412 data = jwt_manager.decode_token(token)
413 except jwt.PyJWTError:
414 raise InvalidVerifyToken()
416 try:
417 # user_id = data["sub"]
418 email = data["email"]
419 except KeyError:
420 raise InvalidVerifyToken()
422 # aud/typ 검증
423 if data.get("aud") != "users:verify" or data.get("typ") != "verify":
424 raise InvalidVerifyToken()
426 try:
427 user = await self.get_by_email(email)
428 except UserNotExists:
429 raise InvalidVerifyToken()
430 if user.is_verified:
431 raise UserAlreadyVerified()
433 verified_user = await self._update(user, {"is_verified": True})
435 await self.on_after_verify(verified_user, request)
437 return verified_user
439 async def forgot_password(self, user: User, request: Request | None = None) -> None:
440 """
441 비밀번호 찾기 요청을 시작합니다.
443 성공 시 on_after_forgot_password 핸들러를 트리거합니다.
445 :param user: 비밀번호를 분실한 사용자입니다.
446 :param request: 작업을 트리거한 선택적 FastAPI 요청입니다.
447 기본값은 None입니다.
448 :raises UserInactive: 사용자가 비활성 상태입니다.
449 """
450 if not user.is_active:
451 raise UserInactive()
453 password_fingerprint = password_helper.hash(user.hashed_password)
454 # JWTManager를 통해 비밀번호 재설정 토큰 생성 (aud/typ/iss 포함)
455 token = jwt_manager.create_reset_password_token(
456 user_id=str(user.id), password_fingerprint=password_fingerprint
457 )
458 await self.on_after_forgot_password(user, token, request)
460 async def reset_password(
461 self, token: str, password: str, request: Request | None = None
462 ) -> User:
463 """
464 사용자의 비밀번호를 재설정합니다.
466 성공 시 on_after_reset_password 핸들러를 트리거합니다.
468 :param token: forgot_password에서 생성된 토큰입니다.
469 :param password: 설정할 새 비밀번호입니다.
470 :param request: 작업을 트리거한 선택적 FastAPI 요청, 기본값은 None입니다.
471 :raises InvalidResetPasswordToken: 토큰이 유효하지 않거나 만료되었습니다.
472 :raises UserInactive: 사용자가 비활성 상태입니다.
473 :raises InvalidPasswordException: 비밀번호가 유효하지 않습니다.
474 :return: 비밀번호가 업데이트된 사용자.
475 """
476 try:
477 data = jwt_manager.decode_token(token)
478 except jwt.PyJWTError:
479 raise InvalidResetPasswordToken()
481 try:
482 user_id = data["sub"]
483 password_fingerprint = data["password_fgpt"]
484 except KeyError:
485 raise InvalidResetPasswordToken()
487 # aud/typ 검증
488 if data.get("aud") != "users:reset" or data.get("typ") != "reset":
489 raise InvalidResetPasswordToken()
490 user = await self.get(user_id)
492 valid_password_fingerprint, _ = self.password_helper.verify_and_update(
493 user.hashed_password, password_fingerprint
494 )
495 if not valid_password_fingerprint:
496 raise InvalidResetPasswordToken()
498 if not user.is_active:
499 raise UserInactive()
501 updated_user = await self._update(user, {"password": password})
503 await self.on_after_reset_password(user, request)
505 return updated_user
507 async def update(
508 self,
509 obj_in: UserUpdate,
510 user: User,
511 request: Request | None = None,
512 ) -> User:
513 """
514 사용자 업데이트.
516 성공 시 on_after_update 핸들러를 트리거합니다.
518 :param obj_in: 사용자에게 적용할 변경 사항을 포함하는
519 UserUpdate 모델.
520 :param user: 업데이트할 현재 사용자.
521 :param safe: True인 경우 is_superuser 또는 is_verified와 같은 민감한 값이
522 업데이트 중 무시됩니다. 기본값은 False입니다.
523 :param request: 작업을 트리거한 선택적 FastAPI 요청입니다.
524 기본값은 None입니다.
525 :return: 업데이트된 사용자.
526 """
527 updated_user = await self._update(
528 user, self.model_dump(obj_in, exclude_unset=True)
529 )
530 await self.on_after_update(
531 updated_user, self.model_dump(updated_user, exclude_unset=True), request
532 )
533 return updated_user
535 async def delete(
536 self,
537 user: User,
538 request: Request | None = None,
539 ) -> None:
540 """
541 Delete a user.
543 :param user: The user to delete.
544 :param request: Optional FastAPI request that
545 triggered the operation, defaults to None.
546 """
547 await self.on_before_delete(user, request)
548 # 실제 삭제 수행 (Beanie Document instance 삭제)
549 await user.delete()
550 await self.on_after_delete(user, request)
552 async def validate_password(self, password: str, user: UserCreate | User) -> None:
553 """
554 Validate a password.
556 *You should overload this method to add your own validation logic.*
558 :param password: The password to validate.
559 :param user: The user associated to this password.
560 :raises InvalidPasswordException: The password is invalid.
561 :return: None if the password is valid.
562 """
564 return # pragma: no cover
566 async def on_after_register(self, user: User, request: Request | None = None):
567 """신규 가입 후 이메일 인증 발송"""
568 logger.info(f"New user registered: {user.email} (ID: {user.id})")
570 if not user.is_verified and settings.emails_enabled():
571 try:
572 # 인증 이메일 발송
573 origin = (
574 str(request.base_url).rstrip("/")
575 if request
576 else settings.FRONTEND_URL
577 )
578 email_data = generate_verification_email(user.email, origin)
580 send_email(
581 email_to=user.email,
582 subject=email_data.subject,
583 html_content=email_data.html_content,
584 )
586 logger.info(f"Verification email sent to {user.email}")
587 except Exception as e:
588 logger.error(f"Failed to send verification email to {user.email}: {e}")
590 async def on_after_register_by_oauth(
591 self, user: User, password: str | None = None, request: Request | None = None
592 ):
593 """OAuth로 신규 가입 후(관리자)"""
594 logger.info(f"New user registered via OAuth: {user.email} (ID: {user.id})")
596 if settings.emails_enabled() and password is not None:
597 try:
598 # 신규 계정 생성 이메일 발송
599 origin = (
600 str(request.base_url).rstrip("/")
601 if request
602 else settings.FRONTEND_URL
603 )
604 email_data = generate_new_account_email(
605 email_to=user.email,
606 username=user.full_name or user.email,
607 password=password,
608 origin=origin,
609 )
611 send_email(
612 email_to=user.email,
613 subject=email_data.subject,
614 html_content=email_data.html_content,
615 )
617 logger.info(f"New account email sent to {user.email}")
618 except Exception as e:
619 logger.error(f"Failed to send new account email to {user.email}: {e}")
621 async def on_after_update(
622 self, user: User, update_dict: dict, request: Request | None = None
623 ):
624 """사용자 정보 업데이트 후"""
625 logger.info(
626 f"User updated: {user.email} (ID: {user.id}), fields: {list(update_dict.keys())}"
627 )
628 # 캐시 무효화
629 try:
630 await get_user_cache().invalidate_user(str(user.id))
631 logger.debug(f"User cache invalidated after update: {user.id}")
632 except Exception as e:
633 logger.debug(f"Failed to invalidate user cache after update: {e}")
635 async def on_after_request_verify(
636 self, user: User, token: str, request: Request | None = None
637 ):
638 """인증 이메일 재요청 후 발송"""
639 logger.info(
640 f"Email verification re-requested for user: {user.email} (ID: {user.id})"
641 )
643 if settings.emails_enabled():
644 try:
645 origin = (
646 str(request.base_url).rstrip("/")
647 if request
648 else settings.FRONTEND_URL
649 )
650 email_data = generate_verification_email(user.email, origin)
652 send_email(
653 email_to=user.email,
654 subject=email_data.subject,
655 html_content=email_data.html_content,
656 )
658 logger.info(f"Verification email re-sent to {user.email}")
659 except Exception as e:
660 logger.error(
661 f"Failed to re-send verification email to {user.email}: {e}"
662 )
664 async def on_after_verify(self, user: User, request: Request | None = None):
665 """이메일 인증 완료 후"""
666 logger.info(f"User email verified successfully: {user.email} (ID: {user.id})")
668 async def on_after_forgot_password(
669 self, user: User, token: str, request: Request | None = None
670 ):
671 """패스워드 복구 요청 후 이메일 발송"""
672 logger.info(f"Password reset requested for user: {user.email} (ID: {user.id})")
674 if settings.emails_enabled():
675 try:
676 origin = (
677 str(request.base_url).rstrip("/")
678 if request
679 else settings.FRONTEND_URL
680 )
681 email_data = generate_reset_password_email(
682 email_to=user.email,
683 email=user.email,
684 token=token,
685 origin=origin,
686 )
688 send_email(
689 email_to=user.email,
690 subject=email_data.subject,
691 html_content=email_data.html_content,
692 )
694 logger.info(f"Password reset email sent to {user.email}")
695 except Exception as e:
696 logger.error(
697 f"Failed to send password reset email to {user.email}: {e}"
698 )
700 async def on_after_reset_password(
701 self, user: User, request: Request | None = None
702 ) -> None:
703 """
704 비밀번호 재설정 성공 후 실행되는 로직.
706 보안 알림 이메일을 발송하고 로그를 기록합니다.
708 :param user: 비밀번호를 재설정한 사용자.
709 :param request: 작업을 트리거한 선택적 FastAPI 요청, 기본값은 None.
710 """
711 logger.info(f"Password reset completed for user: {user.email} (ID: {user.id})")
713 # 보안 알림 이메일 발송
714 if settings.emails_enabled():
715 try:
716 origin = (
717 str(request.base_url).rstrip("/")
718 if request
719 else settings.FRONTEND_URL
720 )
722 # 새로운 템플릿을 사용한 보안 알림 이메일
723 from ..email.email_gen import generate_password_reset_confirmation_email
725 email_data = generate_password_reset_confirmation_email(
726 email_to=user.email,
727 username=user.full_name or user.email,
728 origin=origin,
729 )
731 send_email(
732 email_to=user.email,
733 subject=email_data.subject,
734 html_content=email_data.html_content,
735 )
737 logger.info(f"Password reset confirmation email sent to {user.email}")
738 except Exception as e:
739 logger.error(
740 f"Failed to send password reset confirmation email to {user.email}: {e}"
741 )
743 async def on_after_login(
744 self,
745 user: User,
746 request: Request | None = None,
747 response: Response | None = None,
748 ) -> None:
749 """
750 Perform logic after user login.
752 *You should overload this method to add your own logic.*
754 :param user: The user that is logging in
755 :param request: Optional FastAPI request
756 :param response: Optional response built by the transport.
757 Defaults to None
758 """
759 # 로그인 활동 기록 업데이트
760 await self.update_login_activity(user, request)
761 return # pragma: no cover
763 async def update_login_activity(
764 self, user: User, request: Request | None = None
765 ) -> User:
766 """
767 사용자 로그인 활동 기록 업데이트.
769 :param user: 로그인한 사용자
770 :param request: 선택적 FastAPI 요청
771 :return: 업데이트된 사용자
772 """
773 now = datetime.now(timezone.utc)
774 client_ip = None
776 if request and request.client:
777 client_ip = request.client.host
778 # X-Forwarded-For 헤더 확인 (프록시/로드밸런서 뒤에 있는 경우)
779 forwarded_for = request.headers.get("x-forwarded-for")
780 if forwarded_for:
781 client_ip = forwarded_for.split(",")[0].strip()
783 update_dict: dict[str, Any] = {
784 "last_login_at": now,
785 "last_activity_at": now,
786 "login_count": user.login_count + 1,
787 }
789 if client_ip:
790 update_dict["last_login_ip"] = client_ip
791 update_dict["last_activity_ip"] = client_ip
793 updated_user = await self._update(user, update_dict)
794 logger.info(
795 f"Login activity recorded for user: {user.email} "
796 f"(ID: {user.id}, IP: {client_ip}, Login count: {updated_user.login_count})"
797 )
799 return updated_user
801 async def update_activity(self, user: User, request: Request | None = None) -> User:
802 """
803 사용자 활동 시간 업데이트 (마지막 활동 시각만 갱신).
805 API 호출 등 일반적인 활동에 사용합니다.
807 :param user: 활동한 사용자
808 :param request: 선택적 FastAPI 요청
809 :return: 업데이트된 사용자
810 """
811 now = datetime.now(timezone.utc)
812 client_ip = None
814 if request and request.client:
815 client_ip = request.client.host
816 forwarded_for = request.headers.get("x-forwarded-for")
817 if forwarded_for:
818 client_ip = forwarded_for.split(",")[0].strip()
820 update_dict: dict[str, Any] = {"last_activity_at": now}
822 if client_ip:
823 update_dict["last_activity_ip"] = client_ip
825 updated_user = await self._update(user, update_dict)
827 return updated_user
829 async def get_user_activity_summary(self, user: User) -> dict[str, Any]:
830 """
831 사용자 활동 요약 정보 반환.
833 :param user: 조회할 사용자
834 :return: 활동 요약 정보
835 """
836 return {
837 "user_id": str(user.id),
838 "email": user.email,
839 "last_login_at": user.last_login_at,
840 "last_activity_at": user.last_activity_at,
841 "login_count": user.login_count,
842 "last_login_ip": user.last_login_ip,
843 "last_activity_ip": user.last_activity_ip,
844 "account_created_at": user.created_at,
845 "is_active": user.is_active,
846 "is_verified": user.is_verified,
847 }
849 async def on_after_logout(
850 self,
851 user: User,
852 request: Request | None = None,
853 ) -> None:
854 """
855 로그아웃 후 실행되는 로직.
857 *필요에 따라 이 메소드를 오버로드하여 사용자 정의 로직을 추가할 수 있습니다.*
859 :param user: 로그아웃하는 사용자
860 :param request: 선택적 FastAPI 요청
861 """
862 logger.info(f"User logged out: {user.email} (ID: {user.id})")
863 # 로그아웃 시 캐시 무효화 정책 적용 (세션 종료 시점에 최신 정책 반영)
864 try:
865 await get_user_cache().invalidate_user(str(user.id))
866 logger.debug(f"User cache invalidated after logout: {user.id}")
867 except Exception as e:
868 logger.debug(f"Failed to invalidate user cache after logout: {e}")
869 return # pragma: no cover
871 async def on_before_delete(
872 self, user: User, request: Request | None = None
873 ) -> None:
874 """
875 Perform logic before user delete.
877 *You should overload this method to add your own logic.*
879 :param user: The user to be deleted
880 :param request: Optional FastAPI request that
881 triggered the operation, defaults to None.
882 """
883 return # pragma: no cover
885 async def on_after_delete(self, user: User, request: Request | None = None) -> None:
886 """
887 Perform logic before user delete.
889 *You should overload this method to add your own logic.*
891 :param user: The user to be deleted
892 :param request: Optional FastAPI request that
893 triggered the operation, defaults to None.
894 """
895 # 캐시 무효화
896 try:
897 await get_user_cache().invalidate_user(str(user.id))
898 logger.debug(f"User cache invalidated after delete: {user.id}")
899 except Exception as e:
900 logger.debug(f"Failed to invalidate user cache after delete: {e}")
901 return # pragma: no cover
903 async def authenticate(self, username: str, password: str) -> User | None:
904 """
905 Authenticate and return a user following an email and a password.
907 Will automatically upgrade password hash if necessary.
909 :param credentials: The user credentials.
910 """
911 try:
912 user = await self.get_by_email(username)
913 except UserNotExists:
914 # Run the hasher to mitigate timing attack
915 # Inspired from Django: https://code.djangoproject.com/ticket/20760
916 self.password_helper.hash(password)
917 return None
919 verified, updated_password_hash = self.password_helper.verify_and_update(
920 password, user.hashed_password
921 )
922 if not verified:
923 return None
924 # Update password hash to a more robust one if needed
925 if updated_password_hash is not None:
926 await self._update(user, {"hashed_password": updated_password_hash})
928 return user
930 async def _update(self, user: User, update_dict: dict[str, Any]) -> User:
931 validated_update_dict: dict[str, Any] = {}
932 for field, value in update_dict.items():
933 if field == "email" and value != user.email:
934 try:
935 await self.get_by_email(value)
936 raise UserAlreadyExists()
937 except UserNotExists:
938 validated_update_dict["email"] = value
939 validated_update_dict["is_verified"] = False
940 elif field == "password" and value is not None:
941 await self.validate_password(value, user)
942 validated_update_dict["hashed_password"] = self.password_helper.hash(
943 value
944 )
945 else:
946 validated_update_dict[field] = value
948 # Beanie Document update
949 for key, val in validated_update_dict.items():
950 setattr(user, key, val)
951 await user.save()
952 # 업데이트 후 캐시 무효화 (중앙화)
953 try:
954 await get_user_cache().invalidate_user(str(user.id))
955 logger.debug(f"User cache invalidated in _update: {user.id}")
956 except Exception as e:
957 logger.debug(f"Failed to invalidate user cache in _update: {e}")
958 return user
961class UUIDIDMixin:
962 def parse_id(self, value: Any) -> uuid.UUID:
963 if isinstance(value, uuid.UUID):
964 return value
965 try:
966 return uuid.UUID(value)
967 except ValueError as e:
968 raise InvalidID() from e
971class IntegerIDMixin:
972 def parse_id(self, value: Any) -> int:
973 if isinstance(value, float):
974 raise InvalidID()
975 try:
976 return int(value)
977 except ValueError as e:
978 raise InvalidID() from e
981UserManagerDependency = DependencyCallable[UserManager]