Coverage for src/auth/services.py: 98%
83 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-26 17:22 +0300
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-26 17:22 +0300
1from typing import Any
2import advanced_alchemy
3import bcrypt
4import jwt
5from advanced_alchemy.repository import SQLAlchemyAsyncRepository
6from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
7from uuid import UUID
8import msgspec
9from sqlalchemy.ext.asyncio import AsyncSession
11from core.settings import settings
12from core.mail import MailClient
13from .models import PermissionModel, RoleModel, UserModel
# Public API of this module. AccountDTO is exported alongside the other DTOs
# because it is the return type of the public AuthService.get_account().
__all__ = [
    "UserService",
    "AuthService",
    "PermissionService",
    "RoleService",
    "LoginUserDTO",
    "SignupUserDTO",
    "AccountDTO",
    "provide_user_service",
    "provide_auth_service",
    "provide_permission_service",
    "provide_role_service",
    "InvalidPasswordError",
    "InvalidEmailError",
    "DecodeTokenError",
]
class InvalidPasswordError(Exception):
    """Raised by ``AuthService.login`` when the supplied password does not match the stored hash."""
class InvalidEmailError(Exception):
    """Raised by ``AuthService.login`` when no user exists for the supplied email."""
class DecodeTokenError(Exception):
    """Raised by ``AuthService.get_account`` when a token cannot be turned into an account."""
class LoginUserDTO(msgspec.Struct):
    """Credentials supplied to ``AuthService.login``."""

    # Address used to look the user up.
    email: str
    # Plain-text password; compared against the stored bcrypt hash.
    password: str
class SignupUserDTO(msgspec.Struct):
    """Data supplied to ``AuthService.signup`` to create a new user."""

    # Address stored on the new user and used as the mail recipient.
    email: str
    # Plain-text password; hashed before it is stored.
    password: str
class AccountDTO(msgspec.Struct):
    """Identity of a user as carried in the JWT payload."""

    # Primary key of the user.
    id: UUID
    # Email address of the user.
    email: str
class UserService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``UserModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[UserModel]):
        """Async repository bound to ``UserModel``."""

        model_type = UserModel

    # Repository class used by the service base class.
    repository_type = Repository
class RoleService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``RoleModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[RoleModel]):
        """Async repository bound to ``RoleModel``."""

        model_type = RoleModel

    # Repository class used by the service base class.
    repository_type = Repository
class PermissionService(SQLAlchemyAsyncRepositoryService):
    """Repository-backed service for ``PermissionModel`` rows."""

    class Repository(SQLAlchemyAsyncRepository[PermissionModel]):
        """Async repository bound to ``PermissionModel``."""

        model_type = PermissionModel

    # Repository class used by the service base class.
    repository_type = Repository
class AuthService:
    """Sign-up, login and JWT-based account lookup.

    Collaborators are injected: the database session, the mail client used
    during sign-up, and the ``UserService`` used to read and create users.
    """

    def __init__(self, session: AsyncSession, mail_client: MailClient, user_service: UserService):
        self.session = session
        self.mail_client = mail_client
        self.user_service = user_service

    class Repository(SQLAlchemyAsyncRepository[UserModel]):
        """Async repository bound to ``UserModel``."""

        model_type = UserModel

    repository_type = Repository

    async def signup(self, user: SignupUserDTO) -> UserModel:
        """Send the sign-up mail and create the user with a hashed password.

        Args:
            user: Email and plain-text password of the new user.

        Returns:
            The created ``UserModel`` (committed via ``auto_commit=True``).
        """
        # NOTE(review): the mail is sent before the row is created, so a failed
        # insert still leaves a sent mail - confirm this ordering is intended.
        # NOTE(review): send() is not awaited - confirm MailClient.send is synchronous.
        self.mail_client.send([user.email], "Sign up", "sign up info code")
        return await self.user_service.create(
            UserModel(
                email=user.email,
                password=self.hash_password(user.password),
                is_email_verified=False,
                is_enabled=True,
            ),
            auto_commit=True,
        )

    async def login(self, user: LoginUserDTO) -> str:
        """Verify the credentials and return a signed JWT for the account.

        Args:
            user: Email and plain-text password to check.

        Returns:
            The encoded token carrying the user's id and email.

        Raises:
            InvalidEmailError: No user exists with the given email.
            InvalidPasswordError: The password does not match the stored hash.
        """
        try:
            login_user = await self.user_service.get_one(UserModel.email == user.email)
        except advanced_alchemy.exceptions.NotFoundError as err:
            # Chain explicitly so the repository error stays visible as the cause.
            raise InvalidEmailError(f"Email {user.email} not found") from err

        if not self._check_password(user.password, login_user.password):
            raise InvalidPasswordError

        return self._encode_token(
            AccountDTO(
                id=login_user.id,
                email=login_user.email,
            )
        )

    @staticmethod
    def _check_password(password: str, hashed_password: str) -> bool:
        """Return whether ``password`` matches the bcrypt ``hashed_password``."""
        return bcrypt.checkpw(password.encode("utf-8"), hashed_password.encode("utf-8"))

    @staticmethod
    def hash_password(password: str) -> str:
        """Return the bcrypt hash of ``password`` (fresh salt) as text."""
        return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

    async def get_account(self, token: str) -> AccountDTO:
        """Decode ``token`` and return the account it describes.

        Args:
            token: A JWT as produced by ``login``.

        Returns:
            The account with ``id`` restored to a ``UUID``.

        Raises:
            DecodeTokenError: The token cannot be decoded or its payload does
                not match ``AccountDTO``.
        """
        try:
            # msgspec.Struct.__init__ performs no validation, so building the DTO
            # with ** would leave ``id`` as the string stored in the token.
            # convert() validates the payload and turns the id back into a UUID.
            return msgspec.convert(self._decode_token(token), AccountDTO)
        except Exception as err:
            # Any failure is reported to callers as DecodeTokenError, with the cause kept.
            raise DecodeTokenError from err

    def _encode_token(self, account: AccountDTO) -> str:
        """Sign ``account`` as an HS256 JWT; the payload is the DTO's id and email."""
        return jwt.encode(msgspec.to_builtins(account), settings.jwt_secret, algorithm="HS256")

    def _decode_token(self, token: str) -> dict[str, Any]:
        """Verify ``token`` with the HS256 secret and return its payload."""
        return jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
async def provide_user_service(db_session: AsyncSession) -> UserService:
    """Build a ``UserService`` bound to ``db_session``."""
    service = UserService(session=db_session)
    return service
async def provide_role_service(db_session: AsyncSession) -> RoleService:
    """Build a ``RoleService`` bound to ``db_session``."""
    service = RoleService(session=db_session)
    return service
async def provide_permission_service(db_session: AsyncSession) -> PermissionService:
    """Build a ``PermissionService`` bound to ``db_session``."""
    service = PermissionService(session=db_session)
    return service
async def provide_auth_service(
    db_session: AsyncSession,
    mail_client: MailClient,
    user_service: UserService,
) -> AuthService:
    """Build an ``AuthService`` from the session, mail client and user service."""
    service = AuthService(
        session=db_session,
        mail_client=mail_client,
        user_service=user_service,
    )
    return service