Coverage for src/extratools_cloud/aws/lambda_.py: 0%
35 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 09:29 -0700
1import json
2import logging
3from typing import Any
5import boto3
6from botocore.client import BaseClient
7from botocore.config import Config
8from extratools_core.func import Intercept
9from extratools_core.json import JsonDict
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# A single Lambda invocation is capped at 15 minutes; value is in seconds.
MAX_FUNCTION_DURATION: int = 15 * 60

# Client used by `invoke` whenever the caller does not supply one.
# Both timeouts are raised to the maximum function duration so the client
# does not time out while a long invocation is still running, see
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html
# NOTE(review): this is created at import time, so importing the module
# presumably requires usable AWS region configuration -- confirm.
default_client: BaseClient = boto3.client(
    "lambda",
    config=Config(
        read_timeout=MAX_FUNCTION_DURATION,
        connect_timeout=MAX_FUNCTION_DURATION,
    ),
)
class InvocationError(RuntimeError):
    """Error carrying a short message plus a JSON document with the details.

    Attributes:
        message: Short description of the failure.
        error: JSON-style dict describing the failure in detail.
    """

    def __init__(self, message: str, error: JsonDict) -> None:
        # Initialise the base class explicitly with BOTH values: `args` then
        # mirrors the constructor signature, which keeps pickling and copying
        # of the exception working (they re-create it from `args`).
        super().__init__(message, error)
        self.message = message
        self.error = error

    def __str__(self) -> str:
        # Without this override the text would be the tuple repr of `args`,
        # which is awkward to read in tracebacks and logs.
        return f"{self.message}: {self.error}"
def invoke(
    function_name: str,
    payload: Any,
    *,
    client: BaseClient | None = None,
    wait: bool = True,
) -> Any:
    """Invoke a Lambda function with a JSON-serializable payload.

    Args:
        function_name: Passed to the client as ``FunctionName``.
        payload: Value serialized with ``json.dumps`` and sent as the
            request payload.
        client: Client to invoke through; the module-level
            ``default_client`` is used when this is omitted or ``None``.
        wait: When true, invoke with type ``RequestResponse`` and return
            the decoded response payload; when false, invoke with type
            ``Event`` and return without reading any response payload.

    Returns:
        The JSON-decoded response payload, or ``None`` when ``wait`` is false.

    Raises:
        InvocationError: If the response contains a ``FunctionError`` entry;
            the decoded response payload is passed along as the error detail.
    """
    # Serialize once and reuse the text for both the log line and the
    # request body instead of running json.dumps twice.
    serialized: str = json.dumps(payload)
    # Lazy %-style arguments: the message is only built if the record is
    # actually emitted; the resulting text is unchanged.
    logger.info(
        "Invoking Lambda function %s with payload:\n%s",
        function_name,
        serialized,
    )

    response: JsonDict = (client or default_client).invoke(
        FunctionName=function_name,
        Payload=serialized.encode(),
        InvocationType="RequestResponse" if wait else "Event",
    )
    if not wait:
        return None

    # The payload is decoded before the error check because, on failure, it
    # is what gets attached to the raised exception.
    response_payload: Any = json.load(response["Payload"])
    func_error = response.get("FunctionError")
    if func_error:
        logger.error(
            "Error during invocation of Lambda function %s:\n%s",
            function_name,
            func_error,
        )
        raise InvocationError(func_error, response_payload)

    return response_payload
class RequestResponseFunction(Intercept[JsonDict]):
    """``Intercept`` whose replacement invokes a Lambda function and waits.

    The replacement passes the dict it receives as the invocation payload
    and returns whatever ``invoke`` returns for a waiting invocation.
    """

    def __init__(
        self, function_name: str, *, client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to ``function_name`` and optional ``client``."""

        def replacement(args: JsonDict) -> JsonDict:
            response: JsonDict = invoke(
                function_name, args, wait=True, client=client,
            )
            return response

        super().__init__(replacement)
class EventFunction(Intercept[None]):
    """``Intercept`` whose replacement invokes a Lambda function without waiting.

    The replacement passes the dict it receives as the invocation payload
    and discards the result of ``invoke``, returning ``None``.
    """

    def __init__(
        self, function_name: str, *, client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to ``function_name`` and optional ``client``."""

        def replacement(args: JsonDict) -> None:
            invoke(function_name, args, wait=False, client=client)

        super().__init__(replacement)