Coverage for src/auth/services.py: 98%

83 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-26 17:22 +0300

1from typing import Any 

2import advanced_alchemy 

3import bcrypt 

4import jwt 

5from advanced_alchemy.repository import SQLAlchemyAsyncRepository 

6from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService 

7from uuid import UUID 

8import msgspec 

9from sqlalchemy.ext.asyncio import AsyncSession 

10 

11from core.settings import settings 

12from core.mail import MailClient 

13from .models import PermissionModel, RoleModel, UserModel 

14 

# Public API of this module. ``AccountDTO`` is exported because it is the
# return type of ``AuthService.get_account``.
__all__ = [
    "UserService",
    "AuthService",
    "PermissionService",
    "RoleService",
    "LoginUserDTO",
    "SignupUserDTO",
    "AccountDTO",
    "provide_user_service",
    "provide_auth_service",
    "provide_permission_service",
    "provide_role_service",
    "InvalidPasswordError",
    "InvalidEmailError",
    "DecodeTokenError",
]

30 

31 

class InvalidPasswordError(Exception):
    """Raised when a supplied password does not match the stored hash."""

34 

35 

class InvalidEmailError(Exception):
    """Raised when no user can be found for the supplied email address."""

38 

39 

class DecodeTokenError(Exception):
    """Raised when a token cannot be decoded into account data."""

42 

43 

class LoginUserDTO(msgspec.Struct):
    """Credentials submitted by a user who wants to log in."""

    # Address used to look the user up.
    email: str
    # Plain-text password, checked against the stored hash.
    password: str

47 

48 

class SignupUserDTO(msgspec.Struct):
    """Data submitted by a user who wants to create an account."""

    # Address the new account is registered under.
    email: str
    # Plain-text password; hashed before it is stored.
    password: str

52 

53 

class AccountDTO(msgspec.Struct):
    """Account identity carried inside an authentication token."""

    # Primary key of the user.
    id: UUID
    # Email address of the user.
    email: str

57 

58 

class UserService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``UserModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[UserModel]):
        """Async repository bound to ``UserModel``."""

        model_type = UserModel

    # Tells the base service which repository class to use.
    repository_type = Repository

64 

65 

class RoleService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``RoleModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[RoleModel]):
        """Async repository bound to ``RoleModel``."""

        model_type = RoleModel

    # Tells the base service which repository class to use.
    repository_type = Repository

71 

72 

class PermissionService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``PermissionModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[PermissionModel]):
        """Async repository bound to ``PermissionModel``."""

        model_type = PermissionModel

    # Tells the base service which repository class to use.
    repository_type = Repository

78 

79 

class AuthService:
    """Sign-up, login and token handling for user accounts.

    Passwords are hashed with bcrypt. Tokens are HS256-signed JWTs whose
    payload is an ``AccountDTO`` and whose key is ``settings.jwt_secret``.
    """

    def __init__(self, session: AsyncSession, mail_client: MailClient, user_service: UserService):
        """Store the collaborators used by the authentication flows.

        Args:
            session: Database session; stored on the instance.
            mail_client: Client used to send the sign-up mail.
            user_service: Service used to create and look up users.
        """
        self.session = session
        self.mail_client = mail_client
        self.user_service = user_service

    # NOTE(review): nothing in this class reads ``Repository`` or
    # ``repository_type``; they are kept so existing references keep working.
    class Repository(SQLAlchemyAsyncRepository[UserModel]):
        model_type = UserModel

    repository_type = Repository

    async def signup(self, user: SignupUserDTO) -> UserModel:
        """Send the sign-up mail and create the user.

        Args:
            user: Email and plain-text password of the new account.

        Returns:
            Whatever ``user_service.create`` returns for the new user.
        """
        # NOTE(review): the mail goes out before the row is created, so a
        # failing ``create`` still results in a sent mail. Confirm intended.
        self.mail_client.send([user.email], "Sign up", "sign up info code")
        return await self.user_service.create(
            UserModel(
                email=user.email,
                # Only the bcrypt hash is stored, never the plain password.
                password=self.hash_password(user.password),
                is_email_verified=False,
                is_enabled=True,
            ),
            auto_commit=True,
        )

    async def login(self, user: LoginUserDTO) -> str:
        """Check the credentials and return a signed token.

        Args:
            user: Email and plain-text password to verify.

        Returns:
            A JWT encoding the account's id and email.

        Raises:
            InvalidEmailError: No user exists with the given email.
            InvalidPasswordError: The password does not match the stored hash.
        """
        try:
            login_user = await self.user_service.get_one(UserModel.email == user.email)
        except advanced_alchemy.exceptions.NotFoundError as exc:
            # Keep the repository error as the explicit cause.
            raise InvalidEmailError(f"Email {user.email} not found") from exc

        if not self._check_password(user.password, login_user.password):
            raise InvalidPasswordError

        return self._encode_token(
            AccountDTO(
                id=login_user.id,
                email=login_user.email,
            )
        )

    @staticmethod
    def _check_password(password: str, hashed_password: str) -> bool:
        """Return whether ``password`` matches the bcrypt ``hashed_password``."""
        return bcrypt.checkpw(password.encode("utf-8"), hashed_password.encode("utf-8"))

    @staticmethod
    def hash_password(password: str) -> str:
        """Return a salted bcrypt hash of ``password`` as text."""
        return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

    async def get_account(self, token: str) -> AccountDTO:
        """Decode ``token`` and return the account it describes.

        Args:
            token: JWT previously produced by ``login``.

        Returns:
            The validated account, with ``id`` as a ``UUID``.

        Raises:
            DecodeTokenError: The token cannot be decoded, or its payload
                does not match ``AccountDTO``.
        """
        try:
            account_data = self._decode_token(token)
            # ``convert`` validates the payload and coerces the string id back
            # to a UUID. A plain ``AccountDTO(**data)`` would keep it a str.
            return msgspec.convert(account_data, AccountDTO)
        except Exception as exc:
            # Any failure while reading the token is reported the same way.
            raise DecodeTokenError from exc

    def _encode_token(self, account: AccountDTO) -> str:
        """Serialize ``account`` into an HS256-signed JWT."""
        # NOTE(review): the payload holds only the account fields and no
        # ``exp`` claim. Confirm tokens are meant to be non-expiring.
        return jwt.encode(msgspec.to_builtins(account), settings.jwt_secret, algorithm="HS256")

    def _decode_token(self, token: str) -> dict[str, Any]:
        """Verify the token signature and return its payload."""
        return jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])

139 

140 

async def provide_user_service(db_session: AsyncSession) -> UserService:
    """Build a ``UserService`` bound to ``db_session``."""
    service = UserService(session=db_session)
    return service

143 

144 

async def provide_role_service(db_session: AsyncSession) -> RoleService:
    """Build a ``RoleService`` bound to ``db_session``."""
    service = RoleService(session=db_session)
    return service

147 

148 

async def provide_permission_service(db_session: AsyncSession) -> PermissionService:
    """Build a ``PermissionService`` bound to ``db_session``."""
    service = PermissionService(session=db_session)
    return service

151 

152 

async def provide_auth_service(
    db_session: AsyncSession,
    mail_client: MailClient,
    user_service: UserService,
) -> AuthService:
    """Build an ``AuthService`` from the session, mail client and user service."""
    service = AuthService(
        session=db_session,
        mail_client=mail_client,
        user_service=user_service,
    )
    return service