Coverage for src/extratools_cloud/aws/lambda_.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-20 20:51 -0700

1import json 

2import logging 

3from typing import Any 

4 

5import boto3 

6from botocore.client import BaseClient 

7from botocore.config import Config 

8from extratools_core.func import Intercept 

9from extratools_core.json import JsonDict 

10 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# A Lambda function can run for at most 15 minutes; value is in seconds.
MAX_FUNCTION_DURATION: int = 15 * 60

# Client used by `invoke` when the caller does not pass one.
# The timeouts are raised to the maximum function duration so the client
# does not give up while a long synchronous invocation is still running.
# NOTE(review): only `read_timeout` appears tied to how long the function
# runs; confirm that raising `connect_timeout` as well is intended.
default_client: BaseClient = boto3.client(
    "lambda",
    config=Config(
        read_timeout=MAX_FUNCTION_DURATION,
        connect_timeout=MAX_FUNCTION_DURATION,
    ),
)

24 

25 

class InvocationError(RuntimeError):
    """Raised when a Lambda invocation response reports a function error.

    Attributes are set from the constructor arguments:
        message: The error indicator passed by the raiser.
        error: The JSON payload that accompanied the error.
    """

    def __init__(self, message: str, error: JsonDict) -> None:
        # Forward both values to the base class so that `args` is populated.
        # Without this, `str(exc)` is empty and `copy`/`pickle` cannot
        # rebuild the exception, because they call `cls(*exc.args)`.
        super().__init__(message, error)
        self.message = message
        self.error = error

    def __str__(self) -> str:
        # Render a readable message instead of the default tuple repr of `args`.
        return f"{self.message}: {self.error}"

30 

31 

def invoke(
    function_name: str,
    payload: Any,
    *,
    client: BaseClient | None = None,
    wait: bool = True,
) -> Any:
    """Invoke a Lambda function with a JSON-serializable payload.

    Args:
        function_name: Passed to the client as ``FunctionName``.
        payload: Value serialized with ``json.dumps`` and sent as the
            request payload.
        client: Lambda client to use. The module-level ``default_client``
            is used when this is omitted (or falsy).
        wait: When true, the invocation type is ``RequestResponse`` and the
            decoded response payload is returned. When false, the
            invocation type is ``Event`` and nothing is read back.

    Returns:
        The JSON-decoded response payload when ``wait`` is true,
        otherwise ``None``.

    Raises:
        InvocationError: If the response contains a ``FunctionError``
            entry. It carries that entry as ``message`` and the decoded
            response payload as ``error``.
    """
    # Serialize once; the same text is logged and sent as the request body.
    serialized_payload = json.dumps(payload)

    # Lazy %-style arguments: the message is only built if INFO is enabled.
    logger.info(
        "Invoking Lambda function %s with payload:\n%s",
        function_name,
        serialized_payload,
    )

    response: JsonDict = (client or default_client).invoke(
        FunctionName=function_name,
        Payload=serialized_payload.encode(),
        InvocationType="RequestResponse" if wait else "Event",
    )
    if not wait:
        return None

    # The response payload is a file-like object; decode it as JSON.
    response_payload: Any = json.load(response["Payload"])
    if func_error := response.get("FunctionError"):
        logger.error(
            "Error during invocation of Lambda function %s:\n%s",
            function_name,
            func_error,
        )
        raise InvocationError(func_error, response_payload)

    return response_payload

64 

65 

class RequestResponseFunction(Intercept[JsonDict]):
    """``Intercept`` whose replacement invokes a Lambda function and waits.

    The replacement passes the arguments it receives to ``invoke`` as the
    payload, with ``wait=True``, and returns the result.
    NOTE(review): how ``Intercept`` builds and supplies those arguments is
    defined outside this file; confirm against ``extratools_core.func``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the Lambda function name and optional client.

        Args:
            function_name: Name handed to ``invoke``.
            client: Client handed to ``invoke``; ``None`` lets ``invoke``
                pick its default.
        """

        def invoke_and_wait(args: JsonDict) -> JsonDict:
            result: JsonDict = invoke(
                function_name,
                args,
                client=client,
                wait=True,
            )
            return result

        super().__init__(invoke_and_wait)

77 

78 

class EventFunction(Intercept[None]):
    """``Intercept`` whose replacement invokes a Lambda function without waiting.

    The replacement passes the arguments it receives to ``invoke`` as the
    payload, with ``wait=False``, and returns nothing.
    NOTE(review): how ``Intercept`` builds and supplies those arguments is
    defined outside this file; confirm against ``extratools_core.func``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the Lambda function name and optional client.

        Args:
            function_name: Name handed to ``invoke``.
            client: Client handed to ``invoke``; ``None`` lets ``invoke``
                pick its default.
        """

        def invoke_without_waiting(args: JsonDict) -> None:
            # The result of `invoke` is deliberately discarded.
            invoke(
                function_name,
                args,
                client=client,
                wait=False,
            )

        super().__init__(invoke_without_waiting)