Coverage for src / mysingle / auth / deps / decorators.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1from __future__ import annotations 

2 

3import asyncio 

4from functools import wraps 

5from typing import Any, Callable 

6 

7from fastapi import Request 

8 

9from ...core.logging import get_structured_logger 

10from ..exceptions import AuthorizationFailed 

11from .core import ( 

12 get_current_active_superuser, 

13 get_current_active_verified_user, 

14 get_current_user, 

15) 

16from .permissions import require_user_role 

17 

18logger = get_structured_logger(__name__) 

19 

20 

21def _extract_request(*args: Any, **kwargs: Any) -> Request: 

22 """전달된 args/kwargs에서 FastAPI Request를 추출. 

23 엔드포인트 첫 인자 또는 키워드 인자에 존재한다고 가정한다. 

24 """ 

25 for arg in args: 

26 if isinstance(arg, Request): 

27 return arg 

28 for value in kwargs.values(): 

29 if isinstance(value, Request): 

30 return value 

31 raise RuntimeError("Request object not found in endpoint parameters") 

32 

33 

34def _ensure_async(func: Callable[..., Any]) -> Callable[..., Any]: 

35 """func가 sync면 스레드 풀로 감싸고, async면 그대로 반환.""" 

36 if asyncio.iscoroutinefunction(func): 

37 return func 

38 

39 @wraps(func) 

40 async def wrapper(*args: Any, **kwargs: Any): 

41 loop = asyncio.get_running_loop() 

42 return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) 

43 

44 return wrapper 

45 

46 

47def authenticated(func: Callable[..., Any]) -> Callable[..., Any]: 

48 """인증 필수 데코레이터 (기본 사용자 보장)""" 

49 

50 async_func = _ensure_async(func) 

51 

52 @wraps(func) 

53 async def inner(*args: Any, **kwargs: Any): 

54 request = _extract_request(*args, **kwargs) 

55 # 검증: 미들웨어가 주입한 사용자 보장 

56 _ = get_current_user(request) 

57 return await async_func(*args, **kwargs) 

58 

59 return inner 

60 

61 

62def verified_only(func: Callable[..., Any]) -> Callable[..., Any]: 

63 """이메일 검증 사용자만 허용""" 

64 

65 async_func = _ensure_async(func) 

66 

67 @wraps(func) 

68 async def inner(*args: Any, **kwargs: Any): 

69 request = _extract_request(*args, **kwargs) 

70 _ = get_current_active_verified_user(request) 

71 return await async_func(*args, **kwargs) 

72 

73 return inner 

74 

75 

76def admin_only(func: Callable[..., Any]) -> Callable[..., Any]: 

77 """관리자(슈퍼유저) 전용""" 

78 

79 async_func = _ensure_async(func) 

80 

81 @wraps(func) 

82 async def inner(*args: Any, **kwargs: Any): 

83 request = _extract_request(*args, **kwargs) 

84 _ = get_current_active_superuser(request) 

85 return await async_func(*args, **kwargs) 

86 

87 return inner 

88 

89 

90def roles_required(*roles: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: 

91 """특정 역할 요구 데코레이터""" 

92 

93 def decorator(func: Callable[..., Any]) -> Callable[..., Any]: 

94 async_func = _ensure_async(func) 

95 

96 @wraps(func) 

97 async def inner(*args: Any, **kwargs: Any): 

98 request = _extract_request(*args, **kwargs) 

99 _ = require_user_role(request, list(roles)) 

100 return await async_func(*args, **kwargs) 

101 

102 return inner 

103 

104 return decorator 

105 

106 

107def resource_owner_required( 

108 param_name: str | None = None, 

109 *, 

110 extractor: Callable[[Request, dict[str, Any]], Any] | None = None, 

111) -> Callable[[Callable[..., Any]], Callable[..., Any]]: 

112 """ 

113 리소스 소유자 요구 데코레이터. 

114 

115 - 엔드포인트의 경로/쿼리 인자 중 `param_name`으로 전달된 사용자 ID와 

116 현재 인증된 사용자 ID가 동일해야 접근을 허용합니다. 

117 - param_name 값은 함수의 키워드 인자(kwargs), Request.path_params, Request.query_params에서 찾습니다. 

118 - extractor 콜백을 제공하면 Request와 kwargs를 받아 소유자 ID를 직접 추출할 수 있습니다. 

119 - 비교는 문자열로 수행합니다. 

120 """ 

121 

122 def decorator(func: Callable[..., Any]) -> Callable[..., Any]: 

123 async_func = _ensure_async(func) 

124 

125 @wraps(func) 

126 async def inner(*args: Any, **kwargs: Any): 

127 request = _extract_request(*args, **kwargs) 

128 current_user = get_current_user(request) 

129 

130 owner_val: Any | None = None 

131 # 0) 커스텀 extractor가 제공되면 우선 사용 

132 if extractor is not None: 

133 try: 

134 owner_val = extractor(request, kwargs) 

135 except Exception: 

136 owner_val = None 

137 # 1) extractor 결과가 없고 param_name이 제공된 경우 자동 탐색 

138 if owner_val is None: 

139 if not param_name: 

140 raise AuthorizationFailed( 

141 required_permission="resource_owner", 

142 user_id=str(current_user.id), 

143 ) 

144 # 1-1) 함수 키워드 인자에서 찾기 

145 owner_val = kwargs.get(param_name) 

146 # 1-2) 없으면 path_params에서 찾기 

147 if owner_val is None and hasattr(request, "path_params"): 

148 owner_val = request.path_params.get(param_name) 

149 # 1-3) 없으면 query_params에서 찾기 

150 if owner_val is None and hasattr(request, "query_params"): 

151 try: 

152 owner_val = request.query_params.get(param_name) 

153 except Exception: 

154 pass 

155 

156 if owner_val is None: 

157 raise AuthorizationFailed( 

158 required_permission=f"resource_owner:{param_name or 'custom'}", 

159 user_id=str(current_user.id), 

160 ) 

161 

162 # 문자열 비교로 통일 

163 if str(owner_val) != str(current_user.id): 

164 raise AuthorizationFailed( 

165 required_permission=f"resource_owner:{param_name or 'custom'}", 

166 user_id=str(current_user.id), 

167 ) 

168 

169 return await async_func(*args, **kwargs) 

170 

171 return inner 

172 

173 return decorator