Coverage for dynamodx / repository.py: 39%
103 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 17:37 -0300
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-18 17:37 -0300
1import json
2from base64 import urlsafe_b64decode, urlsafe_b64encode
3from typing import TYPE_CHECKING, Any, Type, TypedDict
4from urllib.parse import quote, unquote
6from dynamodx.keys import PrimaryKey
8from .transact_get import TransactGet, project_item
9from .transact_writer import TransactWriter
10from .types import deserialize, serialize
12if TYPE_CHECKING:
13 from mypy_boto3_dynamodb.client import DynamoDBClient
14 from mypy_boto3_dynamodb.literals import (
15 ReturnValuesOnConditionCheckFailureType,
16 ReturnValueType,
17 SelectType,
18 )
19 from mypy_boto3_dynamodb.type_defs import (
20 AttributeValueTypeDef,
21 DeleteItemOutputTypeDef,
22 PutItemOutputTypeDef,
23 UpdateItemOutputTypeDef,
24 )
25else:
26 DynamoDBClient = Any
27 ReturnValueType = Any
28 AttributeValueTypeDef = Any
29 ReturnValuesOnConditionCheckFailureType = Any
30 DeleteItemOutputTypeDef = Any
31 PutItemOutputTypeDef = Any
32 UpdateItemOutputTypeDef = Any
35class QueryOutput(TypedDict):
36 items: list[dict[str, Any]]
37 count: int
38 last_key: str | None
41class Repository:
42 def __init__(
43 self,
44 table_name: str,
45 *,
46 client: DynamoDBClient,
47 ) -> None:
48 self._table_name = table_name
49 self._client = client
51 def query(
52 self,
53 key_cond_expr: str,
54 *,
55 select: SelectType | None = None,
56 expr_attr_names: dict | None = None,
57 expr_attr_values: dict | None = None,
58 exclusive_start_key: str | None = None,
59 filter_expr: str | None = None,
60 projection_expr: str | None = None,
61 limit: int | None = None,
62 scan_index_forward: bool = True,
63 table_name: str | None = None,
64 ) -> QueryOutput:
65 """You must provide the name of the partition key attribute
66 and a single value for that attribute.
68 Query returns all items with that partition key value.
69 Optionally, you can provide a sort key attribute and use a comparison operator
70 to refine the search results.
72 ...
74 A `Query` operation always returns a result set. If no matching items are found,
75 the result set will be empty.
76 Queries that do not return results consume the minimum number
77 of read capacity units for that type of read operation.
79 - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html
80 """
81 attrs: dict = {
82 'TableName': table_name or self._table_name,
83 'KeyConditionExpression': key_cond_expr,
84 'ScanIndexForward': scan_index_forward,
85 }
87 if select:
88 attrs['Select'] = select
90 if limit:
91 attrs['Limit'] = limit
93 if expr_attr_names:
94 attrs['ExpressionAttributeNames'] = expr_attr_names
96 if expr_attr_values:
97 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
99 if exclusive_start_key:
100 attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key)
102 if filter_expr:
103 attrs['FilterExpression'] = filter_expr
105 if projection_expr:
106 attrs['ProjectionExpression'] = projection_expr
108 output = self._client.query(**attrs)
110 return {
111 'items': [deserialize(item) for item in output.get('Items', [])],
112 'count': output.get('Count', 0),
113 'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)),
114 }
116 def get_item(
117 self,
118 key: dict[str, str] | PrimaryKey,
119 *,
120 table_name: str | None = None,
121 expr_attr_names: dict | None = None,
122 projection_expr: str | None = None,
123 raise_on_error: bool = True,
124 exc_cls: Type[Exception] = Exception,
125 default: Any = None,
126 ) -> dict[str, Any]:
127 """Get an item with the given primary key.
129 Parameters
130 ----------
131 key : dict[str, str] | PrimaryKey
132 Primary key of the item to be retrieved.
133 table_name : str | None, optional
134 Uses default table if not provided.
135 expr_attr_names : dict | None, optional
136 Expression attribute name mappings.
137 projection_expr : str | None, optional
138 Attributes to return. Returns full item if None.
139 raise_on_error : bool, optional
140 If True, raises ``exc_cls`` when item is not found.
141 exc_cls : Type[Exception], optional
142 Exception class to be used if the item is not found.
143 default : Any, optional
144 Default value returned if the item is not found.
146 Returns
147 -------
148 dict[str, Any]
149 Data of the retrieved item or the default value if not found.
151 Raises
152 ------
153 Exception
154 If item is not found and ``raise_on_error`` is True.
155 """
156 attrs = {
157 'TableName': table_name or self._table_name,
158 'Key': serialize(key),
159 }
161 if expr_attr_names:
162 attrs['ExpressionAttributeNames'] = expr_attr_names
164 if projection_expr:
165 attrs['ProjectionExpression'] = projection_expr
167 output = self._client.get_item(**attrs)
168 item = deserialize(output.get('Item', {}))
170 if raise_on_error and not item:
171 raise exc_cls(f'Item not found ({key!r})')
173 if isinstance(key, PrimaryKey):
174 return project_item(key, item)
176 return item or default
178 def put_item(
179 self,
180 item: dict,
181 *,
182 cond_expr: str | None = None,
183 expr_attr_names: dict | None = None,
184 expr_attr_values: dict | None = None,
185 table_name: str | None = None,
186 return_values: ReturnValueType | None = None,
187 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
188 ) -> PutItemOutputTypeDef:
189 attrs = {
190 'TableName': table_name or self._table_name,
191 'Item': serialize(item),
192 }
194 if cond_expr:
195 attrs['ConditionExpression'] = cond_expr
197 if expr_attr_names:
198 attrs['ExpressionAttributeNames'] = expr_attr_names
200 if expr_attr_values:
201 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
203 if return_values:
204 attrs['ReturnValues'] = return_values
206 if return_on_cond_fail:
207 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
209 return self._client.put_item(**attrs)
211 def update_item(
212 self,
213 key: dict,
214 *,
215 update_expr: str,
216 cond_expr: str | None = None,
217 expr_attr_names: dict | None = None,
218 expr_attr_values: dict | None = None,
219 table_name: str | None = None,
220 return_values: ReturnValueType | None = None,
221 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
222 ) -> UpdateItemOutputTypeDef:
223 attrs: dict = {
224 'TableName': table_name or self._table_name,
225 'Key': serialize(key),
226 'UpdateExpression': update_expr,
227 }
229 if cond_expr:
230 attrs['ConditionExpression'] = cond_expr
232 if expr_attr_names:
233 attrs['ExpressionAttributeNames'] = expr_attr_names
235 if expr_attr_values:
236 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
238 if return_values:
239 attrs['ReturnValues'] = return_values
241 if return_on_cond_fail:
242 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
244 return self._client.update_item(**attrs)
246 def delete_item(
247 self,
248 key: dict,
249 *,
250 cond_expr: str | None = None,
251 expr_attr_names: dict | None = None,
252 expr_attr_values: dict | None = None,
253 table_name: str | None = None,
254 return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
255 ) -> DeleteItemOutputTypeDef:
256 """Deletes a single item in a table by primary key. You can perform
257 a conditional delete operation that deletes the item if it exists,
258 or if it has an expected attribute value.
259 """
260 attrs: dict = {
261 'TableName': table_name or self._table_name,
262 'Key': serialize(key),
263 }
265 if cond_expr:
266 attrs['ConditionExpression'] = cond_expr
268 if expr_attr_names:
269 attrs['ExpressionAttributeNames'] = expr_attr_names
271 if expr_attr_values:
272 attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
274 if return_on_cond_fail:
275 attrs['ReturnValuesOnConditionCheckFailure'] = return_on_cond_fail
277 return self._client.delete_item(**attrs)
279 def transact_writer(
280 self,
281 flush_amount: int = 50,
282 table_name: str | None = None,
283 ) -> TransactWriter:
284 return TransactWriter(
285 table_name=table_name or self._table_name,
286 client=self._client,
287 flush_amount=flush_amount,
288 )
290 def transact_get(
291 self,
292 table_name: str | None = None,
293 ) -> TransactGet:
294 return TransactGet(
295 table_name=table_name or self._table_name,
296 client=self._client,
297 )
300def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None:
301 if not obj:
302 return None
304 s = json.dumps(obj)
305 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8')
306 return quote(b)
309def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]:
310 b = unquote(s).encode('utf-8')
311 s = urlsafe_b64decode(b).decode('utf-8')
312 return json.loads(s)
315DynamoDBRepository = Repository