Coverage for src/extratools_cloud/aws/lambda_.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-21 20:05 -0700

1import json 

2import logging 

3from typing import Any 

4 

5from botocore.client import BaseClient 

6from botocore.config import Config 

7from extratools_core.func import Intercept 

8from extratools_core.json import JsonDict 

9 

10from .helpers import get_client 

11 

logger = logging.getLogger(__name__)

# A single Lambda execution is capped at 15 minutes; value is in seconds.
MAX_FUNCTION_DURATION: int = 15 * 60

# Client used by this module whenever the caller does not pass one.
# Both timeouts are raised to the maximum function duration
# to prevent client timeout during long invocation
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html
default_client: BaseClient = get_client(
    "lambda",
    config=Config(
        read_timeout=MAX_FUNCTION_DURATION,
        connect_timeout=MAX_FUNCTION_DURATION,
    ),
)

26 

27 

class InvocationError(RuntimeError):
    """Error raised by ``invoke`` when the response reports a function error.

    Attributes are set from the constructor arguments:
    ``message`` is the error label and ``error`` is the accompanying
    JSON payload.
    """

    def __init__(self, message: str, error: JsonDict) -> None:
        # Forward both values explicitly so ``args`` is ``(message, error)``,
        # which is what the base class already records from the constructor call.
        super().__init__(message, error)
        self.error = error
        self.message = message

32 

33 

def invoke(
    function_name: str,
    payload: Any,
    *,
    client: BaseClient | None = None,
    wait: bool = True,
) -> Any:
    """Invoke a Lambda function with a JSON-serializable payload.

    Args:
        function_name: Value passed to the client as ``FunctionName``.
        payload: Value serialized with ``json.dumps`` and sent as the
            request body.
        client: Client whose ``invoke`` method is called. When ``None``,
            the module-level ``default_client`` is used.
        wait: When true, the invocation type is ``RequestResponse`` and the
            decoded response payload is returned. When false, the
            invocation type is ``Event`` and ``None`` is returned without
            reading any response payload.

    Returns:
        The JSON-decoded response payload, or ``None`` when ``wait`` is false.

    Raises:
        InvocationError: If the response contains a ``FunctionError`` entry;
            the decoded response payload is attached as ``error``.
    """
    # Serialize once and reuse the result for both the log line and the
    # request body instead of encoding the same payload twice.
    serialized_payload = json.dumps(payload)

    # Lazy %-style arguments: the message is only rendered when the record
    # is actually emitted. The rendered text is unchanged.
    logger.info(
        "Invoking Lambda function %s with payload:\n%s",
        function_name,
        serialized_payload,
    )

    # Only fall back when no client was supplied at all, so an explicitly
    # passed client is never silently replaced.
    active_client = default_client if client is None else client

    response: JsonDict = active_client.invoke(
        FunctionName=function_name,
        Payload=serialized_payload.encode(),
        InvocationType="RequestResponse" if wait else "Event",
    )
    if not wait:
        return None

    # ``Payload`` is consumed as a file-like object holding JSON.
    response_payload: Any = json.load(response["Payload"])
    if func_error := response.get("FunctionError"):
        logger.error(
            "Error during invocation of Lambda function %s:\n%s",
            function_name,
            func_error,
        )
        raise InvocationError(func_error, response_payload)

    return response_payload

66 

67 

class RequestResponseFunction(Intercept[JsonDict]):
    """``Intercept`` whose replacement calls a Lambda function and waits.

    The replacement forwards its argument dict to ``invoke`` with
    ``wait=True`` and returns whatever ``invoke`` returns.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to ``function_name`` and the optional ``client``."""

        def call_and_wait(args: JsonDict) -> JsonDict:
            # Synchronous invocation: the decoded response is the result.
            result = invoke(function_name, args, client=client, wait=True)
            return result

        super().__init__(call_and_wait)

79 

80 

class EventFunction(Intercept[None]):
    """``Intercept`` whose replacement triggers a Lambda function without waiting.

    The replacement forwards its argument dict to ``invoke`` with
    ``wait=False`` and always returns ``None``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to ``function_name`` and the optional ``client``."""

        def fire_and_forget(args: JsonDict) -> None:
            # Asynchronous invocation: nothing is returned to the caller.
            invoke(function_name, args, client=client, wait=False)
            return None

        super().__init__(fire_and_forget)