Coverage for src/extratools_cloud/aws/helpers.py: 0%
17 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
1from collections.abc import Callable
2from typing import Any
4from botocore.exceptions import ClientError
7class ClientErrorHandler:
8 """
9 Based on https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
10 """
12 def __init__(
13 self,
14 error_code: str,
15 exception_class: type[Exception],
16 ) -> None:
17 self.__error_code = error_code
18 self.__exception_class = exception_class
20 def __call__(self, f: Callable[..., Any]) -> Any:
21 def wrapper(*args: Any, **kwargs: Any) -> Any:
22 try:
23 return f(*args, **kwargs)
24 except ClientError as e:
25 error = e.response["Error"]
26 # For SQS, somehow error code is found in
27 # both `Code` and `QueryErrorCode` with different values
28 # (`AWS.SimpleQueueService.NonExistentQueue` vs `QueueDoesNotExist`).
29 # The value in `QueryErrorCode` seems to match public API doc.
30 # https://github.com/aws/aws-sdk/issues/105
31 if error.get("QueryErrorCode", error["Code"]) == self.__error_code:
32 raise self.__exception_class from e
34 raise
36 return wrapper