Coverage for src/extratools_cloud/aws/sqs.py: 0%

79 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 21:33 -0700

1import json 

2from collections.abc import Iterable 

3from typing import Any, Literal, cast, override 

4from uuid import uuid4 

5 

6from boto3.resources.base import ServiceResource 

7from extratools_core.crudl import CRUDLDict 

8from extratools_core.json import JsonDict 

9from extratools_core.str import encode 

10from toolz.itertoolz import partition_all 

11 

12from ..common.router import BaseRouter 

13from .helpers import ClientErrorHandler, get_service_resource 

14 

15default_service_resource: ServiceResource = get_service_resource("sqs") 

16 

17type Queue = Any 

18 

19FIFO_QUEUE_NAME_SUFFIX = ".fifo" 

20 

21 

22def get_queue_json(queue: Queue) -> JsonDict: 

23 return { 

24 "url": queue.url, 

25 "attributes": queue.attributes, 

26 } 

27 

28 

29def get_resource_dict( 

30 *, 

31 service_resource: ServiceResource | None = None, 

32 queue_name_prefix: str | None = None, 

33 json_only: bool = False, 

34) -> CRUDLDict[str, Queue | JsonDict]: 

35 service_resource = service_resource or default_service_resource 

36 

37 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/index.html 

38 

39 def check_queue_name(queue_name: str) -> None: 

40 if queue_name_prefix and not queue_name.startswith(queue_name_prefix): 

41 raise ValueError 

42 

43 def create_func(queue_name: str | None, attributes: dict[str, str]) -> None: 

44 if queue_name is None: 

45 raise ValueError 

46 

47 check_queue_name(queue_name) 

48 

49 service_resource.create_queue( 

50 QueueName=queue_name, 

51 Attributes={ 

52 "FifoQueue": str(queue_name.endswith(FIFO_QUEUE_NAME_SUFFIX)).lower(), 

53 **attributes, 

54 }, 

55 ) 

56 

57 @ClientErrorHandler( 

58 "QueueDoesNotExist", 

59 KeyError, 

60 ) 

61 def read_func(queue_name: str) -> Queue: 

62 check_queue_name(queue_name) 

63 

64 queue = service_resource.get_queue_by_name( 

65 QueueName=queue_name, 

66 ) 

67 if not json_only: 

68 return queue 

69 

70 return get_queue_json(queue) 

71 

72 def update_func(queue_name: str, attributes: dict[str, str]) -> None: 

73 check_queue_name(queue_name) 

74 

75 service_resource.get_queue_by_name( 

76 QueueName=queue_name, 

77 ).set_attributes( 

78 Attributes={ 

79 **attributes, 

80 }, 

81 ) 

82 

83 def delete_func(queue_name: str) -> None: 

84 check_queue_name(queue_name) 

85 

86 service_resource.get_queue_by_name( 

87 QueueName=queue_name, 

88 ).delete() 

89 

90 def list_func(_: None) -> Iterable[tuple[str, Queue]]: 

91 for queue in ( 

92 service_resource.queues.filter( 

93 QueueNamePrefix=queue_name_prefix, 

94 ) 

95 if queue_name_prefix 

96 else service_resource.queues.all() 

97 ): 

98 queue_name = cast("str", queue.url).rsplit('/', maxsplit=1)[-1] 

99 yield queue_name, ( 

100 get_queue_json(queue) if json_only 

101 else queue 

102 ) 

103 

104 return CRUDLDict[str, Queue]( 

105 create_func=create_func, 

106 read_func=read_func, 

107 update_func=update_func, 

108 delete_func=delete_func, 

109 list_func=list_func, 

110 ) 

111 

112 

113MESSAGE_BATCH_SIZE = 10 

114 

115 

116def send_messages( 

117 queue: Queue, 

118 messages: Iterable[JsonDict], 

119 group: str | None = None, 

120 *, 

121 encoding: Literal["gzip", "zstd"] | None = None, 

122) -> Iterable[JsonDict]: 

123 batch_id = str(uuid4()) 

