Coverage for src / fastapi_authly / core / security.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-20 11:54 +0800

1from datetime import datetime, timedelta, timezone 

2from typing import Any, Dict, Optional 

3 

4from jose import JWTError, jwt 

5from passlib.context import CryptContext 

6 

7from ..interfaces import PasswordHasher, TokenService 

8 

9 

class BcryptPasswordHasher(PasswordHasher):
    """Password hasher backed by a passlib ``CryptContext``.

    Despite the class name, the context is configured with the
    ``pbkdf2_sha256`` scheme rather than bcrypt.
    """

    def __init__(self) -> None:
        # pbkdf2_sha256 sidesteps platform-specific bcrypt backend problems.
        schemes = ["pbkdf2_sha256"]
        self._ctx = CryptContext(schemes=schemes, deprecated="auto")

    def hash(self, password: str) -> str:
        """Return the hash the context produces for ``password``."""
        digest = self._ctx.hash(password)
        return digest

    def verify(self, password: str, hashed: str) -> bool:
        """Return the context's verdict on ``password`` against ``hashed``."""
        matches = self._ctx.verify(password, hashed)
        return matches

22 

23 

class JWTTokenService(TokenService):
    """Default JWT token service using python-jose.

    Issues access and refresh tokens that carry three claims: ``exp``
    (expiry), ``sub`` (the stringified subject) and ``type`` (``"access"``
    or ``"refresh"``), and decodes them again with an optional check on
    the ``type`` claim.
    """

    def __init__(
        self,
        *,
        secret_key: str,
        algorithm: str,
        access_token_expire_minutes: int,
        refresh_token_expire_days: int,
    ) -> None:
        """Store the signing settings and token lifetimes.

        Args:
            secret_key: Key passed to ``jwt.encode`` / ``jwt.decode``.
            algorithm: Algorithm name used for signing; also the only
                algorithm accepted when decoding.
            access_token_expire_minutes: Access-token lifetime in minutes.
            refresh_token_expire_days: Refresh-token lifetime in days.
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=access_token_expire_minutes)
        self.refresh_token_expire = timedelta(days=refresh_token_expire_days)

    def _create_token(self, subject: str | Any, expires_delta: timedelta, token_type: str) -> str:
        """Encode a token for ``subject`` expiring ``expires_delta`` from now.

        Args:
            subject: Value stored, via ``str()``, in the ``sub`` claim.
            expires_delta: Lifetime added to the current time for ``exp``.
            token_type: Value stored in the ``type`` claim.

        Returns:
            The encoded JWT string.
        """
        # Use an aware UTC timestamp: a naive local datetime would be read
        # as if it were UTC when converted to the numeric ``exp`` claim,
        # shifting the expiry by the host's UTC offset.
        expire = datetime.now(timezone.utc) + expires_delta
        to_encode: Dict[str, Any] = {
            "exp": expire,
            "sub": str(subject),
            "type": token_type,
        }
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def create_access_token(self, subject: str | Any) -> str:
        """Return a token of type ``"access"`` for ``subject``."""
        return self._create_token(subject, self.access_token_expire, token_type="access")

    def create_refresh_token(self, subject: str | Any) -> str:
        """Return a token of type ``"refresh"`` for ``subject``."""
        return self._create_token(subject, self.refresh_token_expire, token_type="refresh")

    def decode_token(self, token: str, verify_type: Optional[str] = None) -> Dict[str, Any]:
        """Decode ``token`` and optionally check its ``type`` claim.

        Args:
            token: Encoded JWT string.
            verify_type: When truthy, the payload's ``type`` claim must
                equal this value. ``None`` (or an empty string) skips the
                check.

        Returns:
            The decoded payload.

        Raises:
            JWTError: If ``verify_type`` is given and does not match the
                payload's ``type`` claim. Errors raised by ``jwt.decode``
                itself propagate unchanged.
        """
        payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        if verify_type and payload.get("type") != verify_type:
            raise JWTError("Invalid token type")
        return payload