Coverage for src / mysingle / auth / deps / decorators.py: 0%
91 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1from __future__ import annotations
3import asyncio
4from functools import wraps
5from typing import Any, Callable
7from fastapi import Request
9from ...core.logging import get_structured_logger
10from ..exceptions import AuthorizationFailed
11from .core import (
12 get_current_active_superuser,
13 get_current_active_verified_user,
14 get_current_user,
15)
16from .permissions import require_user_role
18logger = get_structured_logger(__name__)
21def _extract_request(*args: Any, **kwargs: Any) -> Request:
22 """전달된 args/kwargs에서 FastAPI Request를 추출.
23 엔드포인트 첫 인자 또는 키워드 인자에 존재한다고 가정한다.
24 """
25 for arg in args:
26 if isinstance(arg, Request):
27 return arg
28 for value in kwargs.values():
29 if isinstance(value, Request):
30 return value
31 raise RuntimeError("Request object not found in endpoint parameters")
34def _ensure_async(func: Callable[..., Any]) -> Callable[..., Any]:
35 """func가 sync면 스레드 풀로 감싸고, async면 그대로 반환."""
36 if asyncio.iscoroutinefunction(func):
37 return func
39 @wraps(func)
40 async def wrapper(*args: Any, **kwargs: Any):
41 loop = asyncio.get_running_loop()
42 return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
44 return wrapper
def authenticated(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that requires authentication (guarantees a basic user).

    On every call the ``Request`` is located among the endpoint arguments and
    handed to ``get_current_user``. The returned user is discarded; the call
    acts purely as a gate before the endpoint runs. Sync endpoints are
    adapted through ``_ensure_async`` once, at decoration time.
    """
    endpoint = _ensure_async(func)

    async def guarded(*args: Any, **kwargs: Any):
        # Validation only: confirm the user injected by the middleware exists.
        get_current_user(_extract_request(*args, **kwargs))
        return await endpoint(*args, **kwargs)

    return wraps(func)(guarded)
def verified_only(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that admits only email-verified users.

    The check itself is delegated to ``get_current_active_verified_user``,
    which receives the ``Request`` found among the endpoint arguments. Its
    return value is discarded; the endpoint runs only if that call returns.
    """
    endpoint = _ensure_async(func)

    async def guarded(*args: Any, **kwargs: Any):
        get_current_active_verified_user(_extract_request(*args, **kwargs))
        return await endpoint(*args, **kwargs)

    return wraps(func)(guarded)
def admin_only(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator restricting an endpoint to administrators (superusers).

    The check itself is delegated to ``get_current_active_superuser``, which
    receives the ``Request`` found among the endpoint arguments. Its return
    value is discarded; the endpoint runs only if that call returns.
    """
    endpoint = _ensure_async(func)

    async def guarded(*args: Any, **kwargs: Any):
        get_current_active_superuser(_extract_request(*args, **kwargs))
        return await endpoint(*args, **kwargs)

    return wraps(func)(guarded)
def roles_required(*roles: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator factory that requires specific roles.

    Args:
        *roles: Role names forwarded, as a fresh list on every call, to
            ``require_user_role`` together with the current ``Request``.

    Returns:
        A decorator whose wrapper calls ``require_user_role`` (discarding its
        result) before awaiting the endpoint.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        endpoint = _ensure_async(func)

        async def guarded(*args: Any, **kwargs: Any):
            require_user_role(_extract_request(*args, **kwargs), list(roles))
            return await endpoint(*args, **kwargs)

        return wraps(func)(guarded)

    return decorator
def resource_owner_required(
    param_name: str | None = None,
    *,
    extractor: Callable[[Request, dict[str, Any]], Any] | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator factory that restricts an endpoint to the resource owner.

    The owner ID is resolved as follows, stopping at the first non-``None``
    value:

    1. ``extractor(request, kwargs)`` when an extractor is supplied. If it
       raises, the error is logged and the lookup continues as if it had
       returned ``None``.
    2. ``kwargs[param_name]`` (the endpoint's keyword arguments).
    3. ``request.path_params[param_name]``.
    4. ``request.query_params[param_name]``.

    Access is granted only when the resolved owner ID equals the current
    user's ID; both sides are compared as strings.

    Args:
        param_name: Name of the parameter carrying the owner's user ID.
        extractor: Optional callback receiving the ``Request`` and the
            endpoint kwargs and returning the owner ID.

    Returns:
        A decorator that wraps the endpoint with the ownership check.

    Raises:
        AuthorizationFailed: Raised by the wrapper when no owner ID can be
            resolved or when it differs from the current user's ID.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        async_func = _ensure_async(func)

        @wraps(func)
        async def inner(*args: Any, **kwargs: Any):
            request = _extract_request(*args, **kwargs)
            current_user = get_current_user(request)
            user_id = str(current_user.id)

            owner_val: Any | None = None
            # 0) A custom extractor, when provided, takes precedence.
            if extractor is not None:
                try:
                    owner_val = extractor(request, kwargs)
                except Exception:
                    # Best-effort: keep falling back to the param lookup, but
                    # record the failure instead of hiding a broken extractor.
                    logger.warning(
                        "resource_owner_required: extractor raised; "
                        "falling back to parameter lookup",
                        exc_info=True,
                    )
                    owner_val = None

            # 1) No extractor result: look the owner up by param_name.
            if owner_val is None:
                if not param_name:
                    raise AuthorizationFailed(
                        required_permission="resource_owner",
                        user_id=user_id,
                    )
                # 1-1) Endpoint keyword arguments.
                owner_val = kwargs.get(param_name)
                # 1-2) Otherwise the request's path parameters.
                if owner_val is None and hasattr(request, "path_params"):
                    owner_val = request.path_params.get(param_name)
                # 1-3) Otherwise the request's query parameters.
                if owner_val is None and hasattr(request, "query_params"):
                    try:
                        owner_val = request.query_params.get(param_name)
                    except Exception:
                        # Still best-effort; a missing owner is denied below.
                        logger.debug(
                            "resource_owner_required: query_params lookup failed",
                            exc_info=True,
                        )

            # Deny when the owner is unknown or is not the current user
            # (string comparison on both sides).
            if owner_val is None or str(owner_val) != user_id:
                raise AuthorizationFailed(
                    required_permission=f"resource_owner:{param_name or 'custom'}",
                    user_id=user_id,
                )

            return await async_func(*args, **kwargs)

        return inner

    return decorator