Coverage for src/extratools_cloud/aws/lambda_.py: 0%
35 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 21:33 -0700
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-21 21:33 -0700
1import json
2import logging
3from typing import Any
5from botocore.client import BaseClient
6from botocore.config import Config
7from extratools_core.func import Intercept
8from extratools_core.json import JsonDict
10from .helpers import get_client
# Module-level logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Lambda can run at most 15 minutes; the value is used below as a timeout.
MAX_FUNCTION_DURATION: int = 15 * 60

# Shared client used by `invoke` when the caller does not pass one.
# Both timeouts are raised to the maximum function duration to prevent the
# client from timing out during a long invocation, see
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html
default_client: BaseClient = get_client(
    "lambda",
    config=Config(
        read_timeout=MAX_FUNCTION_DURATION,
        connect_timeout=MAX_FUNCTION_DURATION,
    ),
)
class InvocationError(RuntimeError):
    """Error raised when a Lambda invocation reports a function error.

    In this module it is raised by `invoke` with the response's
    ``FunctionError`` value as `message` and the JSON-decoded response
    payload as `error`.
    """

    def __init__(self, message: str, error: JsonDict) -> None:
        """Store the error message and the error payload.

        Args:
            message: Short description of the failure.
            error: JSON payload carrying the error details.
        """
        # Initialise the base exception explicitly instead of relying on
        # BaseException.__new__ to populate `args`. Passing both values keeps
        # `args == (message, error)`, so str(), repr() and pickling behave
        # exactly as before.
        super().__init__(message, error)

        # Short description of the failure.
        self.message = message
        # JSON payload with the error details.
        self.error = error
def invoke(
    function_name: str,
    payload: Any,
    *,
    client: BaseClient | None = None,
    wait: bool = True,
) -> Any:
    """Invoke a Lambda function with a JSON payload.

    Args:
        function_name: Passed as ``FunctionName`` to the client's ``invoke``.
        payload: Request payload; must be serializable by ``json.dumps``.
        client: Client to use. Falls back to the module-level
            ``default_client`` when omitted (or falsy).
        wait: When true, use the ``RequestResponse`` invocation type and
            return the decoded response payload. When false, use the
            ``Event`` invocation type and return ``None`` without reading
            the response.

    Returns:
        The JSON-decoded response payload, or ``None`` when `wait` is false.

    Raises:
        InvocationError: If the response contains a truthy ``FunctionError``
            entry. The exception carries that value as its message and the
            decoded response payload as its error.
    """
    # Serialize once and reuse the result for both the log line and the
    # request body, so the logged text is exactly what is sent.
    serialized_payload: str = json.dumps(payload)

    logger.info(
        f"Invoking Lambda function {function_name}"
        f" with payload:\n{serialized_payload}",
    )

    response: JsonDict = (client or default_client).invoke(
        FunctionName=function_name,
        Payload=serialized_payload.encode(),
        InvocationType=(
            "RequestResponse" if wait
            else "Event"
        ),
    )
    if not wait:
        return None

    # Decode the payload before checking for a function error so that the
    # decoded body can be attached to the raised exception.
    response_payload: Any = json.load(response["Payload"])
    if func_error := response.get("FunctionError"):
        logger.error(
            f"Error during invocation of Lambda function {function_name}"
            f":\n{func_error}",
        )
        raise InvocationError(func_error, response_payload)

    return response_payload
class RequestResponseFunction(Intercept[JsonDict]):
    """Intercept whose replacement invokes a Lambda function and waits.

    The replacement forwards its single argument as the payload to `invoke`
    with ``wait=True`` and returns whatever `invoke` returns.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to a function name and an optional client.

        Args:
            function_name: Name handed to `invoke` on every call.
            client: Optional client handed to `invoke`; ``None`` lets
                `invoke` pick its own default.
        """

        def replacement(args: JsonDict) -> JsonDict:
            # Waiting invocation: hand back the result of `invoke`.
            response_payload: JsonDict = invoke(
                function_name,
                args,
                wait=True,
                client=client,
            )
            return response_payload

        super().__init__(replacement)
class EventFunction(Intercept[None]):
    """Intercept whose replacement invokes a Lambda function without waiting.

    The replacement forwards its single argument as the payload to `invoke`
    with ``wait=False`` and returns ``None``.
    """

    def __init__(
        self,
        function_name: str,
        *,
        client: BaseClient | None = None,
    ) -> None:
        """Bind the replacement to a function name and an optional client.

        Args:
            function_name: Name handed to `invoke` on every call.
            client: Optional client handed to `invoke`; ``None`` lets
                `invoke` pick its own default.
        """

        def replacement(args: JsonDict) -> None:
            # Non-waiting invocation: the return value of `invoke` is
            # deliberately discarded.
            invoke(
                function_name,
                args,
                wait=False,
                client=client,
            )

        super().__init__(replacement)