Coverage for dynamodx / repository.py: 39%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 17:37 -0300

1import json 

2from base64 import urlsafe_b64decode, urlsafe_b64encode 

3from typing import TYPE_CHECKING, Any, Type, TypedDict 

4from urllib.parse import quote, unquote 

5 

6from dynamodx.keys import PrimaryKey 

7 

8from .transact_get import TransactGet, project_item 

9from .transact_writer import TransactWriter 

10from .types import deserialize, serialize 

11 

12if TYPE_CHECKING: 

13 from mypy_boto3_dynamodb.client import DynamoDBClient 

14 from mypy_boto3_dynamodb.literals import ( 

15 ReturnValuesOnConditionCheckFailureType, 

16 ReturnValueType, 

17 SelectType, 

18 ) 

19 from mypy_boto3_dynamodb.type_defs import ( 

20 AttributeValueTypeDef, 

21 DeleteItemOutputTypeDef, 

22 PutItemOutputTypeDef, 

23 UpdateItemOutputTypeDef, 

24 ) 

25else: 

26 DynamoDBClient = Any 

27 ReturnValueType = Any 

28 AttributeValueTypeDef = Any 

29 ReturnValuesOnConditionCheckFailureType = Any 

30 DeleteItemOutputTypeDef = Any 

31 PutItemOutputTypeDef = Any 

32 UpdateItemOutputTypeDef = Any 

33 

34 

35class QueryOutput(TypedDict): 

36 items: list[dict[str, Any]] 

37 count: int 

38 last_key: str | None 

39 

40 

41class Repository: 

42 def __init__( 

43 self, 

44 table_name: str, 

45 *, 

46 client: DynamoDBClient, 

47 ) -> None: 

48 self._table_name = table_name 

49 self._client = client 

50 

51 def query( 

52 self, 

53 key_cond_expr: str, 

54 *, 

55 select: SelectType | None = None, 

56 expr_attr_names: dict | None = None, 

57 expr_attr_values: dict | None = None, 

58 exclusive_start_key: str | None = None, 

59 filter_expr: str | None = None, 

60 projection_expr: str | None = None, 

61 limit: int | None = None, 

62 scan_index_forward: bool = True, 

63 table_name: str | None = None, 

64 ) -> QueryOutput: 

65 """You must provide the name of the partition key attribute 

66 and a single value for that attribute. 

67 

68 Query returns all items with that partition key value. 

69 Optionally, you can provide a sort key attribute and use a comparison operator 

70 to refine the search results. 

71 

72 ... 

73 

74 A `Query` operation always returns a result set. If no matching items are found, 

75 the result set will be empty. 

76 Queries that do not return results consume the minimum number 

77 of read capacity units for that type of read operation. 

78 

79 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html 

80 """ 

81 attrs: dict = { 

82 'TableName': table_name or self._table_name, 

83 'KeyConditionExpression': key_cond_expr, 

84 'ScanIndexForward': scan_index_forward, 

85 } 

86 

87 if select: 

88 attrs['Select'] = select 

89 

90 if limit: 

91 attrs['Limit'] = limit 

92 

93 if expr_attr_names: 

94 attrs['ExpressionAttributeNames'] = expr_attr_names 

95 

96 if expr_attr_values: 

97 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

98 

99 if exclusive_start_key: 

100 attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key) 

101 

102 if filter_expr: 

103 attrs['FilterExpression'] = filter_expr 

104 

105 if projection_expr: 

106 attrs['ProjectionExpression'] = projection_expr 

107 

108 output = self._client.query(**attrs) 

109 

110 return { 

111 'items': [deserialize(item) for item in output.get('Items', [])], 

112 'count': output.get('Count', 0), 

113 'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)), 

114 } 

115 

116 def get_item( 

117 self, 

118 key: dict[str, str] | PrimaryKey, 

119 *, 

120 table_name: str | None = None, 

121 expr_attr_names: dict | None = None, 

122 projection_expr: str | None = None, 

123 raise_on_error: bool = True, 

124 exc_cls: Type[Exception] = Exception, 

125 default: Any = None, 

126 ) -> dict[str, Any]: 

127 """Get an item with the given primary key. 

128 

129 Parameters 

130 ---------- 

131 key : dict[str, str] | PrimaryKey 

132 Primary key of the item to be retrieved. 

133 table_name : str | None, optional 

134 Uses default table if not provided. 

135 expr_attr_names : dict | None, optional 

136 Expression attribute name mappings. 

137 projection_expr : str | None, optional 

138 Attributes to return. Returns full item if None. 

139 raise_on_error : bool, optional 

140 If True, raises ``exc_cls`` when item is not found. 

141 exc_cls : Type[Exception], optional 

142 Exception class to be used if the item is not found. 

143 default : Any, optional 

144 Default value returned if the item is not found. 

145 

146 Returns 

147 ------- 

148 dict[str, Any] 

149 Data of the retrieved item or the default value if not found. 

150 

151 Raises 

152 ------ 

153 Exception 

154 If item is not found and ``raise_on_error`` is True. 

155 """ 

