Coverage for src/extratools_cloud/aws/lambda_.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 09:29 -0700

1import json 

2import logging 

3from typing import Any 

4 

5import boto3 

6from botocore.client import BaseClient 

7from botocore.config import Config 

8from extratools_core.func import Intercept 

9from extratools_core.json import JsonDict 

10 

logger = logging.getLogger(__name__)

# Upper bound on a single Lambda run: 15 minutes, expressed in seconds.
MAX_FUNCTION_DURATION: int = 15 * 60

# Client used by this module whenever a caller does not supply one.
# Both timeouts are stretched to the maximum function duration so the client
# does not time out while a long invocation is still running. See:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html
default_client: BaseClient = boto3.client(
    "lambda",
    config=Config(
        read_timeout=MAX_FUNCTION_DURATION,
        connect_timeout=MAX_FUNCTION_DURATION,
    ),
)

25 

26 

class InvocationError(RuntimeError):
    """Error raised when a Lambda invocation reports a function error.

    Attributes:
        message: The error indicator supplied by the raiser.
        error: The JSON payload that accompanies the error.
    """

    def __init__(self, message: str, error: JsonDict) -> None:
        """Store the error indicator and its accompanying payload.

        Args:
            message: Short error indicator.
            error: JSON payload with the error details.
        """
        # Initialize the base class explicitly. Passing both values keeps
        # ``args`` as ``(message, error)``, so ``str()``, ``repr()`` and
        # pickling/copying behave exactly as before.
        super().__init__(message, error)
        self.message = message
        self.error = error

31 

32 

def invoke(
    function_name: str,
    payload: Any,
    *,
    client: BaseClient | None = None,
    wait: bool = True,
) -> Any:
    """Invoke a Lambda function with a JSON-serializable payload.

    Args:
        function_name: Value passed to the client as ``FunctionName``.
        payload: Request body; must be serializable by ``json.dumps``.
        client: Lambda client to use. The module-level ``default_client``
            is used when this is ``None``.
        wait: When true, invoke with ``InvocationType="RequestResponse"``
            and return the decoded response payload. When false, invoke
            with ``InvocationType="Event"`` and return ``None`` without
            reading the response payload.

    Returns:
        The JSON-decoded response payload, or ``None`` when ``wait`` is
        false.

    Raises:
        InvocationError: If the response contains a ``FunctionError``
            entry. The entry's value is used as the message and the
            decoded response payload is attached as the error detail.
    """
    # Serialize once and reuse the result for both the log line and the
    # request body instead of encoding the same payload twice.
    serialized_payload = json.dumps(payload)

    logger.info(
        f"Invoking Lambda function {function_name}"
        f" with payload:\n{serialized_payload}",
    )

    # Only fall back to the shared client when no client was given at all.
    lambda_client = client if client is not None else default_client

    response: JsonDict = lambda_client.invoke(
        FunctionName=function_name,
        Payload=serialized_payload.encode(),
        InvocationType="RequestResponse" if wait else "Event",
    )
    if not wait:
        return None

    # The payload is decoded before the error check so that the error
    # details can be attached to the raised exception.
    response_payload: Any = json.load(response["Payload"])
    if func_error := response.get("FunctionError"):
        logger.error(
            f"Error during invocation of Lambda function {function_name}"
            f":\n{func_error}",
        )
        raise InvocationError(func_error, response_payload)

    return response_payload

65 

66 

class RequestResponseFunction(Intercept[JsonDict]):
    """``Intercept`` whose replacement invokes a Lambda function and waits.

    The replacement forwards the arguments it receives as the invocation
    payload and returns what ``invoke`` returns with ``wait=True``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to a function name and an optional client.

        Args:
            function_name: Name of the Lambda function to invoke.
            client: Client forwarded to ``invoke``; ``None`` is passed
                through unchanged.
        """

        def call_and_wait(args: JsonDict) -> JsonDict:
            # Waiting invocation: hand back the response from ``invoke``.
            result: JsonDict = invoke(
                function_name,
                args,
                client=client,
                wait=True,
            )
            return result

        super().__init__(call_and_wait)

78 

79 

class EventFunction(Intercept[None]):
    """``Intercept`` whose replacement invokes a Lambda function without waiting.

    The replacement forwards the arguments it receives as the invocation
    payload, calls ``invoke`` with ``wait=False``, and returns ``None``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to a function name and an optional client.

        Args:
            function_name: Name of the Lambda function to invoke.
            client: Client forwarded to ``invoke``; ``None`` is passed
                through unchanged.
        """

        def fire_and_forget(args: JsonDict) -> None:
            # Non-waiting invocation: there is no result to hand back.
            invoke(
                function_name,
                args,
                client=client,
                wait=False,
            )

        super().__init__(fire_and_forget)