Coverage for fss\starter\system\service\impl\user_service_impl.py: 62%

93 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-13 15:26 +0800

1"""User domain service impl""" 

2 

3import io 

4from datetime import timedelta, datetime 

5from typing import Optional 

6 

7import pandas as pd 

8from fastapi import UploadFile 

9from fastapi_pagination import Params 

10from loguru import logger 

11from starlette.responses import StreamingResponse 

12 

13from fss.common.cache.cache import get_cache_client, Cache 

14from fss.common.config import configs 

15from fss.common.enum.enum import TokenTypeEnum 

16from fss.common.schema.schema import Token 

17from fss.common.service.impl.service_impl import ServiceImpl 

18from fss.common.util import security 

19from fss.common.util.security import verify_password, get_password_hash 

20from fss.starter.system.enum.system import SystemResponseCode, SystemConstantCode 

21from fss.starter.system.exception.system import SystemException 

22from fss.starter.system.mapper.user_mapper import UserMapper, userMapper 

23from fss.starter.system.schema.user_schema import UserQuery, LoginCmd, UserDO 

24from fss.starter.system.service.user_service import UserService 

25 

26 

class UserServiceImpl(ServiceImpl[UserMapper, UserDO], UserService):
    """User domain service backed by a ``UserMapper``.

    Offers lookup by id, login (token issuing) and Excel import/export of
    user records.
    """

    def __init__(self, mapper: UserMapper):
        """
        Create the service
        :param mapper: mapper used for every user persistence call
        """
        super().__init__(mapper=mapper)
        self.mapper = mapper

    async def find_by_id(self, id: int) -> Optional[UserQuery]:
        """
        Retrieve a user through the user id
        :param id: user id
        :return: the user as a UserQuery, or None when the mapper yields nothing
        """
        user_do = await self.mapper.select_by_id(id=id)
        if not user_do:
            return None
        return UserQuery(**user_do.model_dump())

    async def login(self, loginCmd: LoginCmd) -> Token:
        """
        Do log in
        :param loginCmd: login command carrying username and password
        :return: access token and refresh token, each with its lifetime in seconds
        :raises SystemException: AUTH_FAILED when no user matches the username
            or the password does not verify
        """
        user_do: UserDO = await self.mapper.get_user_by_username(
            username=loginCmd.username
        )
        if not user_do or not await verify_password(
            loginCmd.password, user_do.password
        ):
            raise SystemException(
                SystemResponseCode.AUTH_FAILED.code, SystemResponseCode.AUTH_FAILED.msg
            )

        access_token_expires = timedelta(minutes=configs.access_token_expire_minutes)
        refresh_token_expires = timedelta(minutes=configs.refresh_token_expire_minutes)
        access_token = await security.create_token(
            subject=user_do.id,
            expires_delta=access_token_expires,
            token_type=TokenTypeEnum.access,
        )
        refresh_token = await security.create_token(
            subject=user_do.id,
            expires_delta=refresh_token_expires,
            token_type=TokenTypeEnum.refresh,
        )
        token = Token(
            access_token=access_token,
            expired_at=int(access_token_expires.total_seconds()),
            token_type="bearer",
            refresh_token=refresh_token,
            re_expired_at=int(refresh_token_expires.total_seconds()),
        )

        # Store the access token under the per-user cache key; the third
        # argument is presumably the entry's time-to-live (confirm in Cache.set).
        cache_client: Cache = await get_cache_client()
        await cache_client.set(
            f"{SystemConstantCode.USER_KEY.msg}{user_do.id}",
            access_token,
            access_token_expires,
        )
        return token

    @staticmethod
    def _excel_response(data_frame: pd.DataFrame, filename: str) -> StreamingResponse:
        """
        Write a data frame to an in-memory .xlsx workbook and wrap it as a download
        :param data_frame: frame to serialize (the index is not written)
        :param filename: file name placed in the Content-Disposition header
        :return: streaming response carrying the workbook bytes
        :raises Exception: any error raised while writing the workbook is logged
            and re-raised instead of silently producing a None response
        """
        stream = io.BytesIO()
        try:
            # The context manager saves and closes the writer exactly once, and
            # only when it was actually created; the writer stays local so
            # concurrent requests never share it.
            with pd.ExcelWriter(stream, engine="xlsxwriter") as excel_writer:
                data_frame.to_excel(excel_writer, index=False)
        except Exception:
            logger.exception("Failed to build Excel file {}", filename)
            raise
        stream.seek(0)
        return StreamingResponse(
            stream,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )

    async def export_user_template(
        self,
    ) -> StreamingResponse:
        """
        Export empty user import template
        :return: .xlsx download whose columns are the UserDO field names, with no data rows
        """
        template_df = pd.DataFrame(columns=list(UserDO.__fields__))
        filename = f"user_template_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
        return self._excel_response(template_df, filename)

    async def import_user(self, file: UploadFile):
        """
        Import user data
        :param file: uploaded Excel file; every row is passed to UserDO as
            keyword arguments and its password is hashed before insertion
        """
        try:
            contents = await file.read()
            import_df = pd.read_excel(io.BytesIO(contents))
            user_import_list = []
            for user_data in import_df.to_dict(orient="records"):
                user_import = UserDO(**user_data)
                user_import.password = await get_password_hash(user_import.password)
                user_import_list.append(user_import)
        finally:
            # Release the upload even when reading or validating a row fails.
            await file.close()
        await self.mapper.insert_batch(data_list=user_import_list)

    async def export_user(self, params: Params) -> StreamingResponse:
        """
        Export one page of users as an Excel file
        :param params: pagination parameters forwarded to the mapper
        :return: .xlsx download with one row per user on the page and the
            UserQuery field names as columns
        """
        user_pages = await self.mapper.select_list_page_ordered(params=params)
        user_dicts = [
            UserQuery(**user.model_dump()).dict() for user in user_pages.items
        ]
        # Passing columns explicitly keeps the header row even for an empty page.
        user_export_df = pd.DataFrame(user_dicts, columns=list(UserQuery.__fields__))
        filename = f"user_data_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
        return self._excel_response(user_export_df, filename)

153 

154 

def get_user_service() -> UserService:
    """
    Build the user service wired to the imported ``userMapper``
    :return: a new UserServiceImpl instance
    """
    user_service = UserServiceImpl(mapper=userMapper)
    return user_service