156 attrs = { 

157 'TableName': table_name or self._table_name, 

158 'Key': serialize(key), 

159 } 

160 

161 if expr_attr_names: 

162 attrs['ExpressionAttributeNames'] = expr_attr_names 

163 

164 if projection_expr: 

165 attrs['ProjectionExpression'] = projection_expr 

166 

167 output = self._client.get_item(**attrs) 

168 item = deserialize(output.get('Item', {})) 

169 

170 if raise_on_error and not item: 

171 raise exc_cls(f'Item not found ({key!r})') 

172 

173 if isinstance(key, PrimaryKey): 

174 return project_item(key, item) 

175 

176 return item or default 

177 

178 def put_item( 

179 self, 

180 item: dict, 

181 *, 

182 cond_expr: str | None = None, 

183 expr_attr_names: dict | None = None, 

184 expr_attr_values: dict | None = None, 

185 table_name: str | None = None, 

186 return_values: ReturnValueType | None = None, 

187 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

188 ) -> PutItemOutputTypeDef: 

189 attrs = { 

190 'TableName': table_name or self._table_name, 

191 'Item': serialize(item), 

192 } 

193 

194 if cond_expr: 

195 attrs['ConditionExpression'] = cond_expr 

196 

197 if expr_attr_names: 

198 attrs['ExpressionAttributeNames'] = expr_attr_names 

199 

200 if expr_attr_values: 

201 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

202 

203 if return_values: 

204 attrs['ReturnValues'] = return_values 

205 

206 if return_on_cond_fail: 

207 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

208 

209 return self._client.put_item(**attrs) 

210 

211 def update_item( 

212 self, 

213 key: dict, 

214 *, 

215 update_expr: str, 

216 cond_expr: str | None = None, 

217 expr_attr_names: dict | None = None, 

218 expr_attr_values: dict | None = None, 

219 table_name: str | None = None, 

220 return_values: ReturnValueType | None = None, 

221 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

222 ) -> UpdateItemOutputTypeDef: 

223 attrs: dict = { 

224 'TableName': table_name or self._table_name, 

225 'Key': serialize(key), 

226 'UpdateExpression': update_expr, 

227 } 

228 

229 if cond_expr: 

230 attrs['ConditionExpression'] = cond_expr 

231 

232 if expr_attr_names: 

233 attrs['ExpressionAttributeNames'] = expr_attr_names 

234 

235 if expr_attr_values: 

236 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

237 

238 if return_values: 

239 attrs['ReturnValues'] = return_values 

240 

241 if return_on_cond_fail: 

242 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

243 

244 return self._client.update_item(**attrs) 

245 

246 def delete_item( 

247 self, 

248 key: dict, 

249 *, 

250 cond_expr: str | None = None, 

251 expr_attr_names: dict | None = None, 

252 expr_attr_values: dict | None = None, 

253 table_name: str | None = None, 

254 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

255 ) -> DeleteItemOutputTypeDef: 

256 """Deletes a single item in a table by primary key. You can perform 

257 a conditional delete operation that deletes the item if it exists, 

258 or if it has an expected attribute value. 

259 """ 

260 attrs: dict = { 

261 'TableName': table_name or self._table_name, 

262 'Key': serialize(key), 

263 } 

264 

265 if cond_expr: 

266 attrs['ConditionExpression'] = cond_expr 

267 

268 if expr_attr_names: 

269 attrs['ExpressionAttributeNames'] = expr_attr_names 

270 

271 if expr_attr_values: 

272 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

273 

274 if return_on_cond_fail: 

275 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

276 

277 return self._client.delete_item(**attrs) 

278 

279 def transact_writer( 

280 self, 

281 flush_amount: int = 50, 

282 table_name: str | None = None, 

283 ) -> TransactWriter: 

284 return TransactWriter( 

285 table_name=table_name or self._table_name, 

286 client=self._client, 

287 flush_amount=flush_amount, 

288 ) 

289 

290 def transact_get( 

291 self, 

292 table_name: str | None = None, 

293 ) -> TransactGet: 

294 return TransactGet( 

295 table_name=table_name or self._table_name, 

296 client=self._client, 

297 ) 

298 

299 

300def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None: 

301 if not obj: 

302 return None 

303 

304 s = json.dumps(obj) 

305 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8') 

306 return quote(b) 

307 

308 

309def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]: 

310 b = unquote(s).encode('utf-8') 

311 s = urlsafe_b64decode(b).decode('utf-8') 

312 return json.loads(s) 

313 

314 

315DynamoDBRepository = Repository