124 

125 fifo: bool = queue.url.endswith(FIFO_QUEUE_NAME_SUFFIX) 

126 if fifo and not group: 

127 raise ValueError 

128 

129 for message_batch in partition_all( 

130 MESSAGE_BATCH_SIZE, 

131 ( 

132 (f"{batch_id}_{i}", message_data) 

133 for i, message_data in enumerate(messages) 

134 ), 

135 ): 

136 response: JsonDict = queue.send_messages(Entries=[ 

137 dict( 

138 Id=message_id, 

139 MessageBody=encode( 

140 json.dumps(message_data), 

141 encoding=encoding, 

142 ), 

143 ) | ( 

144 dict( 

145 MessageAttributes={ 

146 "ContentEncoding": { 

147 "StringValue": encoding, 

148 "DataType": "String", 

149 }, 

150 }, 

151 ) 

152 if encoding else {} 

153 ) | ( 

154 dict( 

155 MessageDeduplicationId=message_id, 

156 MessageGroupId=group, 

157 ) 

158 if fifo else {} 

159 ) 

160 for message_id, message_data in message_batch 

161 ]) 

162 

163 yield from response.get("Successful", []) 

164 yield from response.get("Failed", []) 

165 

166 

167class FifoRouter(BaseRouter[str, str]): 

168 """ 

169 Router utilizing FIFO queues and groups 

170 - Each resource is queue base name (excluding specified prefix and `.fifo` suffix) 

171 - Each target is group name 

172 - Assuming each group name is unique across all queues in router 

173 - Each resource is also a target 

174 - Including existing ones 

175 """ 

176 

177 def __init__( 

178 self, 

179 *, 

180 service_resource: ServiceResource | None = None, 

181 queue_name_prefix: str, 

182 default_target_resource: str, 

183 encoding: Literal["gzip", "zstd"] | None = None, 

184 ) -> None: 

185 super().__init__( 

186 default_target_resource=default_target_resource, 

187 ) 

188 

189 self.__resource_dict: CRUDLDict[str, Queue] = get_resource_dict( 

190 service_resource=service_resource, 

191 queue_name_prefix=queue_name_prefix, 

192 ) 

193 

194 default_queue_name = queue_name_prefix + default_target_resource + FIFO_QUEUE_NAME_SUFFIX 

195 

196 self.__queue_name_prefix = queue_name_prefix 

197 

198 queue_name_prefix_len = len(queue_name_prefix) 

199 queue_name_suffix_len = len(FIFO_QUEUE_NAME_SUFFIX) 

200 self.__queues: dict[str, Queue] = { 

201 default_target_resource: self.__resource_dict[default_queue_name], 

202 } | { 

203 (queue_name[queue_name_prefix_len:])[:-queue_name_suffix_len]: queue 

204 for queue_name, queue in self.__resource_dict.items() 

205 } 

206 for resource in self.__queues: 

207 super().register_targets(resource, [resource]) 

208 

209 self.__encoding: Literal["gzip", "zstd"] | None = encoding 

210 

211 @override 

212 def register_targets( 

213 self, 

214 resource: str, 

215 targets: Iterable[str], 

216 *, 

217 create: bool = True, 

218 ) -> None: 

219 super().register_targets(resource, targets) 

220 super().register_targets(resource, [resource]) 

221 

222 queue_name = self.__queue_name_prefix + resource + FIFO_QUEUE_NAME_SUFFIX 

223 

224 if queue_name not in self.__resource_dict: 

225 if create: 

226 self.__resource_dict[queue_name] = {} 

227 else: 

228 raise KeyError 

229 

230 self.__queues[resource] = self.__resource_dict[queue_name] 

231 

232 @override 

233 def _route_to_resource( 

234 self, 

235 data: Iterable[JsonDict], 

236 resource: str, 

237 target: str, 

238 ) -> Iterable[JsonDict]: 

239 yield from send_messages( 

240 self.__queues[resource], 

241 data, 

242 target, 

243 encoding=self.__encoding, 

244 )