Coverage for dynamodx / repository.py: 52%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-23 16:15 -0300

1import json 

2from base64 import urlsafe_b64decode, urlsafe_b64encode 

3from typing import TYPE_CHECKING, Any, Type, TypedDict, TypeVar 

4from urllib.parse import quote, unquote 

5 

6from dynamodx.keys import PrimaryKey 

7 

8from .transact_get import TransactGet, project_item 

9from .transact_writer import TransactWriter 

10from .types import deserialize, serialize, to_dict 

11 

12if TYPE_CHECKING: 

13 from mypy_boto3_dynamodb.client import DynamoDBClient 

14 from mypy_boto3_dynamodb.literals import ( 

15 ReturnValuesOnConditionCheckFailureType, 

16 ReturnValueType, 

17 SelectType, 

18 ) 

19 from mypy_boto3_dynamodb.type_defs import ( 

20 AttributeValueTypeDef, 

21 DeleteItemOutputTypeDef, 

22 PutItemOutputTypeDef, 

23 UpdateItemOutputTypeDef, 

24 ) 

25else: 

26 DynamoDBClient = Any 

27 ReturnValuesOnConditionCheckFailureType = Any 

28 ReturnValueType = Any 

29 SelectType = Any 

30 AttributeValueTypeDef = Any 

31 DeleteItemOutputTypeDef = Any 

32 PutItemOutputTypeDef = Any 

33 UpdateItemOutputTypeDef = Any 

34 

35 

36class QueryOutput(TypedDict): 

37 items: list[dict[str, Any]] 

38 count: int 

39 last_key: str | None 

40 

41 

