Coverage for src/extratools_cloud/aws/cloud_control.py: 0%

18 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 21:33 -0700

1import json 

2from collections.abc import Iterable 

3 

4from botocore.client import BaseClient 

5from extratools_core.crudl import CRUDLDict 

6from extratools_core.json import JsonDict 

7 

8from .helpers import ClientErrorHandler, get_client 

9 

# Shared Cloud Control client, created once when this module is imported.
# get_resource_dict() falls back to it when the caller does not pass a client.
10default_client: BaseClient = get_client("cloudcontrol") 

11 

12 

def get_resource_dict(
    resource_type: str,
    *,
    client: BaseClient | None = None,
) -> CRUDLDict[str, JsonDict]:
    """Build a CRUDL dictionary backed by the AWS Cloud Control API.

    Every operation of the returned dictionary is forwarded to the Cloud
    Control client with ``TypeName`` set to ``resource_type``.

    Args:
        resource_type: Resource type name passed as ``TypeName`` on every call.
        client: Cloud Control client to use. When omitted (or falsy), the
            module-level ``default_client`` is used instead.

    Returns:
        A ``CRUDLDict[str, JsonDict]`` wired to create, read, update, delete
        and list resources of the given type. Keys are resource identifiers
        and values are the JSON-decoded resource properties.
    """
    if not client:
        client = default_client

    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudcontrol.html

    def create_func(_, desired_state):
        # The key is ignored; only the desired state is sent to the service.
        return client.create_resource(
            TypeName=resource_type,
            DesiredState=json.dumps(desired_state),
        )

    # NOTE(review): presumably translates the "ResourceNotFoundException"
    # client error into KeyError - confirm against helpers.ClientErrorHandler.
    @ClientErrorHandler(
        "ResourceNotFoundException",
        KeyError,
    )
    def read_func(identifier: str) -> JsonDict:
        response = client.get_resource(
            TypeName=resource_type,
            Identifier=identifier,
        )
        # Properties arrive as a JSON-encoded string and are decoded here.
        encoded_properties = response["ResourceDescription"]["Properties"]
        return json.loads(encoded_properties)

    def update_func(identifier, patch):
        return client.update_resource(
            TypeName=resource_type,
            Identifier=identifier,
            PatchDocument=json.dumps(patch),
        )

    def delete_func(identifier):
        return client.delete_resource(
            TypeName=resource_type,
            Identifier=identifier,
        )

    # The resource model acts as a special kind of filter: some resource
    # types require it, while others must not receive it at all.
    # https://docs.aws.amazon.com/cloudcontrolapi/latest/userguide/resource-operations-list.html#resource-operations-list-containers
    def list_func(resource_model: JsonDict | None) -> Iterable[tuple[str, JsonDict]]:
        pages = client.get_paginator("list_resources")

        # Only send ResourceModel when the caller actually supplied one.
        request = {"TypeName": resource_type}
        if resource_model is not None:
            request["ResourceModel"] = json.dumps(resource_model)

        for page in pages.paginate(**request):
            yield from (
                (entry["Identifier"], json.loads(entry["Properties"]))
                for entry in page.get("ResourceDescriptions", [])
            )

    return CRUDLDict[str, JsonDict](
        create_func=create_func,
        read_func=read_func,
        update_func=update_func,
        delete_func=delete_func,
        list_func=list_func,
    )