Coverage for src/auth/api/user_controller.py: 100%
31 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-26 17:22 +0300
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-26 17:22 +0300
1from typing import Annotated
3from advanced_alchemy.extensions.litestar.dto import SQLAlchemyDTO, SQLAlchemyDTOConfig
4from advanced_alchemy.extensions.litestar.providers import create_service_dependencies
5from advanced_alchemy.filters import FilterTypes
6from advanced_alchemy.service import OffsetPagination
7from litestar import delete, get, post
8from litestar.controller import Controller
9from litestar.params import Dependency
10import msgspec
12from ..models import UserModel
13from ..services import UserService
class UserCreateRequest(msgspec.Struct):
    """Request payload accepted by the user-creation endpoint.

    The fields are forwarded as keyword arguments when building a
    ``UserModel`` (see ``UserController.create_user``), so their names must
    match what that model's constructor accepts.
    """

    # Presumably the login / contact address of the new user -- confirm
    # against UserModel.
    email: str
    # NOTE(review): passed through to UserModel as-is by the controller;
    # confirm where (and whether) hashing happens.
    password: str
    # Flags supplied by the caller at creation time.
    is_email_verified: bool
    is_enabled: bool
class UserDTO(SQLAlchemyDTO[UserModel]):
    """DTO over ``UserModel`` that leaves the ``password`` field out."""

    # Exclude the password column from the data transferred through this DTO.
    config = SQLAlchemyDTOConfig(
        exclude={"password"},
    )
class UserController(Controller):
    """HTTP endpoints for listing, creating and deleting users.

    A ``UserService`` is injected into every handler under the ``service``
    key, and handler return values are rendered through ``UserDTO``.
    """

    # Registers the ``service`` dependency plus the filter dependencies
    # configured below (consumed by ``list_users`` via its ``filters``
    # parameter).
    dependencies = create_service_dependencies(
        UserService,
        key="service",
        filters={
            "created_at": True,
            "updated_at": True,
            "sort_field": "email",
            "search": "email",
        },
    )
    return_dto = UserDTO

    @get(
        operation_id="ListUsers",
        path="/users",
    )
    async def list_users(
        self,
        service: UserService,
        filters: Annotated[list[FilterTypes], Dependency(skip_validation=True)],
    ) -> OffsetPagination[UserModel]:
        """Return the users matching ``filters`` together with the total count.

        Args:
            service: Injected user service.
            filters: Injected filter objects; validation of this dependency
                is skipped (``skip_validation=True``).

        Returns:
            The result of ``service.to_schema`` for the fetched rows.
        """
        users, count = await service.list_and_count(*filters)
        return service.to_schema(data=users, total=count, filters=filters)

    @post(
        operation_id="CreateUser",
        path="/users",
    )
    async def create_user(self, service: UserService, data: UserCreateRequest) -> UserModel:
        """Create a user from the request payload and commit immediately.

        Args:
            service: Injected user service.
            data: Decoded request body.

        Returns:
            The model instance returned by ``service.create``.
        """
        # Convert the struct to plain builtins so it can be splatted into
        # the model constructor.
        payload = msgspec.to_builtins(data)
        new_user = UserModel(**payload)
        return await service.create(new_user, auto_commit=True)

    @delete(
        operation_id="DeleteUser",
        path="/users/{item_id:str}",
    )
    async def delete_user(self, service: UserService, item_id: str) -> None:
        """Delete the user identified by ``item_id`` and commit immediately.

        Args:
            service: Injected user service.
            item_id: Identifier taken from the URL path.
        """
        await service.delete(item_id, auto_commit=True)