class Repository:
    """Thin convenience wrapper around a DynamoDB client bound to a default table.

    Every method accepts an optional ``table_name`` that overrides the
    default table for that single call only.
    """

    def __init__(
        self,
        table_name: str,
        *,
        client: DynamoDBClient,
    ) -> None:
        """Store the default table name and the client used for every call."""
        self._table_name = table_name
        self._client = client

    @staticmethod
    def _write_options(
        *,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> dict[str, Any]:
        """Build the optional request parameters shared by put/update/delete.

        Only truthy arguments produce an entry, so the client never receives
        an explicit ``None`` for a parameter the caller did not set.
        """
        options: dict[str, Any] = {}

        if cond_expr:
            options['ConditionExpression'] = cond_expr

        if expr_attr_names:
            options['ExpressionAttributeNames'] = expr_attr_names

        if expr_attr_values:
            options['ExpressionAttributeValues'] = serialize(expr_attr_values)

        if return_values:
            options['ReturnValues'] = return_values

        if return_on_cond_fail:
            options['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail

        return options

    def query(
        self,
        key_cond_expr: str,
        *,
        select: SelectType | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        exclusive_start_key: str | None = None,
        filter_expr: str | None = None,
        projection_expr: str | None = None,
        limit: int | None = None,
        scan_index_forward: bool = True,
        table_name: str | None = None,
    ) -> QueryOutput:
        """You must provide the name of the partition key attribute
        and a single value for that attribute.

        Query returns all items with that partition key value.
        Optionally, you can provide a sort key attribute and use a comparison operator
        to refine the search results.

        ...

        A `Query` operation always returns a result set. If no matching items are found,
        the result set will be empty.
        Queries that do not return results consume the minimum number
        of read capacity units for that type of read operation.

        - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html

        Parameters
        ----------
        key_cond_expr : str
            Sent as ``KeyConditionExpression``.
        select : SelectType | None, optional
            Sent as ``Select`` when truthy.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when truthy.
        expr_attr_values : dict | None, optional
            Passed through ``serialize`` and sent as
            ``ExpressionAttributeValues`` when truthy.
        exclusive_start_key : str | None, optional
            Continuation token previously returned as ``last_key``; it is
            decoded and sent as ``ExclusiveStartKey`` when truthy.
        filter_expr : str | None, optional
            Sent as ``FilterExpression`` when truthy.
        projection_expr : str | None, optional
            Sent as ``ProjectionExpression`` when truthy.
        limit : int | None, optional
            Sent as ``Limit`` when truthy (so ``0`` is treated as "not set").
        scan_index_forward : bool, optional
            Sent as ``ScanIndexForward``.
        table_name : str | None, optional
            Uses default table if not provided.

        Returns
        -------
        QueryOutput
            ``items`` (deserialized), ``count`` and ``last_key`` (an encoded
            token, or None when the response has no ``LastEvaluatedKey``).
        """
        attrs: dict = {
            'TableName': table_name or self._table_name,
            'KeyConditionExpression': key_cond_expr,
            'ScanIndexForward': scan_index_forward,
        }

        if select:
            attrs['Select'] = select

        if limit:
            attrs['Limit'] = limit

        if expr_attr_names:
            attrs['ExpressionAttributeNames'] = expr_attr_names

        if expr_attr_values:
            attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)

        if exclusive_start_key:
            attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key)

        if filter_expr:
            attrs['FilterExpression'] = filter_expr

        if projection_expr:
            attrs['ProjectionExpression'] = projection_expr

        output = self._client.query(**attrs)

        return {
            'items': [deserialize(item) for item in output.get('Items', [])],
            'count': output.get('Count', 0),
            'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)),
        }

    def get_item(
        self,
        key: dict[str, str] | PrimaryKey,
        *,
        table_name: str | None = None,
        expr_attr_names: dict | None = None,
        projection_expr: str | None = None,
        raise_on_error: bool = True,
        exc_cls: Type[Exception] = Exception,
        default: Any = None,
    ) -> dict[str, Any]:
        """Get an item with the given primary key.

        Parameters
        ----------
        key : dict[str, str] | PrimaryKey
            Primary key of the item to be retrieved.
        table_name : str | None, optional
            Uses default table if not provided.
        expr_attr_names : dict | None, optional
            Expression attribute name mappings.
        projection_expr : str | None, optional
            Attributes to return. Returns full item if None.
        raise_on_error : bool, optional
            If True, raises ``exc_cls`` when item is not found.
        exc_cls : Type[Exception], optional
            Exception class to be used if the item is not found.
        default : Any, optional
            Default value returned if the item is not found.

        Returns
        -------
        dict[str, Any]
            Data of the retrieved item or the default value if not found.
            When ``key`` is a ``PrimaryKey`` the result of
            ``project_item(key, item)`` is returned instead, and ``default``
            is not applied on that path.

        Raises
        ------
        Exception
            ``exc_cls`` if the deserialized item is empty and
            ``raise_on_error`` is True.
        """
        attrs: dict = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
        }

        if expr_attr_names:
            attrs['ExpressionAttributeNames'] = expr_attr_names

        if projection_expr:
            attrs['ProjectionExpression'] = projection_expr

        output = self._client.get_item(**attrs)
        item = deserialize(output.get('Item', {}))

        if raise_on_error and not item:
            raise exc_cls(f'Item not found ({key!r})')

        # NOTE(review): with a PrimaryKey and a missing item (raise_on_error
        # False) the empty item goes to project_item and `default` is ignored;
        # confirm this is intended.
        if isinstance(key, PrimaryKey):
            return project_item(key, item)

        return item or default

    def put_item(
        self,
        item: dict,
        *,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> PutItemOutputTypeDef:
        """Write ``item`` to the table via the client's ``put_item``.

        Parameters
        ----------
        item : dict
            Item to store. If the item's class has a truthy
            ``_is_dynamodb_mapped`` attribute, the item is first converted
            with ``to_dict``; either way it is then passed to ``serialize``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when truthy.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when truthy.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when truthy.
        table_name : str | None, optional
            Uses default table if not provided.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when truthy.
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when truthy.

        Returns
        -------
        PutItemOutputTypeDef
            The client's response, unmodified.
        """
        is_dynamodb_mapped = getattr(item.__class__, '_is_dynamodb_mapped', False)
        serialized = serialize(to_dict(item) if is_dynamodb_mapped else item)  # type: ignore
        attrs: dict = {
            'TableName': table_name or self._table_name,
            'Item': serialized,
        }
        attrs.update(
            self._write_options(
                cond_expr=cond_expr,
                expr_attr_names=expr_attr_names,
                expr_attr_values=expr_attr_values,
                return_values=return_values,
                return_on_cond_fail=return_on_cond_fail,
            )
        )

        return self._client.put_item(**attrs)

    def update_item(
        self,
        key: dict,
        *,
        update_expr: str,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> UpdateItemOutputTypeDef:
        """Update the item identified by ``key`` via the client's ``update_item``.

        Parameters
        ----------
        key : dict
            Primary key of the item; passed through ``serialize``.
        update_expr : str
            Sent as ``UpdateExpression``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when truthy.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when truthy.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when truthy.
        table_name : str | None, optional
            Uses default table if not provided.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when truthy.
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when truthy.

        Returns
        -------
        UpdateItemOutputTypeDef
            The client's response, unmodified.
        """
        attrs: dict = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
            'UpdateExpression': update_expr,
        }
        attrs.update(
            self._write_options(
                cond_expr=cond_expr,
                expr_attr_names=expr_attr_names,
                expr_attr_values=expr_attr_values,
                return_values=return_values,
                return_on_cond_fail=return_on_cond_fail,
            )
        )

        return self._client.update_item(**attrs)

    def delete_item(
        self,
        key: dict,
        *,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
        return_values: ReturnValueType | None = None,
    ) -> DeleteItemOutputTypeDef:
        """Deletes a single item in a table by primary key. You can perform
        a conditional delete operation that deletes the item if it exists,
        or if it has an expected attribute value.

        Parameters
        ----------
        key : dict
            Primary key of the item; passed through ``serialize``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when truthy.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when truthy.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when truthy.
        table_name : str | None, optional
            Uses default table if not provided.
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when truthy.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when truthy, mirroring ``put_item`` and
            ``update_item``. Omitted from the request by default.

        Returns
        -------
        DeleteItemOutputTypeDef
            The client's response, unmodified.
        """
        attrs: dict = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
        }
        attrs.update(
            self._write_options(
                cond_expr=cond_expr,
                expr_attr_names=expr_attr_names,
                expr_attr_values=expr_attr_values,
                return_values=return_values,
                return_on_cond_fail=return_on_cond_fail,
            )
        )

        return self._client.delete_item(**attrs)

    def transact_writer(
        self,
        flush_amount: int = 50,
        table_name: str | None = None,
    ) -> TransactWriter:
        """Create a ``TransactWriter`` that shares this repository's client.

        ``flush_amount`` is forwarded unchanged to ``TransactWriter``;
        ``table_name`` falls back to the default table when not provided.
        """
        return TransactWriter(
            table_name=table_name or self._table_name,
            client=self._client,
            flush_amount=flush_amount,
        )

    def transact_get(
        self,
        table_name: str | None = None,
    ) -> TransactGet:
        """Create a ``TransactGet`` that shares this repository's client.

        ``table_name`` falls back to the default table when not provided.
        """
        return TransactGet(
            table_name=table_name or self._table_name,
            client=self._client,
        )

301 

302 

303def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None: 

304 if not obj: 

305 return None 

306 

307 s = json.dumps(obj) 

308 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8') 

309 return quote(b) 

310 

311 

312def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]: 

313 b = unquote(s).encode('utf-8') 

314 s = urlsafe_b64decode(b).decode('utf-8') 

315 return json.loads(s) 

316 

317 

# Second public name bound to the very same class object as ``Repository``.
DynamoDBRepository = Repository

319 

320T = TypeVar('T') 

321 

322 

323def dynamodb_mapping( 

324 table: str, 

325 partition_key: str, 

326 sort_key: str | None = None, 

327) -> Any: 

328 def decorator(cls: type[T]) -> type[T]: 

329 cls._dynamodb_table = table # type: ignore 

330 cls._dynamodb_partition_key = partition_key # type: ignore 

331 cls._dynampdb_sort_key = sort_key # type: ignore 

332 cls._dynampdb_sort_key = sort_key # type: ignore 

333 cls._is_dynamodb_mapped = True # type: ignore 

334 

335 return cls 

336 

337 return decorator