Coverage for dynamodx / repository.py: 36%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-18 16:52 -0300

1import json 

2from base64 import urlsafe_b64decode, urlsafe_b64encode 

3from typing import TYPE_CHECKING, Any, TypedDict 

4from urllib.parse import quote, unquote 

5 

6from dynamodx.transact_get import TransactGet 

7 

8from .transact_writer import TransactWriter 

9from .types import deserialize, serialize 

10 

11if TYPE_CHECKING: 

12 from mypy_boto3_dynamodb.client import DynamoDBClient 

13 from mypy_boto3_dynamodb.literals import ( 

14 ReturnValuesOnConditionCheckFailureType, 

15 ReturnValueType, 

16 SelectType, 

17 ) 

18 from mypy_boto3_dynamodb.type_defs import ( 

19 AttributeValueTypeDef, 

20 DeleteItemOutputTypeDef, 

21 PutItemOutputTypeDef, 

22 UpdateItemOutputTypeDef, 

23 ) 

24else: 

25 DynamoDBClient = Any 

26 ReturnValueType = Any 

27 AttributeValueTypeDef = Any 

28 ReturnValuesOnConditionCheckFailureType = Any 

29 DeleteItemOutputTypeDef = Any 

30 PutItemOutputTypeDef = Any 

31 UpdateItemOutputTypeDef = Any 

32 

33 

34class QueryOutput(TypedDict): 

35 items: list[dict[str, Any]] 

36 count: int 

37 last_key: str | None 

38 

39 

40class Repository: 

41 def __init__( 

42 self, 

43 table_name: str, 

44 *, 

45 client: DynamoDBClient, 

46 ) -> None: 

47 self._table_name = table_name 

48 self._client = client 

49 

50 def query( 

51 self, 

52 key_cond_expr: str, 

53 *, 

54 select: SelectType | None = None, 

55 expr_attr_names: dict | None = None, 

56 expr_attr_values: dict | None = None, 

57 exclusive_start_key: str | None = None, 

58 filter_expr: str | None = None, 

59 projection_expr: str | None = None, 

60 limit: int | None = None, 

61 scan_index_forward: bool = True, 

62 table_name: str | None = None, 

63 ) -> QueryOutput: 

64 """You must provide the name of the partition key attribute 

65 and a single value for that attribute. 

66 

67 Query returns all items with that partition key value. 

68 Optionally, you can provide a sort key attribute and use a comparison operator 

69 to refine the search results. 

70 

71 ... 

72 

73 A `Query` operation always returns a result set. If no matching items are found, 

74 the result set will be empty. 

75 Queries that do not return results consume the minimum number 

76 of read capacity units for that type of read operation. 

77 

78 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html 

79 """ 

80 attrs: dict = { 

81 'TableName': table_name or self._table_name, 

82 'KeyConditionExpression': key_cond_expr, 

83 'ScanIndexForward': scan_index_forward, 

84 } 

85 

86 if select: 

87 attrs['Select'] = select 

88 

89 if limit: 

90 attrs['Limit'] = limit 

91 

92 if expr_attr_names: 

93 attrs['ExpressionAttributeNames'] = expr_attr_names 

94 

95 if expr_attr_values: 

96 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

97 

98 if exclusive_start_key: 

99 attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key) 

100 

101 if filter_expr: 

102 attrs['FilterExpression'] = filter_expr 

103 

104 if projection_expr: 

105 attrs['ProjectionExpression'] = projection_expr 

106 

107 output = self._client.query(**attrs) 

108 

109 return { 

110 'items': [deserialize(item) for item in output.get('Items', [])], 

111 'count': output.get('Count', 0), 

112 'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)), 

113 } 

114 

115 def get_item( 

116 self, 

117 key: dict, 

118 *, 

119 table_name: str | None = None, 

120 expr_attr_names: dict | None = None, 

121 projection_expr: str | None = None, 

122 ) -> dict[str, Any]: 

123 """The GetItem operation returns a set of attributes for the item 

124 with the given primary key. 

125 

126 If there is no matching item, GetItem does not return any data and 

127 there will be no Item element in the response. 

128 

129 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/get_item.html 

130 """ 

131 attrs = { 

132 'TableName': table_name or self._table_name, 

133 'Key': serialize(key), 

134 } 

