Coverage for dynamodx / repository.py: 52%
116 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-23 16:15 -0300
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-23 16:15 -0300
1import json
2from base64 import urlsafe_b64decode, urlsafe_b64encode
3from typing import TYPE_CHECKING, Any, Type, TypedDict, TypeVar
4from urllib.parse import quote, unquote
6from dynamodx.keys import PrimaryKey
8from .transact_get import TransactGet, project_item
9from .transact_writer import TransactWriter
10from .types import deserialize, serialize, to_dict
12if TYPE_CHECKING:
13 from mypy_boto3_dynamodb.client import DynamoDBClient
14 from mypy_boto3_dynamodb.literals import (
15 ReturnValuesOnConditionCheckFailureType,
16 ReturnValueType,
17 SelectType,
18 )
19 from mypy_boto3_dynamodb.type_defs import (
20 AttributeValueTypeDef,
21 DeleteItemOutputTypeDef,
22 PutItemOutputTypeDef,
23 UpdateItemOutputTypeDef,
24 )
25else:
26 DynamoDBClient = Any
27 ReturnValuesOnConditionCheckFailureType = Any
28 ReturnValueType = Any
29 SelectType = Any
30 AttributeValueTypeDef = Any
31 DeleteItemOutputTypeDef = Any
32 PutItemOutputTypeDef = Any
33 UpdateItemOutputTypeDef = Any
36class QueryOutput(TypedDict):
37 items: list[dict[str, Any]]
38 count: int
39 last_key: str | None
class Repository:
    """Convenience wrapper around a low-level DynamoDB client.

    Each method assembles the keyword arguments for the matching client call,
    leaves out optional parameters that were not supplied, and targets the
    table given at construction time unless ``table_name`` overrides it.
    """

    def __init__(
        self,
        table_name: str,
        *,
        client: DynamoDBClient,
    ) -> None:
        """Bind the repository to a default table and a client.

        Parameters
        ----------
        table_name : str
            Table used by every operation that is not given its own
            ``table_name``.
        client : DynamoDBClient
            Client object on which the operations are invoked.
        """
        self._table_name = table_name
        self._client = client

    @staticmethod
    def _optional_params(
        *,
        expr_attr_values: dict | None = None,
        **candidates: Any,
    ) -> dict[str, Any]:
        """Collect the optional request parameters that were actually supplied.

        Parameters
        ----------
        expr_attr_values : dict | None, optional
            Expression attribute values; when truthy they are passed through
            ``serialize`` and stored under ``ExpressionAttributeValues``.
        **candidates : Any
            Request parameter names mapped to their (possibly unset) values.
            Only entries with a truthy value are kept, as-is.

        Returns
        -------
        dict[str, Any]
            The parameters to merge into the request.
        """
        params = {name: value for name, value in candidates.items() if value}

        if expr_attr_values:
            params['ExpressionAttributeValues'] = serialize(expr_attr_values)

        return params

    def query(
        self,
        key_cond_expr: str,
        *,
        select: SelectType | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        exclusive_start_key: str | None = None,
        filter_expr: str | None = None,
        projection_expr: str | None = None,
        limit: int | None = None,
        scan_index_forward: bool = True,
        table_name: str | None = None,
    ) -> QueryOutput:
        """Run a ``Query`` and return the deserialized items.

        You must provide the name of the partition key attribute and a single
        value for that attribute. Query returns all items with that partition
        key value. Optionally, you can provide a sort key attribute and use a
        comparison operator to refine the search results.

        A ``Query`` operation always returns a result set. If no matching
        items are found, the result set will be empty. Queries that do not
        return results consume the minimum number of read capacity units for
        that type of read operation.

        - https://docs.aws.amazon.com/boto3/latest/reference/services/dynamodb/client/query.html

        Parameters
        ----------
        key_cond_expr : str
            Sent as ``KeyConditionExpression``.
        select : SelectType | None, optional
            Sent as ``Select`` when given.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when given.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when given.
        exclusive_start_key : str | None, optional
            A ``last_key`` value from a previous result; it is decoded and
            sent as ``ExclusiveStartKey``.
        filter_expr : str | None, optional
            Sent as ``FilterExpression`` when given.
        projection_expr : str | None, optional
            Sent as ``ProjectionExpression`` when given.
        limit : int | None, optional
            Sent as ``Limit`` when given.
        scan_index_forward : bool, optional
            Always sent as ``ScanIndexForward``.
        table_name : str | None, optional
            Uses default table if not provided.

        Returns
        -------
        QueryOutput
            ``items`` (deserialized), ``count`` and ``last_key`` (the encoded
            ``LastEvaluatedKey`` of the response, or ``None``).
        """
        attrs: dict[str, Any] = {
            'TableName': table_name or self._table_name,
            'KeyConditionExpression': key_cond_expr,
            'ScanIndexForward': scan_index_forward,
        }
        attrs.update(
            self._optional_params(
                Select=select,
                Limit=limit,
                ExpressionAttributeNames=expr_attr_names,
                FilterExpression=filter_expr,
                ProjectionExpression=projection_expr,
                expr_attr_values=expr_attr_values,
            )
        )

        if exclusive_start_key:
            # Callers hold the key in its encoded string form; the client
            # receives the decoded mapping.
            attrs['ExclusiveStartKey'] = _startkey_b64decode(exclusive_start_key)

        output = self._client.query(**attrs)

        return {
            'items': [deserialize(item) for item in output.get('Items', [])],
            'count': output.get('Count', 0),
            'last_key': _startkey_b64encode(output.get('LastEvaluatedKey', None)),
        }

    def get_item(
        self,
        key: dict[str, str] | PrimaryKey,
        *,
        table_name: str | None = None,
        expr_attr_names: dict | None = None,
        projection_expr: str | None = None,
        raise_on_error: bool = True,
        exc_cls: Type[Exception] = Exception,
        default: Any = None,
    ) -> dict[str, Any]:
        """Get an item with the given primary key.

        Parameters
        ----------
        key : dict[str, str] | PrimaryKey
            Primary key of the item to be retrieved.
        table_name : str | None, optional
            Uses default table if not provided.
        expr_attr_names : dict | None, optional
            Expression attribute name mappings.
        projection_expr : str | None, optional
            Attributes to return. Returns full item if None.
        raise_on_error : bool, optional
            If True, raises ``exc_cls`` when item is not found.
        exc_cls : Type[Exception], optional
            Exception class to be used if the item is not found.
        default : Any, optional
            Default value returned if the item is not found.

        Returns
        -------
        dict[str, Any]
            Data of the retrieved item or the default value if not found.

        Raises
        ------
        Exception
            If item is not found and ``raise_on_error`` is True.

        Notes
        -----
        When ``key`` is a ``PrimaryKey`` the result is always produced by
        ``project_item(key, item)``, including the not-found case with
        ``raise_on_error=False``; ``default`` is not consulted on that path.
        """
        attrs: dict[str, Any] = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
        }
        attrs.update(
            self._optional_params(
                ExpressionAttributeNames=expr_attr_names,
                ProjectionExpression=projection_expr,
            )
        )

        output = self._client.get_item(**attrs)
        item = deserialize(output.get('Item', {}))

        if raise_on_error and not item:
            raise exc_cls(f'Item not found ({key!r})')

        if isinstance(key, PrimaryKey):
            return project_item(key, item)

        return item or default

    def put_item(
        self,
        item: dict,
        *,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> PutItemOutputTypeDef:
        """Write an item with ``PutItem``.

        Parameters
        ----------
        item : dict
            Item to store. If its class carries a truthy
            ``_is_dynamodb_mapped`` attribute the item is first converted
            with ``to_dict``; either way it is passed through ``serialize``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when given.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when given.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when given.
        table_name : str | None, optional
            Uses default table if not provided.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when given.
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when given.

        Returns
        -------
        PutItemOutputTypeDef
            The client response, unmodified.
        """
        is_dynamodb_mapped = getattr(item.__class__, '_is_dynamodb_mapped', False)
        serialized = serialize(to_dict(item) if is_dynamodb_mapped else item)  # type: ignore
        attrs: dict[str, Any] = {
            'TableName': table_name or self._table_name,
            'Item': serialized,
        }
        attrs.update(
            self._optional_params(
                ConditionExpression=cond_expr,
                ExpressionAttributeNames=expr_attr_names,
                ReturnValues=return_values,
                ReturnValuesOnConditionCheckFailure=return_on_cond_fail,
                expr_attr_values=expr_attr_values,
            )
        )

        return self._client.put_item(**attrs)

    def update_item(
        self,
        key: dict,
        *,
        update_expr: str,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> UpdateItemOutputTypeDef:
        """Modify an item with ``UpdateItem``.

        Parameters
        ----------
        key : dict
            Primary key of the item; passed through ``serialize``.
        update_expr : str
            Sent as ``UpdateExpression``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when given.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when given.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when given.
        table_name : str | None, optional
            Uses default table if not provided.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when given.
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when given.

        Returns
        -------
        UpdateItemOutputTypeDef
            The client response, unmodified.
        """
        attrs: dict[str, Any] = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
            'UpdateExpression': update_expr,
        }
        attrs.update(
            self._optional_params(
                ConditionExpression=cond_expr,
                ExpressionAttributeNames=expr_attr_names,
                ReturnValues=return_values,
                ReturnValuesOnConditionCheckFailure=return_on_cond_fail,
                expr_attr_values=expr_attr_values,
            )
        )

        return self._client.update_item(**attrs)

    def delete_item(
        self,
        key: dict,
        *,
        cond_expr: str | None = None,
        expr_attr_names: dict | None = None,
        expr_attr_values: dict | None = None,
        table_name: str | None = None,
        return_values: ReturnValueType | None = None,
        return_on_cond_fail: ReturnValuesOnConditionCheckFailureType | None = None,
    ) -> DeleteItemOutputTypeDef:
        """Deletes a single item in a table by primary key. You can perform
        a conditional delete operation that deletes the item if it exists,
        or if it has an expected attribute value.

        Parameters
        ----------
        key : dict
            Primary key of the item; passed through ``serialize``.
        cond_expr : str | None, optional
            Sent as ``ConditionExpression`` when given.
        expr_attr_names : dict | None, optional
            Sent as ``ExpressionAttributeNames`` when given.
        expr_attr_values : dict | None, optional
            Serialized and sent as ``ExpressionAttributeValues`` when given.
        table_name : str | None, optional
            Uses default table if not provided.
        return_values : ReturnValueType | None, optional
            Sent as ``ReturnValues`` when given (e.g. to get the deleted
            item back in the response).
        return_on_cond_fail : ReturnValuesOnConditionCheckFailureType | None, optional
            Sent as ``ReturnValuesOnConditionCheckFailure`` when given.

        Returns
        -------
        DeleteItemOutputTypeDef
            The client response, unmodified.
        """
        attrs: dict[str, Any] = {
            'TableName': table_name or self._table_name,
            'Key': serialize(key),
        }
        attrs.update(
            self._optional_params(
                ConditionExpression=cond_expr,
                ExpressionAttributeNames=expr_attr_names,
                ReturnValues=return_values,
                ReturnValuesOnConditionCheckFailure=return_on_cond_fail,
                expr_attr_values=expr_attr_values,
            )
        )

        return self._client.delete_item(**attrs)

    def transact_writer(
        self,
        flush_amount: int = 50,
        table_name: str | None = None,
    ) -> TransactWriter:
        """Create a ``TransactWriter`` that shares this repository's client.

        Parameters
        ----------
        flush_amount : int, optional
            Forwarded unchanged to ``TransactWriter``.
        table_name : str | None, optional
            Uses default table if not provided.
        """
        return TransactWriter(
            table_name=table_name or self._table_name,
            client=self._client,
            flush_amount=flush_amount,
        )

    def transact_get(
        self,
        table_name: str | None = None,
    ) -> TransactGet:
        """Create a ``TransactGet`` that shares this repository's client.

        Parameters
        ----------
        table_name : str | None, optional
            Uses default table if not provided.
        """
        return TransactGet(
            table_name=table_name or self._table_name,
            client=self._client,
        )
303def _startkey_b64encode(obj: dict[str, AttributeValueTypeDef] | None) -> str | None:
304 if not obj:
305 return None
307 s = json.dumps(obj)
308 b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8')
309 return quote(b)
312def _startkey_b64decode(s: str) -> dict[str, AttributeValueTypeDef]:
313 b = unquote(s).encode('utf-8')
314 s = urlsafe_b64decode(b).decode('utf-8')
315 return json.loads(s)
# Alias: ``Repository`` is also exposed under the name ``DynamoDBRepository``.
DynamoDBRepository = Repository
320T = TypeVar('T')
323def dynamodb_mapping(
324 table: str,
325 partition_key: str,
326 sort_key: str | None = None,
327) -> Any:
328 def decorator(cls: type[T]) -> type[T]:
329 cls._dynamodb_table = table # type: ignore
330 cls._dynamodb_partition_key = partition_key # type: ignore
331 cls._dynampdb_sort_key = sort_key # type: ignore
332 cls._dynampdb_sort_key = sort_key # type: ignore
333 cls._is_dynamodb_mapped = True # type: ignore
335 return cls
337 return decorator