Coverage for src/extratools_cloud/aws/sqs.py: 0%
79 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 23:17 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 23:17 -0700
1import json
2from collections.abc import Iterable
3from typing import Any, Literal, cast, override
4from uuid import uuid4
6from boto3.resources.base import ServiceResource
7from extratools_core.crudl import CRUDLDict
8from extratools_core.json import JsonDict
9from extratools_core.str import encode
10from toolz.itertoolz import partition_all
12from ..common.router import BaseRouter
13from .helpers import ClientErrorHandler, get_service_resource
# Module-wide SQS service resource; `get_resource_dict` falls back to it
# when the caller does not pass `service_resource`.
default_service_resource: ServiceResource = get_service_resource("sqs")

# Queue objects are typed as `Any`
# (presumably because boto3 builds its resource classes at runtime - TODO confirm).
type Queue = Any

# Name suffix used throughout this module to recognize FIFO queues.
FIFO_QUEUE_NAME_SUFFIX = ".fifo"
def get_queue_json(queue: Queue) -> JsonDict:
    """
    Summarize a queue as a plain dict.

    The result holds exactly two keys: `url` (from `queue.url`)
    and `attributes` (from `queue.attributes`).
    """
    queue_url = queue.url
    queue_attributes = queue.attributes
    return dict(url=queue_url, attributes=queue_attributes)
def get_resource_dict(
    *,
    service_resource: ServiceResource | None = None,
    queue_name_prefix: str | None = None,
    json_only: bool = False,
) -> CRUDLDict[str, Queue | JsonDict]:
    """
    Build a dict-like CRUDL view over SQS queues, keyed by queue name.

    Args:
        service_resource: SQS service resource to use;
            falls back to the module-level `default_service_resource`.
        queue_name_prefix: When set (and non-empty), every queue name passed in
            must start with it, and listing only returns queues with that prefix.
        json_only: When true, reads and listings return the output of
            `get_queue_json` instead of the queue object itself.

    Raises (from the returned dict's operations):
        ValueError: Queue name is missing on create,
            or does not start with `queue_name_prefix`.
        KeyError: On read of a missing queue
            (presumably how `ClientErrorHandler` translates `QueueDoesNotExist`
            - verify against its implementation).
    """
    service_resource = service_resource or default_service_resource

    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/index.html

    def check_queue_name(queue_name: str) -> None:
        if queue_name_prefix and not queue_name.startswith(queue_name_prefix):
            raise ValueError(
                f"Queue name {queue_name!r} does not start with"
                f" required prefix {queue_name_prefix!r}",
            )

    def create_func(queue_name: str | None, attributes: dict[str, str]) -> None:
        if queue_name is None:
            raise ValueError("Queue name is required to create a queue")

        check_queue_name(queue_name)

        # Only FIFO queues get the `FifoQueue` attribute:
        # omitting it yields a standard queue per the `CreateQueue` documentation,
        # while an explicit "false" is reported to be rejected as an unknown attribute.
        fifo_attributes: dict[str, str] = (
            {"FifoQueue": "true"}
            if queue_name.endswith(FIFO_QUEUE_NAME_SUFFIX)
            else {}
        )

        service_resource.create_queue(
            QueueName=queue_name,
            Attributes={
                **fifo_attributes,
                # Caller-supplied attributes win over the derived default
                **attributes,
            },
        )

    @ClientErrorHandler(
        "QueueDoesNotExist",
        KeyError,
    )
    def read_func(queue_name: str) -> Queue:
        check_queue_name(queue_name)

        queue = service_resource.get_queue_by_name(
            QueueName=queue_name,
        )
        if not json_only:
            return queue

        return get_queue_json(queue)

    def update_func(queue_name: str, attributes: dict[str, str]) -> None:
        check_queue_name(queue_name)

        service_resource.get_queue_by_name(
            QueueName=queue_name,
        ).set_attributes(
            Attributes={
                **attributes,
            },
        )

    def delete_func(queue_name: str) -> None:
        check_queue_name(queue_name)

        service_resource.get_queue_by_name(
            QueueName=queue_name,
        ).delete()

    def list_func(_: None) -> Iterable[tuple[str, Queue]]:
        for queue in (
            service_resource.queues.filter(
                QueueNamePrefix=queue_name_prefix,
            )
            if queue_name_prefix
            else service_resource.queues.all()
        ):
            # Queue name is the last path segment of the queue URL
            queue_name = cast("str", queue.url).rsplit('/', maxsplit=1)[-1]
            yield queue_name, (
                get_queue_json(queue) if json_only
                else queue
            )

    return CRUDLDict[str, Queue](
        create_func=create_func,
        read_func=read_func,
        update_func=update_func,
        delete_func=delete_func,
        list_func=list_func,
    )
