Coverage for src/extratools_cloud/aws/lambda_.py: 0%
35 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-20 20:51 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-20 20:51 -0700
1import json
2import logging
3from typing import Any
5import boto3
6from botocore.client import BaseClient
7from botocore.config import Config
8from extratools_core.func import Intercept
9from extratools_core.json import JsonDict
11logger = logging.getLogger(__name__)
13# Lambda can run at most 15 minutes
14MAX_FUNCTION_DURATION: int = 60 * 15
16default_client: BaseClient = boto3.client(
17 "lambda",
18 # To prevent client timeout during long invocation
19 config=Config(
20 connect_timeout=MAX_FUNCTION_DURATION,
21 read_timeout=MAX_FUNCTION_DURATION,
22 ),
23)
26class InvocationError(RuntimeError):
27 def __init__(self, message: str, error: JsonDict) -> None:
28 self.message = message
29 self.error = error
32def invoke(
33 function_name: str,
34 payload: Any,
35 *,
36 client: BaseClient | None = None,
37 wait: bool = True,
38) -> Any:
39 logger.info(
40 f"Invoking Lambda function {function_name}"
41 f" with payload:\n{json.dumps(payload)}",
42 )
44 response: JsonDict = (client or default_client).invoke(
45 FunctionName=function_name,
46 Payload=json.dumps(payload).encode(),
47 InvocationType=(
48 "RequestResponse" if wait
49 else "Event"
50 ),
51 )
52 if not wait:
53 return None
55 response_payload: Any = json.load(response["Payload"])
56 if func_error := response.get("FunctionError"):
57 logger.error(
58 f"Error during invocation of Lambda function {function_name}"
59 f":\n{func_error}",
60 )
61 raise InvocationError(func_error, response_payload)
63 return response_payload
66class RequestResponseFunction(Intercept[JsonDict]):
67 def __init__(
68 self,
69 function_name: str,
70 *,
71 client: BaseClient | None = None,
72 ) -> None:
73 def replacement(args: JsonDict) -> JsonDict:
74 return invoke(function_name, args, client=client, wait=True)
76 super().__init__(replacement)
79class EventFunction(Intercept[None]):
80 def __init__(
81 self,
82 function_name: str,
83 *,
84 client: BaseClient | None = None,
85 ) -> None:
86 def replacement(args: JsonDict) -> None:
87 invoke(function_name, args, client=client, wait=False)
89 super().__init__(replacement)