Coverage for dynamodx / repository.py: 36%
97 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 16:52 -0300
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 16:52 -0300
1import json
2from base64 import urlsafe_b64decode, urlsafe_b64encode
3from typing import TYPE_CHECKING, Any, TypedDict
4from urllib.parse import quote, unquote
6from dynamodx.transact_get import TransactGet
8from .transact_writer import TransactWriter
9from .types import deserialize, serialize
11if TYPE_CHECKING:
12 from mypy_boto3_dynamodb.client import DynamoDBClient
13 from mypy_boto3_dynamodb.literals import (
14 ReturnValuesOnConditionCheckFailureType,
15 ReturnValueType,
16 SelectType,
17 )
18 from mypy_boto3_dynamodb.type_defs import (
19 AttributeValueTypeDef,
20 DeleteItemOutputTypeDef,
21 PutItemOutputTypeDef,
22 UpdateItemOutputTypeDef,
23 )
24else:
25 DynamoDBClient = Any
26 ReturnValueType = Any
27 AttributeValueTypeDef = Any
28 ReturnValuesOnConditionCheckFailureType = Any
29 DeleteItemOutputTypeDef = Any
30 PutItemOutputTypeDef = Any
31 UpdateItemOutputTypeDef = Any
34class QueryOutput(TypedDict):
35 items: list[dict[str, Any]]
36 count: int
37 last_key: str | None
40class Repository:
41 def __init__(
42 self,
43 table_name: str,
44 *,
45 client: DynamoDBClient,
46 ) -> None:
47 self._table_name = table_name
48 self._client = client
50 def query(
51 self,
52 key_cond_expr: str,
53 *,
54 select: SelectType | None = None,
55 expr_attr_names: dict | None = None,
56 expr_attr_values: dict | None = None,
57 exclusive_start_key: str | None = None,
58 filter_expr: str | None = None,
59 projection_expr: str | None = None,
60 limit: int | None = None,
61 scan_index_forward: bool = True,
62 table_name: str | None = None,
63 ) -> QueryOutput:
64 """You must provide the name of the partition key attribute
65 and a single value for that attribute.
67 Query returns all items with that partition key value.
68 Optionally, you can provide a sort key attribute and use a comparison operator
69 to refine the search results.
71 ...
73 A `Query` operation always returns a result set. If no matching items are found,
74 the result set will be empty.
75 Queries that do not return results consume the minimum number
76 of read capacity units for that type of read operation.
78 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html
79 """
80 attrs: dict = {
81 'TableName': table_name or self._table_name,
82 'KeyConditionExpression': key_cond_expr,
83 'ScanIndexForward': scan_index_forward,
84 }
86 if select:
87 attrs['Select'] = select
89 if limit:
90 attrs['Limit'] = limit
92 if expr_attr_names:
93 attrs['ExpressionAttributeNames'] = expr_attr_names
95 if expr_attr_values:
96 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
98 if exclusive_start_key:
99 attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key)
101 if filter_expr:
102 attrs['FilterExpression'] = filter_expr
104 if projection_expr:
105 attrs['ProjectionExpression'] = projection_expr
107 output = self._client.query(**attrs)
109 return {
110 'items': [deserialize(item) for item in output.get('Items', [])],
111 'count': output.get('Count', 0),
112 'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)),
113 }
115 def get_item(
116 self,
117 key: dict,
118 *,
119 table_name: str | None = None,
120 expr_attr_names: dict | None = None,
121 projection_expr: str | None = None,
122 ) -> dict[str, Any]:
123 """The GetItem operation returns a set of attributes for the item
124 with the given primary key.
126 If there is no matching item, GetItem does not return any data and
127 there will be no Item element in the response.
129 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/get_item.html
130 """
131 attrs = {
132 'TableName': table_name or self._table_name,
133 'Key': serialize(key),
134 }
136 if expr_attr_names:
137 attrs['ExpressionAttributeNames'] = expr_attr_names
139 if projection_expr:
140 attrs['ProjectionExpression'] = projection_expr
142 output = self._client.get_item(**attrs)
144 return deserialize(output.get('Item', {}))
146 def put_item(
147 self,
148 item: dict,
149 *,
150 cond_expr: str | None = None,
151 expr_attr_names: dict | None = None,
152 expr_attr_values: dict | None = None,
153 table_name: str | None = None,
154 return_values: ReturnValueType | None = None,
155 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
156 ) -> PutItemOutputTypeDef:
157 attrs = {
158 'TableName': table_name or self._table_name,
159 'Item': serialize(item),
160 }
162 if cond_expr:
163 attrs['ConditionExpression'] = cond_expr
165 if expr_attr_names:
166 attrs['ExpressionAttributeNames'] = expr_attr_names
168 if expr_attr_values:
169 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
171 if return_values:
172 attrs['ReturnValues'] = return_values
174 if return_on_cond_fail:
175 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
177 return self._client.put_item(**attrs)
179 def update_item(
180 self,
181 key: dict,
182 *,
183 update_expr: str,
184 cond_expr: str | None = None,
185 expr_attr_names: dict | None = None,
186 expr_attr_values: dict | None = None,
187 table_name: str | None = None,
188 return_values: ReturnValueType | None = None,
189 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
190 ) -> UpdateItemOutputTypeDef:
191 attrs: dict = {
192 'TableName': table_name or self._table_name,
193 'Key': serialize(key),
194 'UpdateExpression': update_expr,
195 }
197 if cond_expr:
198 attrs['ConditionExpression'] = cond_expr
200 if expr_attr_names:
201 attrs['ExpressionAttributeNames'] = expr_attr_names
203 if expr_attr_values:
204 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
206 if return_values:
207 attrs['ReturnValues'] = return_values
209 if return_on_cond_fail:
210 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
212 return self._client.update_item(**attrs)
214 def delete_item(
215 self,
216 key: dict,
217 *,
218 cond_expr: str | None = None,
219 expr_attr_names: dict | None = None,
220 expr_attr_values: dict | None = None,
221 table_name: str | None = None,
222 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
223 ) -> DeleteItemOutputTypeDef:
224 """Deletes a single item in a table by primary key. You can perform
225 a conditional delete operation that deletes the item if it exists,
226 or if it has an expected attribute value.
227 """
228 attrs: dict = {
229 'TableName': table_name or self._table_name,
230 'Key': serialize(key),
231 }
233 if cond_expr:
234 attrs['ConditionExpression'] = cond_expr
236 if expr_attr_names:
237 attrs['ExpressionAttributeNames'] = expr_attr_names
239 if expr_attr_values:
240 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
242 if return_on_cond_fail:
243 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
245 return self._client.delete_item(**attrs)
247 def transact_writer(
248 self,
249 flush_amount: int = 50,
250 table_name: str | None = None,
251 ) -> TransactWriter:
252 return TransactWriter(
253 table_name=table_name or self._table_name,
254 client=self._client,
255 flush_amount=flush_amount,
256 )
258 def transact_get(
259 self,
260 table_name: str | None = None,
261 ) -> TransactGet:
262 return TransactGet(
263 table_name=table_name or self._table_name,
264 client=self._client,
265 )
268def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None:
269 if not obj:
270 return None
272 s = json.dumps(obj)
273 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8')
274 return quote(b)
277def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]:
278 b = unquote(s).encode('utf-8')
279 s = urlsafe_b64decode(b).decode('utf-8')
280 return json.loads(s)
283DynamoDBRepository = Repository