# Number of messages `send_messages` puts in each `queue.send_messages` call
# (SQS `SendMessageBatch` accepts at most 10 entries per request).
MESSAGE_BATCH_SIZE = 10
def send_messages(
    queue: Queue,
    messages: Iterable[JsonDict],
    group: str | None = None,
    *,
    encoding: Literal["gzip", "zstd"] | None = None,
) -> Iterable[JsonDict]:
    """
    Send messages to a queue in batches and yield the per-message results.

    Each message is serialized with `json.dumps` and passed through `encode`.
    Within one call, message IDs are `<uuid>_<index>`;
    for FIFO queues the same ID is also used as the deduplication ID.

    This is a generator: nothing is validated or sent until it is iterated.

    Args:
        queue: Target queue; treated as FIFO when its URL ends with `.fifo`.
        messages: Message payloads to send.
        group: Message group ID; required for (and only sent to) FIFO queues.
        encoding: Optional encoding passed to `encode`; when set, it is also
            recorded in the `ContentEncoding` message attribute.

    Yields:
        For each batch, the entries under `Successful` in the response,
        followed by the entries under `Failed`.

    Raises:
        ValueError: The queue is FIFO and `group` is missing or empty.
    """
    batch_id = str(uuid4())

    fifo: bool = queue.url.endswith(FIFO_QUEUE_NAME_SUFFIX)
    if fifo and not group:
        raise ValueError(
            f"Message group is required to send to FIFO queue {queue.url!r}",
        )

    # Entry fields that are identical for every message of this call
    shared_fields: dict[str, Any] = {}
    if encoding:
        shared_fields["MessageAttributes"] = {
            "ContentEncoding": {
                "StringValue": encoding,
                "DataType": "String",
            },
        }
    if fifo:
        shared_fields["MessageGroupId"] = group

    def build_entry(index: int, message_data: JsonDict) -> dict[str, Any]:
        message_id = f"{batch_id}_{index}"
        entry: dict[str, Any] = {
            "Id": message_id,
            "MessageBody": encode(
                json.dumps(message_data),
                encoding=encoding,
            ),
            **shared_fields,
        }
        if fifo:
            entry["MessageDeduplicationId"] = message_id
        return entry

    for message_batch in partition_all(
        MESSAGE_BATCH_SIZE,
        enumerate(messages),
    ):
        response: JsonDict = queue.send_messages(Entries=[
            build_entry(index, message_data)
            for index, message_data in message_batch
        ])

        yield from response.get("Successful", [])
        yield from response.get("Failed", [])
class FifoRouter(BaseRouter[str, str]):
    """
    Router utilizing FIFO queues and groups
    - Each resource is queue base name (excluding specified prefix and `.fifo` suffix)
    - Each target is group name
        - Assuming each group name is unique across all queues in router
    - Each resource is also a target
        - Including existing ones
    """

    def __init__(
        self,
        *,
        service_resource: ServiceResource | None = None,
        queue_name_prefix: str,
        default_target_resource: str,
        encoding: Literal["gzip", "zstd"] | None = None,
    ) -> None:
        """
        Set up the router from the existing FIFO queues under a name prefix.

        Args:
            service_resource: SQS service resource passed on to `get_resource_dict`.
            queue_name_prefix: Prefix shared by all queues of this router.
            default_target_resource: Base name of the default queue;
                that queue must already exist.
            encoding: Optional encoding used when sending messages.

        Raises:
            KeyError: Presumably when the default queue does not exist
                (depends on how the resource dict reports a missing key).
        """
        super().__init__(
            default_target_resource=default_target_resource,
        )

        self.__resource_dict: CRUDLDict[str, Queue] = get_resource_dict(
            service_resource=service_resource,
            queue_name_prefix=queue_name_prefix,
        )
        self.__queue_name_prefix = queue_name_prefix
        self.__encoding: Literal["gzip", "zstd"] | None = encoding

        default_queue_name = queue_name_prefix + default_target_resource + FIFO_QUEUE_NAME_SUFFIX

        self.__queues: dict[str, Queue] = {
            default_target_resource: self.__resource_dict[default_queue_name],
        } | {
            queue_name.removeprefix(queue_name_prefix).removesuffix(FIFO_QUEUE_NAME_SUFFIX): queue
            for queue_name, queue in self.__resource_dict.items()
            # Standard queues may share the prefix;
            # only FIFO queues belong to this router and carry the suffix to strip.
            if queue_name.endswith(FIFO_QUEUE_NAME_SUFFIX)
        }
        # Base implementation is called directly, as the queues are already known here
        for resource in self.__queues:
            super().register_targets(resource, [resource])

    @override
    def register_targets(
        self,
        resource: str,
        targets: Iterable[str],
        *,
        create: bool = True,
    ) -> None:
        """
        Register targets (groups) for a resource (queue base name).

        The resource is always registered as a target of itself as well.

        Args:
            resource: Queue base name, without prefix and `.fifo` suffix.
            targets: Group names routed to this resource.
            create: Whether to create the queue when it does not exist yet.

        Raises:
            KeyError: The queue does not exist and `create` is false;
                nothing is registered in that case.
        """
        queue_name = self.__queue_name_prefix + resource + FIFO_QUEUE_NAME_SUFFIX

        # Check before registering anything,
        # such that a missing queue does not leave partially registered targets behind
        queue_exists = queue_name in self.__resource_dict
        if not queue_exists and not create:
            raise KeyError(queue_name)

        super().register_targets(resource, targets)
        super().register_targets(resource, [resource])

        if not queue_exists:
            self.__resource_dict[queue_name] = {}

        self.__queues[resource] = self.__resource_dict[queue_name]

    @override
    def _route_to_resource(
        self,
        data: Iterable[JsonDict],
        resource: str,
        target: str,
    ) -> Iterable[JsonDict]:
        """
        Send data to the queue of the resource, using the target as message group.

        Yields the per-message results of `send_messages`.
        Raises `KeyError` when the resource has no known queue.
        """
        yield from send_messages(
            self.__queues[resource],
            data,
            target,
            encoding=self.__encoding,
        )