Coverage for src/extratools_cloud/aws/s3.py: 0%
42 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 05:39 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 05:39 -0700
1from collections.abc import Iterable
2from os import getenv
3from typing import Any, cast
5import boto3
6from boto3.resources.base import ServiceResource
7from extratools_core.crudl import CRUDLDict
8from extratools_core.json import JsonDict
10from .helpers import ClientErrorHandler
12STAGE: str = getenv("STAGE", "local")
15default_service_resource: ServiceResource = boto3.resource(
16 "s3",
17 endpoint_url=(
18 # Setting endpoint URL will automatically use path-style request
19 # https://docs.localstack.cloud/user-guide/aws/s3/#path-style-and-virtual-hosted-style-requests
20 "http://localhost:4566" if STAGE == "local"
21 else None
22 ),
23)
25type Queue = Any
27FIFO_QUEUE_NAME_SUFFIX = ".fifo"
def get_queue_json(queue: Queue) -> JsonDict:
    """Return the JSON form of ``queue``: its ``url`` and its ``attributes``."""
    queue_json: JsonDict = {}
    queue_json["url"] = queue.url
    queue_json["attributes"] = queue.attributes
    return queue_json
def get_resource_dict(
    *,
    service_resource: ServiceResource | None = None,
    queue_name_prefix: str | None = None,
    json_only: bool = False,
) -> CRUDLDict[str, Queue | JsonDict]:
    """Build a CRUDL dictionary over the queues of a service resource.

    Args:
        service_resource: Resource whose queue API is used. Falls back to the
            module-level ``default_service_resource`` when omitted.
        queue_name_prefix: When set, create/read/update/delete reject queue
            names that do not start with this prefix, and listing is filtered
            by it.
        json_only: When true, reads and listings yield the JSON form produced
            by ``get_queue_json`` instead of the queue object itself.

    Returns:
        A ``CRUDLDict`` keyed by queue name.
    """
    service_resource = service_resource or default_service_resource

    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/index.html

    def check_queue_name(queue_name: str) -> None:
        """Raise ``ValueError`` if ``queue_name`` lacks the required prefix."""
        if queue_name_prefix and not queue_name.startswith(queue_name_prefix):
            raise ValueError(
                f"Queue name {queue_name!r} must start with "
                f"prefix {queue_name_prefix!r}",
            )

    def create_func(queue_name: str | None, attributes: dict[str, str]) -> None:
        """Create a queue named ``queue_name`` with the given attributes."""
        if queue_name is None:
            raise ValueError("Queue name is required to create a queue")

        check_queue_name(queue_name)

        # Only declare FifoQueue for FIFO names: sending "false" for a standard
        # queue is unnecessary (a standard queue is the default) and has been
        # reported to be rejected by SQS as an unknown attribute.
        fifo_attributes: dict[str, str] = (
            {"FifoQueue": "true"}
            if queue_name.endswith(FIFO_QUEUE_NAME_SUFFIX)
            else {}
        )

        service_resource.create_queue(
            QueueName=queue_name,
            # Caller-supplied attributes take precedence over the derived one.
            Attributes={
                **fifo_attributes,
                **attributes,
            },
        )

    @ClientErrorHandler(
        "QueueDoesNotExist",
        KeyError,
    )
    def read_func(queue_name: str) -> Queue:
        """Look up a queue by name, as an object or as JSON if ``json_only``."""
        check_queue_name(queue_name)

        queue = service_resource.get_queue_by_name(
            QueueName=queue_name,
        )
        if not json_only:
            return queue

        return get_queue_json(queue)

    # NOTE(review): unlike read_func, update_func and delete_func are not
    # wrapped in ClientErrorHandler, so a missing queue presumably surfaces as
    # the underlying client error rather than KeyError. Confirm this is intended.
    def update_func(queue_name: str, attributes: dict[str, str]) -> None:
        """Set ``attributes`` on the existing queue named ``queue_name``."""
        check_queue_name(queue_name)

        service_resource.get_queue_by_name(
            QueueName=queue_name,
        ).set_attributes(
            Attributes={
                **attributes,
            },
        )

    def delete_func(queue_name: str) -> None:
        """Delete the queue named ``queue_name``."""
        check_queue_name(queue_name)

        service_resource.get_queue_by_name(
            QueueName=queue_name,
        ).delete()

    def list_func(_: None) -> Iterable[tuple[str, Queue]]:
        """Yield ``(queue_name, queue)`` pairs, honouring the name prefix."""
        queues = (
            service_resource.queues.filter(
                QueueNamePrefix=queue_name_prefix,
            )
            if queue_name_prefix
            else service_resource.queues.all()
        )
        for queue in queues:
            # The queue name is the last path segment of the queue URL.
            queue_name = cast("str", queue.url).rsplit('/', maxsplit=1)[-1]
            yield queue_name, (
                get_queue_json(queue) if json_only
                else queue
            )

    return CRUDLDict[str, Queue](
        create_func=create_func,
        read_func=read_func,
        update_func=update_func,
        delete_func=delete_func,
        list_func=list_func,
    )