Coverage for src/extratools_cloud/aws/s3.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-19 05:39 -0700

1from collections.abc import Iterable 

2from os import getenv 

3from typing import Any, cast 

4 

5import boto3 

6from boto3.resources.base import ServiceResource 

7from extratools_core.crudl import CRUDLDict 

8from extratools_core.json import JsonDict 

9 

10from .helpers import ClientErrorHandler 

11 

12STAGE: str = getenv("STAGE", "local") 

13 

14 

15default_service_resource: ServiceResource = boto3.resource( 

16 "s3", 

17 endpoint_url=( 

18 # Setting endpoint URL will automatically use path-style request 

19 # https://docs.localstack.cloud/user-guide/aws/s3/#path-style-and-virtual-hosted-style-requests 

20 "http://localhost:4566" if STAGE == "local" 

21 else None 

22 ), 

23) 

24 

25type Queue = Any 

26 

27FIFO_QUEUE_NAME_SUFFIX = ".fifo" 

28 

29 

30def get_queue_json(queue: Queue) -> JsonDict: 

31 return { 

32 "url": queue.url, 

33 "attributes": queue.attributes, 

34 } 

35 

36 

37def get_resource_dict( 

38 *, 

39 service_resource: ServiceResource | None = None, 

40 queue_name_prefix: str | None = None, 

41 json_only: bool = False, 

42) -> CRUDLDict[str, Queue | JsonDict]: 

43 service_resource = service_resource or default_service_resource 

44 

45 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/index.html 

46 

47 def check_queue_name(queue_name: str) -> None: 

48 if queue_name_prefix and not queue_name.startswith(queue_name_prefix): 

49 raise ValueError 

50 

51 def create_func(queue_name: str | None, attributes: dict[str, str]) -> None: 

52 if queue_name is None: 

53 raise ValueError 

54 

55 check_queue_name(queue_name) 

56 

57 service_resource.create_queue( 

58 QueueName=queue_name, 

59 Attributes={ 

60 "FifoQueue": str(queue_name.endswith(FIFO_QUEUE_NAME_SUFFIX)).lower(), 

61 **attributes, 

62 }, 

63 ) 

64 

65 @ClientErrorHandler( 

66 "QueueDoesNotExist", 

67 KeyError, 

68 ) 

69 def read_func(queue_name: str) -> Queue: 

70 check_queue_name(queue_name) 

71 

72 queue = service_resource.get_queue_by_name( 

73 QueueName=queue_name, 

74 ) 

75 if not json_only: 

76 return queue 

77 

78 return get_queue_json(queue) 

79 

80 def update_func(queue_name: str, attributes: dict[str, str]) -> None: 

81 check_queue_name(queue_name) 

82 

83 service_resource.get_queue_by_name( 

84 QueueName=queue_name, 

85 ).set_attributes( 

86 Attributes={ 

87 **attributes, 

88 }, 

89 ) 

90 

91 def delete_func(queue_name: str) -> None: 

92 check_queue_name(queue_name) 

93 

94 service_resource.get_queue_by_name( 

95 QueueName=queue_name, 

96 ).delete() 

97 

98 def list_func(_: None) -> Iterable[tuple[str, Queue]]: 

99 for queue in ( 

100 service_resource.queues.filter( 

101 QueueNamePrefix=queue_name_prefix, 

102 ) 

103 if queue_name_prefix 

104 else service_resource.queues.all() 

105 ): 

106 queue_name = cast("str", queue.url).rsplit('/', maxsplit=1)[-1] 

107 yield queue_name, ( 

108 get_queue_json(queue) if json_only 

109 else queue 

110 ) 

111 

112 return CRUDLDict[str, Queue]( 

113 create_func=create_func, 

114 read_func=read_func, 

115 update_func=update_func, 

116 delete_func=delete_func, 

117 list_func=list_func, 

118 )