135 

136 if expr_attr_names: 

137 attrs['ExpressionAttributeNames'] = expr_attr_names 

138 

139 if projection_expr: 

140 attrs['ProjectionExpression'] = projection_expr 

141 

142 output = self._client.get_item(**attrs) 

143 

144 return deserialize(output.get('Item', {})) 

145 

146 def put_item( 

147 self, 

148 item: dict, 

149 *, 

150 cond_expr: str | None = None, 

151 expr_attr_names: dict | None = None, 

152 expr_attr_values: dict | None = None, 

153 table_name: str | None = None, 

154 return_values: ReturnValueType | None = None, 

155 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

156 ) -> PutItemOutputTypeDef: 

157 attrs = { 

158 'TableName': table_name or self._table_name, 

159 'Item': serialize(item), 

160 } 

161 

162 if cond_expr: 

163 attrs['ConditionExpression'] = cond_expr 

164 

165 if expr_attr_names: 

166 attrs['ExpressionAttributeNames'] = expr_attr_names 

167 

168 if expr_attr_values: 

169 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

170 

171 if return_values: 

172 attrs['ReturnValues'] = return_values 

173 

174 if return_on_cond_fail: 

175 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

176 

177 return self._client.put_item(**attrs) 

178 

179 def update_item( 

180 self, 

181 key: dict, 

182 *, 

183 update_expr: str, 

184 cond_expr: str | None = None, 

185 expr_attr_names: dict | None = None, 

186 expr_attr_values: dict | None = None, 

187 table_name: str | None = None, 

188 return_values: ReturnValueType | None = None, 

189 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

190 ) -> UpdateItemOutputTypeDef: 

191 attrs: dict = { 

192 'TableName': table_name or self._table_name, 

193 'Key': serialize(key), 

194 'UpdateExpression': update_expr, 

195 } 

196 

197 if cond_expr: 

198 attrs['ConditionExpression'] = cond_expr 

199 

200 if expr_attr_names: 

201 attrs['ExpressionAttributeNames'] = expr_attr_names 

202 

203 if expr_attr_values: 

204 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

205 

206 if return_values: 

207 attrs['ReturnValues'] = return_values 

208 

209 if return_on_cond_fail: 

210 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

211 

212 return self._client.update_item(**attrs) 

213 

214 def delete_item( 

215 self, 

216 key: dict, 

217 *, 

218 cond_expr: str | None = None, 

219 expr_attr_names: dict | None = None, 

220 expr_attr_values: dict | None = None, 

221 table_name: str | None = None, 

222 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None, 

223 ) -> DeleteItemOutputTypeDef: 

224 """Deletes a single item in a table by primary key. You can perform 

225 a conditional delete operation that deletes the item if it exists, 

226 or if it has an expected attribute value. 

227 """ 

228 attrs: dict = { 

229 'TableName': table_name or self._table_name, 

230 'Key': serialize(key), 

231 } 

232 

233 if cond_expr: 

234 attrs['ConditionExpression'] = cond_expr 

235 

236 if expr_attr_names: 

237 attrs['ExpressionAttributeNames'] = expr_attr_names 

238 

239 if expr_attr_values: 

240 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) 

241 

242 if return_on_cond_fail: 

243 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail 

244 

245 return self._client.delete_item(**attrs) 

246 

247 def transact_writer( 

248 self, 

249 flush_amount: int = 50, 

250 table_name: str | None = None, 

251 ) -> TransactWriter: 

252 return TransactWriter( 

253 table_name=table_name or self._table_name, 

254 client=self._client, 

255 flush_amount=flush_amount, 

256 ) 

257 

258 def transact_get( 

259 self, 

260 table_name: str | None = None, 

261 ) -> TransactGet: 

262 return TransactGet( 

263 table_name=table_name or self._table_name, 

264 client=self._client, 

265 ) 

266 

267 

268def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None: 

269 if not obj: 

270 return None 

271 

272 s = json.dumps(obj) 

273 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8') 

274 return quote(b) 

275 

276 

277def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]: 

278 b = unquote(s).encode('utf-8') 

279 s = urlsafe_b64decode(b).decode('utf-8') 

280 return json.loads(s) 

281 

282 

283DynamoDBRepository = Repository