autohive_integrations_sdk.integration
Autohive Integrations SDK — core module.
Provides the building blocks for creating Autohive integrations:
- Integration — load config and register action/trigger/connected-account handlers
- ExecutionContext — authenticated HTTP client passed to every handler
- ActionHandler — base class for action implementations (return ActionResult)
- ConnectedAccountHandler — base class for connected-account lookups (return ConnectedAccountInfo)
- ActionResult — standard return type wrapping action output data and optional billing cost
- ActionError — return type for expected application-level errors (bypasses output schema validation)
- FetchResponse — response object from context.fetch() with .status, .headers, and .data
- ConnectedAccountInfo — structured account info returned by connected-account handlers
- HTTPError / RateLimitError — exceptions raised by context.fetch() for non-2xx responses
Typical usage::

    from autohive_integrations_sdk import Integration, ActionHandler, ActionResult, ExecutionContext

    integration = Integration.load()

    @integration.action("my_action")
    class MyAction(ActionHandler):
        async def execute(self, inputs, context):
            response = await context.fetch("https://api.example.com/resource")
            return ActionResult(data=response.data)

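The typical-usage snippet covers only the success path. As a rough sketch of how a handler might turn an expected failure into an `ActionError`, the example below uses small stand-in classes instead of the real SDK so that it runs on its own. `FakeContext`, `GetUser`, and the URL are hypothetical, and the stand-in `HTTPError` only mimics the `status` and `message` attributes described on this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


# Stand-ins that mimic the shape of the SDK types; they are NOT the real classes.
@dataclass
class ActionResult:
    data: Any
    cost_usd: Optional[float] = None


@dataclass
class ActionError:
    message: str
    cost_usd: Optional[float] = None


@dataclass
class FetchResponse:
    status: int
    headers: dict
    data: Any


class HTTPError(Exception):
    def __init__(self, status: int, message: str):
        self.status = status
        self.message = message
        super().__init__(f"HTTP {status}: {message}")


class FakeContext:
    """Hypothetical context whose fetch() knows exactly one user."""

    async def fetch(self, url: str) -> FetchResponse:
        if url.endswith("/users/1"):
            return FetchResponse(200, {}, {"id": 1, "name": "Ada"})
        raise HTTPError(404, "Not Found")


class GetUser:
    async def execute(self, inputs, context):
        url = f"https://api.example.com/users/{inputs['id']}"
        try:
            response = await context.fetch(url)
        except HTTPError as exc:
            if exc.status == 404:
                # Expected, application-level failure: report it as data.
                return ActionError(message="User not found")
            raise  # anything else is unexpected, so let it propagate
        return ActionResult(data=response.data)


found = asyncio.run(GetUser().execute({"id": 1}, FakeContext()))
missing = asyncio.run(GetUser().execute({"id": 2}, FakeContext()))
print(found)
print(missing)
```

Returning `ActionError` rather than raising keeps "user not found" distinguishable from a genuine crash, which appears to be the intent of the `ActionError` type described in the list above.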
"""Autohive Integrations SDK — core module.

Provides the building blocks for creating Autohive integrations:

- `Integration` — load config and register action/trigger/connected-account handlers
- `ExecutionContext` — authenticated HTTP client passed to every handler
- `ActionHandler` — base class for action implementations (return `ActionResult`)
- `ConnectedAccountHandler` — base class for connected-account lookups (return `ConnectedAccountInfo`)
- `ActionResult` — standard return type wrapping action output data and optional billing cost
- `ActionError` — return type for expected application-level errors (bypasses output schema validation)
- `FetchResponse` — response object from ``context.fetch()`` with ``.status``, ``.headers``, and ``.data``
- `ConnectedAccountInfo` — structured account info returned by connected-account handlers
- `HTTPError` / `RateLimitError` — exceptions raised by ``context.fetch()`` for non-2xx responses

Typical usage::

    from autohive_integrations_sdk import Integration, ActionHandler, ActionResult, ExecutionContext

    integration = Integration.load()

    @integration.action("my_action")
    class MyAction(ActionHandler):
        async def execute(self, inputs, context):
            response = await context.fetch("https://api.example.com/resource")
            return ActionResult(data=response.data)
"""

# Standard Library Imports
from abc import ABC, abstractmethod
import asyncio
from dataclasses import dataclass, field, asdict
from datetime import timedelta
from enum import Enum
import json
import json as jsonX  # Keep alias to avoid conflict with 'json' parameter in fetch
import logging
import os
import sys
from pathlib import Path
from typing import Dict, Any, List, Optional, Union, Type, TypeVar, Generic, ClassVar
from urllib.parse import urlencode

# Third-Party Imports
import aiohttp
from jsonschema import validate, Draft7Validator


# Local Imports
from autohive_integrations_sdk import __version__


# ---- Type Definitions ----
T = TypeVar('T')

# ---- Auth Types ----
class AuthType(Enum):
    """Authentication strategy used by an integration.

    The platform stores the auth type alongside credentials and passes both
    to ``ExecutionContext``. ``context.fetch()`` uses the type to decide
    whether to auto-inject an ``Authorization`` header.

    Members:
        PlatformOauth2: Platform-managed OAuth 2.0 — the platform handles the
            token lifecycle and injects ``Bearer <access_token>`` automatically.
        PlatformTeams: Platform-managed Microsoft Teams auth.
        ApiKey: A single API key provided by the user.
        Basic: Username/password (HTTP Basic) credentials.
        Custom: Free-form credential fields defined by the integration's
            ``config.json`` auth schema. The integration is responsible for
            reading individual fields from ``context.auth``.
    """
    PlatformOauth2 = "PlatformOauth2"
    PlatformTeams = "PlatformTeams"
    ApiKey = "ApiKey"
    Basic = "Basic"
    Custom = "Custom"

class ResultType(Enum):
    """Type of result being returned"""
    ACTION = "action"
    ACTION_ERROR = "action_error"
    CONNECTED_ACCOUNT = "connected_account"
    ERROR = "error"
    VALIDATION_ERROR = "validation_error"

# ---- Exceptions ----
class ValidationError(Exception):
    """Raised when SDK validation fails.

    This covers several cases:

    - Action inputs don't match the ``input_schema`` in ``config.json``
    - Action outputs don't match the ``output_schema``
    - Auth credentials don't match the ``auth.fields`` schema
    - An action handler returns something other than ``ActionResult``
    - A handler name isn't registered
    """
    def __init__(self, message: str, schema: str = None, inputs: str = None, source: str = "legacy"):
        self.schema = schema
        """The schema that failed validation"""
        self.inputs = inputs
        """The data that failed validation"""
        self.message = message
        """The error message"""
        self.source = source
        """Where the validation failed: 'input', 'output', or 'legacy' (pre-versioning default)"""
        super().__init__(message)

class ConfigurationError(Exception):
    """Raised when integration configuration is invalid"""
    pass

class HTTPError(Exception):
    """Raised by ``ExecutionContext.fetch()`` for non-2xx HTTP responses (except 429)."""
    def __init__(self, status: int, message: str, response_data: Any = None):
        self.status = status
        """Status code"""
        self.message = message
        """Error message"""
        self.response_data = response_data
        """Response data"""
        super().__init__(f"HTTP {status}: {message}")

class RateLimitError(HTTPError):
    """Raised by ``ExecutionContext.fetch()`` on HTTP 429 (Too Many Requests).

    Attributes:
        retry_after: Seconds to wait before retrying, taken from the
            ``Retry-After`` response header (defaults to 60 if absent).
    """
    def __init__(self, retry_after: int, *args, **kwargs):
        self.retry_after = retry_after
        """Seconds to wait before retrying."""
        super().__init__(*args, **kwargs)

# ---- Result Classes ----
@dataclass
class FetchResponse:
    """Response object returned by ``ExecutionContext.fetch()``.

    Wraps the full HTTP response so callers can inspect status codes and
    headers in addition to the parsed body.

    Attributes:
        status: HTTP status code (e.g. ``200``, ``201``).
        headers: Response headers as a plain ``dict``.
        data: Parsed JSON (``dict``/``list``) when the response is
            ``application/json``, otherwise the raw response text.
            ``None`` for empty 200/201/204 responses.
    """
    status: int
    headers: Dict[str, str]
    data: Any

@dataclass
class ActionResult:
    """Result returned by action handlers.

    This class encapsulates the data returned by an action along with optional
    billing information for cost tracking.

    Args:
        data: The actual result data from the action
        cost_usd: Optional USD cost for billing purposes

    Example:
        ```python
        return ActionResult(
            data={"message": "Success", "result": 42},
            cost_usd=0.05
        )
        ```
    """
    data: Any
    cost_usd: Optional[float] = None

@dataclass
class ActionError:
    """Error result returned by action handlers for expected/application-level errors.

    When returned from an action handler, output schema validation is skipped
    and the error is returned to the caller as a ResultType.ERROR result.

    Args:
        message: Human-readable error message
        cost_usd: Optional USD cost incurred before the error occurred

    Example:
        ```python
        return ActionError(
            message="User not found",
            cost_usd=0.01
        )
        ```
    """
    message: str
    cost_usd: Optional[float] = None

@dataclass
class ConnectedAccountInfo:
    """Account metadata returned by a ``ConnectedAccountHandler``.

    The platform calls the connected-account handler after a user links
    their external account. The returned info is displayed in the
    Autohive UI (avatar, name, email, etc.).

    All fields are optional — populate whichever ones the external API provides.
    """
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    username: Optional[str] = None
    user_id: Optional[str] = None
    avatar_url: Optional[str] = None
    organization: Optional[str] = None

@dataclass
class IntegrationResult:
    """Result format sent from lambda wrapper to backend.

    This class represents the standardized format that the lambda wrapper
    sends to the Autohive backend, including SDK version and type-specific data.

    Args:
        version: SDK version (auto-populated)
        type: Type of result payload (ResultType enum: ACTION, CONNECTED_ACCOUNT, ERROR)
        result: The result object - ActionResult for actions, ActionError for
                application-level action errors, or ConnectedAccountInfo for
                connected accounts.
                The lambda wrapper serializes these to dicts using asdict().

    Note:
        This type is returned by Integration methods and serialized by the lambda wrapper.
        Integration developers should use ActionResult for action handlers and
        ActionError for expected error conditions.
    """
    version: str
    type: ResultType
    result: Union[ActionResult, ActionError, ConnectedAccountInfo]

# ---- Configuration Classes ----

@dataclass
class Parameter:
    """Definition of a parameter"""
    name: str
    type: str
    description: str
    enum: Optional[List[str]] = None
    required: bool = True
    default: Any = None

@dataclass
class SchemaDefinition:
    """Base class for components that have input/output schemas"""
    name: str
    description: str
    input_schema: List[Parameter]
    output_schema: Optional[Dict[str, Any]] = None

@dataclass
class Action(SchemaDefinition):
    """Empty dataclass that inherits from SchemaDefinition"""
    pass

@dataclass
class PollingTrigger(SchemaDefinition):
    """Definition of a polling trigger"""
    polling_interval: timedelta = field(default_factory=timedelta)

@dataclass
class IntegrationConfig:
    """Configuration for an integration"""
    name: str
    version: str
    description: str
    auth: Dict[str, Any]
    actions: Dict[str, Action]
    polling_triggers: Dict[str, PollingTrigger]

# ---- Base Handler Classes ----
class ActionHandler(ABC):
    """Base class for action handlers.

    Subclass this and implement ``execute()`` to handle a specific action.
    Register it with the ``@integration.action("action_name")`` decorator.

    Example::

        @integration.action("get_user")
        class GetUser(ActionHandler):
            async def execute(self, inputs, context):
                user = (await context.fetch(f"https://api.example.com/users/{inputs['id']}")).data
                return ActionResult(data=user)
    """
    @abstractmethod
    async def execute(self, inputs: Dict[str, Any], context: 'ExecutionContext') -> Any:
        """Run the action logic.

        Args:
            inputs: Validated action inputs matching the ``input_schema`` from ``config.json``.
            context: Execution context providing ``fetch()``, ``auth``, and logging.

        Returns:
            An ``ActionResult`` containing the output data and optional ``cost_usd``.
        """
        pass

class PollingTriggerHandler(ABC):
    """Base class for polling trigger handlers"""
    @abstractmethod
    async def poll(self, inputs: Dict[str, Any], last_poll_ts: Optional[str], context: 'ExecutionContext') -> List[Dict[str, Any]]:
        """Execute the polling trigger"""
        pass

class ConnectedAccountHandler(ABC):
    """Base class for connected-account handlers.

    The platform calls this after a user links their external account.
    The returned ``ConnectedAccountInfo`` is shown in the Autohive UI.

    Register with the ``@integration.connected_account()`` decorator.

    Example::

        @integration.connected_account()
        class MyAccountHandler(ConnectedAccountHandler):
            async def get_account_info(self, context):
                me = (await context.fetch("https://api.example.com/me")).data
                return ConnectedAccountInfo(
                    email=me["email"],
                    first_name=me["first_name"],
                    last_name=me["last_name"],
                )
    """
    @abstractmethod
    async def get_account_info(self, context: 'ExecutionContext') -> ConnectedAccountInfo:
        """Fetch account metadata from the external service.

        For platform OAuth integrations, ``context.fetch()`` auto-injects
        the Bearer token — no manual auth handling needed.

        Returns:
            A ``ConnectedAccountInfo`` with whichever fields the API provides.
        """
        pass

# ---- Core SDK Classes ----
class ExecutionContext:
    """Context provided to integration handlers for making authenticated HTTP requests.

    Manages an ``aiohttp`` session with automatic retries, error handling, and
    optional Bearer-token injection for platform OAuth integrations.

    Use as an async context manager::

        async with ExecutionContext(auth=auth) as context:
            result = await integration.execute_action("my_action", inputs, context)

    Args:
        auth: Authentication data. In **local tests** this is a flat dict
            matching the ``auth.fields`` schema in ``config.json``
            (e.g. ``{"api_key": "..."}``). In **production** the platform
            wraps credentials as ``{"auth_type": "...", "credentials": {...}}``.
        request_config: Override default ``max_retries`` (3) and ``timeout`` (30 s).
        metadata: Arbitrary metadata forwarded to handlers.
        logger: Custom logger; falls back to ``logging.getLogger(__name__)``.
    """
    def __init__(
        self,
        auth: Dict[str, Any] = {},
        request_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None
    ):
        self.auth = auth
        """Authentication configuration"""
        self.config = request_config or {"max_retries": 3, "timeout": 30}
        """Request configuration"""
        self.metadata = metadata or {}
        """Additional metadata"""
        self.logger = logger or logging.getLogger(__name__)
        """Logger instance"""
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if not self._session:
            self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
            self._session = None

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Any = None,
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> FetchResponse:
        """Make an HTTP request with automatic retries and error handling.

        For **platform OAuth** integrations (``auth_type == "PlatformOauth2"``),
        a ``Bearer`` token is auto-injected from ``auth.credentials.access_token``
        unless an ``Authorization`` header is explicitly provided.

        Retries up to ``max_retries`` (default 3) on transient network errors
        with exponential back-off. HTTP 429 responses raise ``RateLimitError``
        immediately (no automatic retry).

        Args:
            url: The URL to request.
            method: HTTP method (``"GET"``, ``"POST"``, ``"PUT"``, etc.).
            params: Query parameters appended to the URL. Nested dicts/lists
                are JSON-serialized automatically.
            data: Raw request body. Encoding depends on ``content_type``.
            json: JSON-serializable payload. Sets ``content_type`` to
                ``application/json`` automatically.
            headers: Additional HTTP headers. Merged *after* any auto-injected
                auth header, so an explicit ``Authorization`` takes precedence.
            content_type: ``Content-Type`` header value.
            timeout: Per-request timeout in seconds (overrides ``request_config``).
            retry_count: Internal — current retry attempt number.

        Returns:
            A ``FetchResponse`` containing the HTTP status code, response
            headers, and parsed body data.

        Raises:
            RateLimitError: On HTTP 429 with the ``Retry-After`` value.
            HTTPError: On any other non-2xx status.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # Prepare request
        if json is not None:
            data = json
            content_type = "application/json"

        final_headers = {}

        if self.auth and "Authorization" not in (headers or {}):
            auth_type = AuthType(self.auth.get("auth_type", "PlatformOauth2"))
            credentials = self.auth.get("credentials", {})

            if auth_type == AuthType.PlatformOauth2 and "access_token" in credentials:
                final_headers["Authorization"] = f"Bearer {credentials['access_token']}"

        if content_type:
            final_headers["Content-Type"] = content_type
        if headers:
            final_headers.update(headers)

        if params:
            # Handle nested dictionary parameters
            flat_params = {}
            for key, value in params.items():
                if isinstance(value, (dict, list)):
                    flat_params[key] = jsonX.dumps(value)
                elif value is not None:
                    flat_params[key] = str(value)
            query_string = urlencode(flat_params)
            url = f"{url}{'&' if '?' in url else '?'}{query_string}"

        # Prepare body
        if data is not None:
            if content_type == "application/json":
                data = jsonX.dumps(data)
            elif content_type == "application/x-www-form-urlencoded":
                data = urlencode(data) if isinstance(data, dict) else data

        # Store the original timeout numeric value
        original_timeout = timeout or self.config["timeout"]

        # Convert the numeric timeout to a ClientTimeout instance for this request
        client_timeout = aiohttp.ClientTimeout(total=original_timeout)

        try:
            async with self._session.request(
                method=method,
                url=url,
                data=data,
                headers=final_headers,
                timeout=client_timeout,
                ssl=True
            ) as response:
                content_type = response.headers.get("Content-Type", "")

                if response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    raise RateLimitError(
                        retry_after,
                        response.status,
                        "Rate limit exceeded",
                        await response.text()
                    )

                try:
                    if "application/json" in content_type:
                        result = await response.json()
                    else:
                        result = await response.text()
                        if not result and response.status in {200, 201, 204}:
                            result = None
                except Exception as e:
                    self.logger.error(f"Error parsing response: {e}")
                    result = await response.text()

                response_headers = dict(response.headers)

                if not response.ok:
                    print(f"HTTP error encountered. Status: {response.status}. Result: {result}")
                    raise HTTPError(response.status, str(result), result)

                return FetchResponse(
                    status=response.status,
                    headers=response_headers,
                    data=result,
                )

        except RateLimitError:
            raise
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Don't want to send this to Raygun here because this will be retried.
            print(f"Error encountered: {e}. Retry count: {retry_count}. Backing off.")
            if retry_count < self.config["max_retries"]:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
                print("Retrying request...")
                # Use original_timeout (numeric) for recursive calls
                return await self.fetch(
                    url, method, params, data, json,
                    headers, content_type, original_timeout, retry_count + 1
                )
            else:
                print("Max retries reached. Raising error.")
                raise
        except Exception as e:
            self.logger.error(f"Unexpected error during {method} {url}: {e}")
            print(f"Unexpected error encountered: {e}")
            raise


class Integration:
    """Base integration class with handler registration and execution.

    This class manages the integration configuration, handler registration,
    and provides methods to execute actions and triggers.

    Args:
        config: Integration configuration

    Attributes:
        config: Integration configuration
    """

    def __init__(self, config: IntegrationConfig):
        self.config = config
        """Integration configuration"""
        self._action_handlers: Dict[str, Type[ActionHandler]] = {}
        """Action handlers"""
        self._polling_handlers: Dict[str, Type[PollingTriggerHandler]] = {}
        """Polling handlers"""
        self._connected_account_handler: Optional[Type[ConnectedAccountHandler]] = None
        """Connected account handler"""

    @classmethod
    def load(cls, config_path: Union[str, Path] = None) -> 'Integration':
        """Load an integration from its ``config.json``.

        Args:
            config_path: Explicit path to ``config.json``. When omitted the
                SDK resolves the path relative to its own package location,
                which works when the SDK is vendored via
                ``pip install --target dependencies``. Multi-file integrations
                that use ``actions/`` sub-packages should pass an explicit path
                (e.g. ``Integration.load("config.json")``).

        Returns:
            A fully initialised ``Integration`` ready for handler registration.

        Raises:
            ConfigurationError: If the file is missing or contains invalid JSON.
        """
        if config_path is None:
            config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.json')

        config_path = Path(config_path)

        if not config_path.exists():
            raise ConfigurationError(f"Configuration file not found: {config_path}")

        try:
            with open(config_path, 'r') as f:
                config_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ConfigurationError(f"Invalid JSON configuration: {e}")

        # Parse configuration sections
        actions = cls._parse_actions(config_data.get("actions", {}))
        polling_triggers = cls._parse_polling_triggers(config_data.get("polling_triggers", {}))

        config = IntegrationConfig(
            name=config_data["name"],
            version=config_data["version"],
            description=config_data["description"],
            auth=config_data.get("auth", {}),
            actions=actions,
            polling_triggers=polling_triggers
        )

        return cls(config)

    @staticmethod
    def _parse_interval(interval_str: str) -> timedelta:
        """Parse interval string into timedelta"""
        unit = interval_str[-1].lower()
        value = int(interval_str[:-1])

        if unit == 's':
            return timedelta(seconds=value)
        elif unit == 'm':
            return timedelta(minutes=value)
        elif unit == 'h':
            return timedelta(hours=value)
        elif unit == 'd':
            return timedelta(days=value)
        else:
            raise ConfigurationError(f"Invalid interval format: {interval_str}")

    @classmethod
    def _parse_actions(cls, actions_config: Dict[str, Any]) -> Dict[str, Action]:
        """Parse action configurations"""
        actions = {}
        for name, data in actions_config.items():
            actions[name] = Action(
                name=name,
                description=data["description"],
                input_schema=data["input_schema"],
                output_schema=data["output_schema"]
            )

        return actions

    @classmethod
    def _parse_polling_triggers(cls, triggers_config: Dict[str, Any]) -> Dict[str, PollingTrigger]:
        """Parse polling trigger configurations"""
        triggers = {}
        for name, data in triggers_config.items():
            interval = cls._parse_interval(data["polling_interval"])

            triggers[name] = PollingTrigger(
                name=name,
                description=data["description"],
                polling_interval=interval,
                input_schema=data["input_schema"],
                output_schema=data["output_schema"]
            )

        return triggers

    def action(self, name: str):
        """Decorator to register an action handler.

        Args:
            name: Name of the action to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If action is not defined in config

        Example:
            ```python
            @integration.action("my_action")
            class MyActionHandler(ActionHandler):
                async def execute(self, inputs, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[ActionHandler]):
            if name not in self.config.actions:
                raise ConfigurationError(f"Action '{name}' not defined in config")
            self._action_handlers[name] = handler_class
            return handler_class
        return decorator

    def polling_trigger(self, name: str):
        """Decorator to register a polling trigger handler

        Args:
            name: Name of the polling trigger to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If polling trigger is not defined in config

        Example:
            ```python
            @integration.polling_trigger("my_polling_trigger")
            class MyPollingTriggerHandler(PollingTriggerHandler):
                async def poll(self, inputs, last_poll_ts, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[PollingTriggerHandler]):
            if name not in self.config.polling_triggers:
                raise ConfigurationError(f"Polling trigger '{name}' not defined in config")
            self._polling_handlers[name] = handler_class
            return handler_class
        return decorator

    def connected_account(self):
        """Decorator to register a connected account handler

        Returns:
            Decorator function

        Example:
            ```python
            @integration.connected_account()
            class MyConnectedAccountHandler(ConnectedAccountHandler):
                async def get_account_info(self, context):
                    # Implementation
                    return {"email": "user@example.com", "name": "John Doe"}
            ```
        """
        def decorator(handler_class: Type[ConnectedAccountHandler]):
            self._connected_account_handler = handler_class
            return handler_class
        return decorator

    async def execute_action(self,
                             name: str,
                             inputs: Dict[str, Any],
                             context: ExecutionContext) -> IntegrationResult:
        """Execute a registered action.

        Args:
            name: Name of the action to execute
            inputs: Action inputs
            context: Execution context

        Returns:
            IntegrationResult with action data (ResultType.ACTION),
            action error (ResultType.ACTION_ERROR) if the handler returned ActionError,
            or validation error (ResultType.VALIDATION_ERROR) if schema validation fails.
        """
        try:
            if name not in self._action_handlers:
                raise ValidationError(f"Action '{name}' not registered")

            # Validate inputs against action schema
            action_config = self.config.actions[name]
            validator = Draft7Validator(action_config.input_schema)
            errors = sorted(validator.iter_errors(inputs), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, action_config.input_schema, inputs, source="input")

            if "fields" in self.config.auth:
                auth_config = self.config.auth["fields"]
                validator = Draft7Validator(auth_config)
                errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
                if errors:
                    message = ""
                    for error in errors:
                        message += f"{list(error.schema_path)}, {error.message},\n "
                    raise ValidationError(message, auth_config, context.auth, source="input")

            # Create handler instance and execute
            handler = self._action_handlers[name]()
            result = await handler.execute(inputs, context)

            # Handle ActionError - skip output schema validation
            if isinstance(result, ActionError):
                return IntegrationResult(
                    version=__version__,
                    type=ResultType.ACTION_ERROR,
                    result=result
                )

            # Validate that result is ActionResult
            if not isinstance(result, ActionResult):
                raise ValidationError(
                    f"Action handler '{name}' must return ActionResult or ActionError, got {type(result).__name__}",
                    source="output"
                )

            # Validate output schema against the data inside ActionResult
            validator = Draft7Validator(action_config.output_schema)
            errors = sorted(validator.iter_errors(result.data), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, action_config.output_schema, result.data, source="output")

            # Return IntegrationResult with ActionResult directly
            return IntegrationResult(
                version=__version__,
                type=ResultType.ACTION,
                result=result
            )
        except ValidationError as e:
            return IntegrationResult(
                version=__version__,
                type=ResultType.VALIDATION_ERROR,
                result={
                    'message': str(e),
                    'property': None,
                    'value': None,
                    'source': getattr(e, 'source', 'legacy')
                }
            )

    async def execute_polling_trigger(self,
                                      name: str,
                                      inputs: Dict[str, Any],
                                      last_poll_ts: Optional[str],
                                      context: ExecutionContext) -> List[Dict[str, Any]]:
        """Execute a registered polling trigger

        Args:
            name: Name of the polling trigger to execute
            inputs: Trigger inputs
            last_poll_ts: Last poll timestamp
            context: Execution context

        Returns:
            List of records

        Raises:
            ValidationError: If inputs or outputs don't match schema
        """
        if name not in self._polling_handlers:
            raise ValidationError(f"Polling trigger '{name}' not registered")

        # Validate trigger configuration
        trigger_config = self.config.polling_triggers[name]
        try:
            validate(inputs, trigger_config.input_schema)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        try:
            auth_config = self.config.auth["fields"]
            validate(context.auth, auth_config)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        # Create handler instance and execute
        handler = self._polling_handlers[name]()
        records = await handler.poll(inputs, last_poll_ts, context)
        # Validate each record
        for record in records:
            if "id" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'id' field")
            if "data" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'data' field")

            # Validate record data against output schema
            try:
                validate(record["data"], trigger_config.output_schema)
            except Exception as e:
                raise ValidationError(e.message, e.schema, e.instance)

        return records

    async def get_connected_account(self, context: ExecutionContext) -> IntegrationResult:
        """Get connected account information

        Args:
            context: Execution context

        Returns:
            IntegrationResult containing connected account data

        Raises:
            ValidationError: If no connected account handler is registered or auth is invalid
        """
        if not self._connected_account_handler:
            raise ValidationError("No connected account handler registered")

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        handler = self._connected_account_handler()
        account_info = await handler.get_account_info(context)

        if not isinstance(account_info, ConnectedAccountInfo):
            raise ValidationError(
                f"Connected account handler must return ConnectedAccountInfo, got {type(account_info).__name__}"
            )

        # Return IntegrationResult with ConnectedAccountInfo object directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.CONNECTED_ACCOUNT,
            result=account_info
        )
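The `fetch()` docstring in the module source says that nested dicts and lists in `params` are JSON-serialized before being appended to the URL. The following standalone sketch reproduces just that step with the standard library so the resulting query string can be inspected; `build_url` is a hypothetical helper name, not part of the SDK.

```python
import json
from typing import Any, Dict, Optional
from urllib.parse import urlencode


def build_url(url: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Append query parameters in the way the fetch() source describes."""
    if not params:
        return url
    flat: Dict[str, str] = {}
    for key, value in params.items():
        if isinstance(value, (dict, list)):
            # Nested structures are sent as a JSON string.
            flat[key] = json.dumps(value)
        elif value is not None:
            # None values are dropped; everything else is stringified.
            flat[key] = str(value)
    # Reuse an existing query string if the URL already has one.
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}{urlencode(flat)}"


first = build_url("https://api.example.com/items", {"page": 2, "q": None})
second = build_url("https://api.example.com/items?sort=asc", {"ids": [1, 2]})
print(first)
print(second)
```

Note that the list is percent-encoded as a single JSON value rather than expanded into repeated `ids=` keys, so the receiving API has to expect that form.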
class AuthType(Enum):
    """Authentication strategy used by an integration.

    The platform stores the auth type alongside credentials and passes both
    to ``ExecutionContext``. ``context.fetch()`` uses the type to decide
    whether to auto-inject an ``Authorization`` header.

    Members:
        PlatformOauth2: Platform-managed OAuth 2.0; the platform handles the
            token lifecycle and injects ``Bearer <access_token>`` automatically.
        PlatformTeams: Platform-managed Microsoft Teams auth.
        ApiKey: A single API key provided by the user.
        Basic: Username/password (HTTP Basic) credentials.
        Custom: Free-form credential fields defined by the integration's
            ``config.json`` auth schema. The integration is responsible for
            reading individual fields from ``context.auth``.
    """
    PlatformOauth2 = "PlatformOauth2"
    PlatformTeams = "PlatformTeams"
    ApiKey = "ApiKey"
    Basic = "Basic"
    Custom = "Custom"
Authentication strategy used by an integration.
The platform stores the auth type alongside credentials and passes both
to ExecutionContext. context.fetch() uses the type to decide
whether to auto-inject an Authorization header.
Members:
- PlatformOauth2: Platform-managed OAuth 2.0. The platform handles the token lifecycle and injects Bearer <access_token> automatically.
- PlatformTeams: Platform-managed Microsoft Teams auth.
- ApiKey: A single API key provided by the user.
- Basic: Username/password (HTTP Basic) credentials.
- Custom: Free-form credential fields defined by the integration's config.json auth schema. The integration is responsible for reading individual fields from context.auth.
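The injection rule can be sketched as a standalone function. This is a minimal illustration modeled on the `fetch()` source listed on this page, not part of the SDK: `resolve_auth_header` is a hypothetical helper name, the token values are placeholders, and plain strings stand in for the `AuthType` enum.

```python
from typing import Any, Dict, Optional


def resolve_auth_header(auth: Dict[str, Any],
                        headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Return the headers that would be sent under the documented rule.

    Hypothetical helper for illustration only; the SDK performs this
    step inside ExecutionContext.fetch().
    """
    final_headers: Dict[str, str] = {}
    # Inject only when auth is present and the caller did not set Authorization.
    if auth and "Authorization" not in (headers or {}):
        auth_type = auth.get("auth_type", "PlatformOauth2")
        credentials = auth.get("credentials", {})
        if auth_type == "PlatformOauth2" and "access_token" in credentials:
            final_headers["Authorization"] = f"Bearer {credentials['access_token']}"
    # Caller-supplied headers are merged last, so they take precedence.
    final_headers.update(headers or {})
    return final_headers
```

Under these assumptions, a flat local-test dict such as `{"api_key": "..."}` has no `credentials` key, so no header is injected, and integrations using `ApiKey`, `Basic`, or `Custom` auth would set their own headers from `context.auth`.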
class ResultType(Enum):
    """Type of result being returned"""
    ACTION = "action"
    ACTION_ERROR = "action_error"
    CONNECTED_ACCOUNT = "connected_account"
    ERROR = "error"
    VALIDATION_ERROR = "validation_error"
Type of result being returned
class ValidationError(Exception):
    """Raised when SDK validation fails.

    This covers several cases:

    - Action inputs don't match the ``input_schema`` in ``config.json``
    - Action outputs don't match the ``output_schema``
    - Auth credentials don't match the ``auth.fields`` schema
    - An action handler returns something other than ``ActionResult``
    - A handler name isn't registered
    """
    def __init__(self, message: str, schema: str = None, inputs: str = None, source: str = "legacy"):
        self.schema = schema
        """The schema that failed validation"""
        self.inputs = inputs
        """The data that failed validation"""
        self.message = message
        """The error message"""
        self.source = source
        """Where the validation failed: 'input', 'output', or 'legacy' (pre-versioning default)"""
        super().__init__(message)
Raised when SDK validation fails.
This covers several cases:
- Action inputs don't match the input_schema in config.json
- Action outputs don't match the output_schema
- Auth credentials don't match the auth.fields schema
- An action handler returns something other than ActionResult
- A handler name isn't registered
    def __init__(self, message: str, schema: str = None, inputs: str = None, source: str = "legacy"):
        self.schema = schema
        """The schema that failed validation"""
        self.inputs = inputs
        """The data that failed validation"""
        self.message = message
        """The error message"""
        self.source = source
        """Where the validation failed: 'input', 'output', or 'legacy' (pre-versioning default)"""
        super().__init__(message)
class ConfigurationError(Exception):
    """Raised when integration configuration is invalid"""
    pass
Raised when integration configuration is invalid
class HTTPError(Exception):
    """Raised by ``ExecutionContext.fetch()`` for non-2xx HTTP responses (except 429)."""
    def __init__(self, status: int, message: str, response_data: Any = None):
        self.status = status
        """Status code"""
        self.message = message
        """Error message"""
        self.response_data = response_data
        """Response data"""
        super().__init__(f"HTTP {status}: {message}")
Raised by ExecutionContext.fetch() for non-2xx HTTP responses (except 429).
class RateLimitError(HTTPError):
    """Raised by ``ExecutionContext.fetch()`` on HTTP 429 (Too Many Requests).

    Attributes:
        retry_after: Seconds to wait before retrying, taken from the
            ``Retry-After`` response header (defaults to 60 if absent).
    """
    def __init__(self, retry_after: int, *args, **kwargs):
        self.retry_after = retry_after
        """Seconds to wait before retrying."""
        super().__init__(*args, **kwargs)
Raised by ExecutionContext.fetch() on HTTP 429 (Too Many Requests).
Attributes:
retry_after: Seconds to wait before retrying, taken from the
Retry-After response header (defaults to 60 if absent).
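Because `fetch()` does not retry on HTTP 429, a handler that wants to wait and try again has to do so itself. The following is a minimal sketch of one way to do that, not an SDK feature: `call_with_rate_limit_retry`, `max_attempts`, and `max_wait` are hypothetical names, and the `RateLimitError` class here is a local stand-in exposing only the documented `retry_after` attribute so the example runs on its own.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Local stand-in exposing only the documented ``retry_after`` attribute."""

    def __init__(self, retry_after: int):
        self.retry_after = retry_after
        super().__init__(f"rate limited, retry after {retry_after}s")


async def call_with_rate_limit_retry(call: Callable[[], Awaitable[T]],
                                     max_attempts: int = 3,
                                     max_wait: float = 120.0) -> T:
    """Await ``call()``; on RateLimitError sleep ``retry_after`` seconds and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except RateLimitError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            # Cap the wait so a very large Retry-After cannot stall the handler.
            await asyncio.sleep(min(exc.retry_after, max_wait))
    raise RuntimeError("max_attempts must be at least 1")
```

In a real handler the callable would wrap the request, for example `lambda: context.fetch(url)`, and the exception would be the SDK's own `RateLimitError`.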
@dataclass
class FetchResponse:
    """Response object returned by ``ExecutionContext.fetch()``.

    Wraps the full HTTP response so callers can inspect status codes and
    headers in addition to the parsed body.

    Attributes:
        status: HTTP status code (e.g. ``200``, ``201``).
        headers: Response headers as a plain ``dict``.
        data: Parsed JSON (``dict``/``list``) when the response is
            ``application/json``, otherwise the raw response text.
            ``None`` for empty 200/201/204 responses.
    """
    status: int
    headers: Dict[str, str]
    data: Any
Response object returned by ExecutionContext.fetch().
Wraps the full HTTP response so callers can inspect status codes and headers in addition to the parsed body.
Attributes:
status: HTTP status code (e.g. 200, 201).
headers: Response headers as a plain dict.
data: Parsed JSON (dict/list) when the response is
application/json, otherwise the raw response text.
None for empty 200/201/204 responses.
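Since `data` can hold three different kinds of value, callers usually branch on it before use. A minimal sketch of that branching follows; the `FetchResponse` dataclass is a local stand-in with the three documented fields, and `describe_body` is a hypothetical helper that is not part of the SDK.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FetchResponse:
    """Local stand-in with the three documented fields."""
    status: int
    headers: Dict[str, str]
    data: Any


def describe_body(response: FetchResponse) -> str:
    """Classify ``response.data`` by the documented cases."""
    if response.data is None:
        return "empty"  # empty 200/201/204 response
    if isinstance(response.data, (dict, list)):
        return "json"   # parsed application/json body
    # Anything else is treated as raw text here; a JSON scalar body
    # would also land in this branch.
    return "text"
```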
@dataclass
class ActionResult:
    """Result returned by action handlers.

    This class encapsulates the data returned by an action along with optional
    billing information for cost tracking.

    Args:
        data: The actual result data from the action
        cost_usd: Optional USD cost for billing purposes

    Example:
    ```python
    return ActionResult(
        data={"message": "Success", "result": 42},
        cost_usd=0.05
    )
    ```
    """
    data: Any
    cost_usd: Optional[float] = None
Result returned by action handlers.
This class encapsulates the data returned by an action along with optional billing information for cost tracking.
Args:
- data: The actual result data from the action
- cost_usd: Optional USD cost for billing purposes
Example:
    return ActionResult(
        data={"message": "Success", "result": 42},
        cost_usd=0.05
    )
@dataclass
class ActionError:
    """Error result returned by action handlers for expected/application-level errors.

    When returned from an action handler, output schema validation is skipped
    and the error is returned to the caller as a ResultType.ERROR result.

    Args:
        message: Human-readable error message
        cost_usd: Optional USD cost incurred before the error occurred

    Example:
    ```python
    return ActionError(
        message="User not found",
        cost_usd=0.01
    )
    ```
    """
    message: str
    cost_usd: Optional[float] = None
Error result returned by action handlers for expected/application-level errors.
When returned from an action handler, output schema validation is skipped and the error is returned to the caller as a ResultType.ERROR result.
Args:
- message: Human-readable error message
- cost_usd: Optional USD cost incurred before the error occurred
Example:
    return ActionError(
        message="User not found",
        cost_usd=0.01
    )
@dataclass
class ConnectedAccountInfo:
    """Account metadata returned by a ``ConnectedAccountHandler``.

    The platform calls the connected-account handler after a user links
    their external account. The returned info is displayed in the
    Autohive UI (avatar, name, email, etc.).

    All fields are optional; populate whichever ones the external API provides.
    """
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    username: Optional[str] = None
    user_id: Optional[str] = None
    avatar_url: Optional[str] = None
    organization: Optional[str] = None
Account metadata returned by a ConnectedAccountHandler.
The platform calls the connected-account handler after a user links their external account. The returned info is displayed in the Autohive UI (avatar, name, email, etc.).
All fields are optional — populate whichever ones the external API provides.
@dataclass
class IntegrationResult:
    """Result format sent from lambda wrapper to backend.

    This class represents the standardized format that the lambda wrapper
    sends to the Autohive backend, including SDK version and type-specific data.

    Args:
        version: SDK version (auto-populated)
        type: Type of result payload (ResultType enum: ACTION, CONNECTED_ACCOUNT, ERROR)
        result: The result object - ActionResult for actions, ActionError for
                application-level action errors, or ConnectedAccountInfo for
                connected accounts.
                The lambda wrapper serializes these to dicts using asdict().

    Note:
        This type is returned by Integration methods and serialized by the lambda wrapper.
        Integration developers should use ActionResult for action handlers and
        ActionError for expected error conditions.
    """
    version: str
    type: ResultType
    result: Union[ActionResult, ActionError, ConnectedAccountInfo]
Result format sent from lambda wrapper to backend.
This class represents the standardized format that the lambda wrapper sends to the Autohive backend, including SDK version and type-specific data.
Args:
- version: SDK version (auto-populated)
- type: Type of result payload (ResultType enum: ACTION, CONNECTED_ACCOUNT, ERROR)
- result: The result object: ActionResult for actions, ActionError for application-level action errors, or ConnectedAccountInfo for connected accounts. The lambda wrapper serializes these to dicts using asdict().
Note: This type is returned by Integration methods and serialized by the lambda wrapper. Integration developers should use ActionResult for action handlers and ActionError for expected error conditions.
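To make the `asdict()` step concrete, here is a small sketch of what serializing a result object produces. The two dataclasses are local stand-ins that copy the documented fields so the example runs without the SDK. The lambda wrapper's own code is not shown on this page, so this only illustrates the standard-library call it is described as using.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class ActionResult:
    """Local stand-in with the documented fields."""
    data: Any
    cost_usd: Optional[float] = None


@dataclass
class ActionError:
    """Local stand-in with the documented fields."""
    message: str
    cost_usd: Optional[float] = None


# asdict() turns each dataclass into a plain dict keyed by field name.
success_payload = asdict(ActionResult(data={"message": "Success", "result": 42}, cost_usd=0.05))
error_payload = asdict(ActionError(message="User not found"))
```

An unset `cost_usd` is kept in the resulting dict with the value `None` rather than being dropped.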
@dataclass
class Parameter:
    """Definition of a parameter"""
    name: str
    type: str
    description: str
    enum: Optional[List[str]] = None
    required: bool = True
    default: Any = None
Definition of a parameter
@dataclass
class SchemaDefinition:
    """Base class for components that have input/output schemas"""
    name: str
    description: str
    input_schema: List[Parameter]
    output_schema: Optional[Dict[str, Any]] = None
Base class for components that have input/output schemas
@dataclass
class Action(SchemaDefinition):
    """Empty dataclass that inherits from SchemaDefinition"""
    pass
Empty dataclass that inherits from SchemaDefinition
@dataclass
class PollingTrigger(SchemaDefinition):
    """Definition of a polling trigger"""
    polling_interval: timedelta = field(default_factory=timedelta)
Definition of a polling trigger
@dataclass
class IntegrationConfig:
    """Configuration for an integration"""
    name: str
    version: str
    description: str
    auth: Dict[str, Any]
    actions: Dict[str, Action]
    polling_triggers: Dict[str, PollingTrigger]
Configuration for an integration
class ActionHandler(ABC):
    """Base class for action handlers.

    Subclass this and implement ``execute()`` to handle a specific action.
    Register it with the ``@integration.action("action_name")`` decorator.

    Example::

        @integration.action("get_user")
        class GetUser(ActionHandler):
            async def execute(self, inputs, context):
                user = (await context.fetch(f"https://api.example.com/users/{inputs['id']}")).data
                return ActionResult(data=user)
    """
    @abstractmethod
    async def execute(self, inputs: Dict[str, Any], context: 'ExecutionContext') -> Any:
        """Run the action logic.

        Args:
            inputs: Validated action inputs matching the ``input_schema`` from ``config.json``.
            context: Execution context providing ``fetch()``, ``auth``, and logging.

        Returns:
            An ``ActionResult`` containing the output data and optional ``cost_usd``.
        """
        pass
Base class for action handlers.
Subclass this and implement execute() to handle a specific action.
Register it with the @integration.action("action_name") decorator.
Example:
    @integration.action("get_user")
    class GetUser(ActionHandler):
        async def execute(self, inputs, context):
            user = (await context.fetch(f"https://api.example.com/users/{inputs['id']}")).data
            return ActionResult(data=user)
    @abstractmethod
    async def execute(self, inputs: Dict[str, Any], context: 'ExecutionContext') -> Any:
        """Run the action logic.

        Args:
            inputs: Validated action inputs matching the ``input_schema`` from ``config.json``.
            context: Execution context providing ``fetch()``, ``auth``, and logging.

        Returns:
            An ``ActionResult`` containing the output data and optional ``cost_usd``.
        """
        pass
Run the action logic.
Args:
inputs: Validated action inputs matching the input_schema from config.json.
context: Execution context providing fetch(), auth, and logging.
Returns:
An ActionResult containing the output data and optional cost_usd.
class PollingTriggerHandler(ABC):
    """Base class for polling trigger handlers"""
    @abstractmethod
    async def poll(self, inputs: Dict[str, Any], last_poll_ts: Optional[str], context: 'ExecutionContext') -> List[Dict[str, Any]]:
        """Execute the polling trigger"""
        pass
Base class for polling trigger handlers
    @abstractmethod
    async def poll(self, inputs: Dict[str, Any], last_poll_ts: Optional[str], context: 'ExecutionContext') -> List[Dict[str, Any]]:
        """Execute the polling trigger"""
        pass
Execute the polling trigger
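The `execute_polling_trigger` source listed on this page rejects any record returned by `poll()` that lacks an `id` or `data` key, and validates each record's `data` against the trigger's output schema. A minimal pre-flight check for the first of those rules might look like the sketch below; `check_polling_records` is a hypothetical helper for use in tests and is not part of the SDK, and it does not attempt the schema validation step.

```python
from typing import Any, Dict, List


def check_polling_records(records: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems; an empty list means every record has 'id' and 'data'."""
    problems = []
    for index, record in enumerate(records):
        for key in ("id", "data"):
            if key not in record:
                problems.append(f"record {index} is missing required '{key}' field")
    return problems
```

A record that passes this check has the shape `{"id": ..., "data": {...}}`.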
class ConnectedAccountHandler(ABC):
    """Base class for connected-account handlers.

    The platform calls this after a user links their external account.
    The returned ``ConnectedAccountInfo`` is shown in the Autohive UI.

    Register with the ``@integration.connected_account()`` decorator.

    Example::

        @integration.connected_account()
        class MyAccountHandler(ConnectedAccountHandler):
            async def get_account_info(self, context):
                me = (await context.fetch("https://api.example.com/me")).data
                return ConnectedAccountInfo(
                    email=me["email"],
                    first_name=me["first_name"],
                    last_name=me["last_name"],
                )
    """
    @abstractmethod
    async def get_account_info(self, context: 'ExecutionContext') -> ConnectedAccountInfo:
        """Fetch account metadata from the external service.

        For platform OAuth integrations, ``context.fetch()`` auto-injects
        the Bearer token; no manual auth handling needed.

        Returns:
            A ``ConnectedAccountInfo`` with whichever fields the API provides.
        """
        pass
Base class for connected-account handlers.
The platform calls this after a user links their external account.
The returned ConnectedAccountInfo is shown in the Autohive UI.
Register with the @integration.connected_account() decorator.
Example:
    @integration.connected_account()
    class MyAccountHandler(ConnectedAccountHandler):
        async def get_account_info(self, context):
            me = (await context.fetch("https://api.example.com/me")).data
            return ConnectedAccountInfo(
                email=me["email"],
                first_name=me["first_name"],
                last_name=me["last_name"],
            )
    @abstractmethod
    async def get_account_info(self, context: 'ExecutionContext') -> ConnectedAccountInfo:
        """Fetch account metadata from the external service.

        For platform OAuth integrations, ``context.fetch()`` auto-injects
        the Bearer token; no manual auth handling needed.

        Returns:
            A ``ConnectedAccountInfo`` with whichever fields the API provides.
        """
        pass
Fetch account metadata from the external service.
For platform OAuth integrations, context.fetch() auto-injects
the Bearer token — no manual auth handling needed.
Returns:
A ConnectedAccountInfo with whichever fields the API provides.
class ExecutionContext:
    """Context provided to integration handlers for making authenticated HTTP requests.

    Manages an ``aiohttp`` session with automatic retries, error handling, and
    optional Bearer-token injection for platform OAuth integrations.

    Use as an async context manager::

        async with ExecutionContext(auth=auth) as context:
            result = await integration.execute_action("my_action", inputs, context)

    Args:
        auth: Authentication data. In **local tests** this is a flat dict
            matching the ``auth.fields`` schema in ``config.json``
            (e.g. ``{"api_key": "..."}``). In **production** the platform
            wraps credentials as ``{"auth_type": "...", "credentials": {...}}``.
        request_config: Override default ``max_retries`` (3) and ``timeout`` (30 s).
        metadata: Arbitrary metadata forwarded to handlers.
        logger: Custom logger; falls back to ``logging.getLogger(__name__)``.
    """
    def __init__(
        self,
        auth: Dict[str, Any] = {},
        request_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None
    ):
        self.auth = auth
        """Authentication configuration"""
        self.config = request_config or {"max_retries": 3, "timeout": 30}
        """Request configuration"""
        self.metadata = metadata or {}
        """Additional metadata"""
        self.logger = logger or logging.getLogger(__name__)
        """Logger instance"""
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if not self._session:
            self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
            self._session = None

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Any = None,
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> FetchResponse:
        """Make an HTTP request with automatic retries and error handling.

        For **platform OAuth** integrations (``auth_type == "PlatformOauth2"``),
        a ``Bearer`` token is auto-injected from ``auth.credentials.access_token``
        unless an ``Authorization`` header is explicitly provided.

        Retries up to ``max_retries`` (default 3) on transient network errors
        with exponential back-off. HTTP 429 responses raise ``RateLimitError``
        immediately (no automatic retry).

        Args:
            url: The URL to request.
            method: HTTP method (``"GET"``, ``"POST"``, ``"PUT"``, etc.).
            params: Query parameters appended to the URL. Nested dicts/lists
                are JSON-serialized automatically.
            data: Raw request body. Encoding depends on ``content_type``.
            json: JSON-serializable payload. Sets ``content_type`` to
                ``application/json`` automatically.
            headers: Additional HTTP headers. Merged *after* any auto-injected
                auth header, so an explicit ``Authorization`` takes precedence.
            content_type: ``Content-Type`` header value.
            timeout: Per-request timeout in seconds (overrides ``request_config``).
            retry_count: Internal; current retry attempt number.

        Returns:
            A ``FetchResponse`` containing the HTTP status code, response
            headers, and parsed body data.

        Raises:
            RateLimitError: On HTTP 429 with the ``Retry-After`` value.
            HTTPError: On any other non-2xx status.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # Prepare request
        if json is not None:
            data = json
            content_type = "application/json"

        final_headers = {}

        if self.auth and "Authorization" not in (headers or {}):
            auth_type = AuthType(self.auth.get("auth_type", "PlatformOauth2"))
            credentials = self.auth.get("credentials", {})

            if auth_type == AuthType.PlatformOauth2 and "access_token" in credentials:
                final_headers["Authorization"] = f"Bearer {credentials['access_token']}"

        if content_type:
            final_headers["Content-Type"] = content_type
        if headers:
            final_headers.update(headers)

        if params:
            # Handle nested dictionary parameters
            flat_params = {}
            for key, value in params.items():
                if isinstance(value, (dict, list)):
                    flat_params[key] = jsonX.dumps(value)
                elif value is not None:
                    flat_params[key] = str(value)
            query_string = urlencode(flat_params)
            url = f"{url}{'&' if '?' in url else '?'}{query_string}"

        # Prepare body
        if data is not None:
            if content_type == "application/json":
                data = jsonX.dumps(data)
            elif content_type == "application/x-www-form-urlencoded":
                data = urlencode(data) if isinstance(data, dict) else data

        # Store the original timeout numeric value
        original_timeout = timeout or self.config["timeout"]

        # Convert the numeric timeout to a ClientTimeout instance for this request
        client_timeout = aiohttp.ClientTimeout(total=original_timeout)

        try:
            async with self._session.request(
                method=method,
                url=url,
                data=data,
                headers=final_headers,
                timeout=client_timeout,
                ssl=True
            ) as response:
                content_type = response.headers.get("Content-Type", "")

                if response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    raise RateLimitError(
                        retry_after,
                        response.status,
                        "Rate limit exceeded",
                        await response.text()
                    )

                try:
                    if "application/json" in content_type:
                        result = await response.json()
                    else:
                        result = await response.text()
                        if not result and response.status in {200, 201, 204}:
                            result = None
                except Exception as e:
                    self.logger.error(f"Error parsing response: {e}")
                    result = await response.text()

                response_headers = dict(response.headers)

                if not response.ok:
                    print(f"HTTP error encountered. Status: {response.status}. Result: {result}")
                    raise HTTPError(response.status, str(result), result)

                return FetchResponse(
                    status=response.status,
                    headers=response_headers,
                    data=result,
                )

        except RateLimitError:
            raise
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Don't want to send this to Raygun here because this will be retried.
            print(f"Error encountered: {e}. Retry count: {retry_count}. Backing off.")
            if retry_count < self.config["max_retries"]:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
                print("Retrying request...")
                # Use original_timeout (numeric) for recursive calls
                return await self.fetch(
                    url, method, params, data, json,
                    headers, content_type, original_timeout, retry_count + 1
                )
            else:
                print("Max retries reached. Raising error.")
                raise
        except Exception as e:
            self.logger.error(f"Unexpected error during {method} {url}: {e}")
            print(f"Unexpected error encountered: {e}")
            raise
Context provided to integration handlers for making authenticated HTTP requests.
Manages an aiohttp session with automatic retries, error handling, and
optional Bearer-token injection for platform OAuth integrations.
Use as an async context manager:
    async with ExecutionContext(auth=auth) as context:
        result = await integration.execute_action("my_action", inputs, context)
Args:
auth: Authentication data. In local tests this is a flat dict
matching the auth.fields schema in config.json
(e.g. {"api_key": "..."}). In production the platform
wraps credentials as {"auth_type": "...", "credentials": {...}}.
request_config: Override default max_retries (3) and timeout (30 s).
metadata: Arbitrary metadata forwarded to handlers.
logger: Custom logger; falls back to logging.getLogger(__name__).
    def __init__(
        self,
        auth: Dict[str, Any] = {},
        request_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None
    ):
        self.auth = auth
        """Authentication configuration"""
        self.config = request_config or {"max_retries": 3, "timeout": 30}
        """Request configuration"""
        self.metadata = metadata or {}
        """Additional metadata"""
        self.logger = logger or logging.getLogger(__name__)
        """Logger instance"""
        self._session: Optional[aiohttp.ClientSession] = None
    async def fetch(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Any = None,
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> FetchResponse:
Make an HTTP request with automatic retries and error handling.
For platform OAuth integrations (auth_type == "PlatformOauth2"),
a Bearer token is auto-injected from auth.credentials.access_token
unless an Authorization header is explicitly provided.
Retries up to max_retries (default 3) on transient network errors
with exponential back-off. HTTP 429 responses raise RateLimitError
immediately (no automatic retry).
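The back-off schedule follows from the `fetch()` source listed on this page, which sleeps `2 ** retry_count` seconds before each retry and stops once `retry_count` reaches `max_retries`. The sketch below only reproduces that arithmetic; `backoff_delays` is a hypothetical helper name, not an SDK function.

```python
from typing import List


def backoff_delays(max_retries: int = 3) -> List[int]:
    """Seconds slept before each retry: 2 ** retry_count for retry_count in 0..max_retries-1."""
    return [2 ** retry_count for retry_count in range(max_retries)]


# With the default max_retries of 3 there are up to four attempts in total,
# separated by waits of 1, 2 and 4 seconds.
default_delays = backoff_delays()
```

The per-request timeout applies to each attempt separately, so the worst-case duration of one `fetch()` call is the sum of these delays plus one timeout per attempt.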
Args:
url: The URL to request.
method: HTTP method ("GET", "POST", "PUT", etc.).
params: Query parameters appended to the URL. Nested dicts/lists
are JSON-serialized automatically.
data: Raw request body. Encoding depends on content_type.
json: JSON-serializable payload. Sets content_type to
application/json automatically.
headers: Additional HTTP headers. Merged after any auto-injected
auth header, so an explicit Authorization takes precedence.
content_type: Content-Type header value.
timeout: Per-request timeout in seconds (overrides request_config).
retry_count: Internal — current retry attempt number.
Returns:
A FetchResponse containing the HTTP status code, response
headers, and parsed body data.
Raises:
RateLimitError: On HTTP 429 with the Retry-After value.
HTTPError: On any other non-2xx status.
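How `params` ends up in the URL is easiest to see in isolation. The sketch below copies the query-string logic from the `fetch()` source listed on this page into a standalone function; `build_url` is a hypothetical name used only for illustration, and the example URLs are placeholders.

```python
import json
from typing import Any, Dict
from urllib.parse import urlencode


def build_url(url: str, params: Dict[str, Any]) -> str:
    """Append params to url: dicts/lists are JSON-encoded, None values are skipped."""
    if not params:
        return url
    flat_params = {}
    for key, value in params.items():
        if isinstance(value, (dict, list)):
            flat_params[key] = json.dumps(value)  # nested values become JSON strings
        elif value is not None:
            flat_params[key] = str(value)
    query_string = urlencode(flat_params)
    # Use '&' when the URL already carries a query string, '?' otherwise.
    return f"{url}{'&' if '?' in url else '?'}{query_string}"


nested = build_url("https://api.example.com/items", {"q": "a b", "filter": {"x": 1}, "skip": None})
appended = build_url("https://api.example.com/items?page=2", {"limit": 10})
```

Here `nested` is `https://api.example.com/items?q=a+b&filter=%7B%22x%22%3A+1%7D` and `appended` is `https://api.example.com/items?page=2&limit=10`. Receiving APIs need to expect the JSON-encoded form for nested values, since no bracket-style expansion is applied.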
class Integration:
    # Private configuration-parsing helpers used by load(). The source of each
    # public member is listed with that member's documentation.

    @staticmethod
    def _parse_interval(interval_str: str) -> timedelta:
        """Parse interval string into timedelta"""
        unit = interval_str[-1].lower()
        value = int(interval_str[:-1])

        if unit == 's':
            return timedelta(seconds=value)
        elif unit == 'm':
            return timedelta(minutes=value)
        elif unit == 'h':
            return timedelta(hours=value)
        elif unit == 'd':
            return timedelta(days=value)
        else:
            raise ConfigurationError(f"Invalid interval format: {interval_str}")

    @classmethod
    def _parse_actions(cls, actions_config: Dict[str, Any]) -> Dict[str, Action]:
        """Parse action configurations"""
        actions = {}
        for name, data in actions_config.items():
            actions[name] = Action(
                name=name,
                description=data["description"],
                input_schema=data["input_schema"],
                output_schema=data["output_schema"]
            )

        return actions

    @classmethod
    def _parse_polling_triggers(cls, triggers_config: Dict[str, Any]) -> Dict[str, PollingTrigger]:
        """Parse polling trigger configurations"""
        triggers = {}
        for name, data in triggers_config.items():
            interval = cls._parse_interval(data["polling_interval"])

            triggers[name] = PollingTrigger(
                name=name,
                description=data["description"],
                polling_interval=interval,
                input_schema=data["input_schema"],
                output_schema=data["output_schema"]
            )

        return triggers
Base integration class with handler registration and execution.
This class manages the integration configuration and handler registration, and provides methods to execute actions and triggers.
Args: config: Integration configuration
Attributes: config: Integration configuration
    def __init__(self, config: IntegrationConfig):
        self.config = config
        """Integration configuration"""
        self._action_handlers: Dict[str, Type[ActionHandler]] = {}
        """Action handlers"""
        self._polling_handlers: Dict[str, Type[PollingTriggerHandler]] = {}
        """Polling handlers"""
        self._connected_account_handler: Optional[Type[ConnectedAccountHandler]] = None
        """Connected account handler"""
    @classmethod
    def load(cls, config_path: Union[str, Path] = None) -> 'Integration':
        """Load an integration from its ``config.json``.

        Args:
            config_path: Explicit path to ``config.json``. When omitted the
                SDK resolves the path relative to its own package location,
                which works when the SDK is vendored via
                ``pip install --target dependencies``. Multi-file integrations
                that use ``actions/`` sub-packages should pass an explicit path
                (e.g. ``Integration.load("config.json")``).

        Returns:
            A fully initialised ``Integration`` ready for handler registration.

        Raises:
            ConfigurationError: If the file is missing or contains invalid JSON.
        """
        if config_path is None:
            config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.json')

        config_path = Path(config_path)

        if not config_path.exists():
            raise ConfigurationError(f"Configuration file not found: {config_path}")

        try:
            with open(config_path, 'r') as f:
                config_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ConfigurationError(f"Invalid JSON configuration: {e}")

        # Parse configuration sections
        actions = cls._parse_actions(config_data.get("actions", {}))
        polling_triggers = cls._parse_polling_triggers(config_data.get("polling_triggers", {}))

        config = IntegrationConfig(
            name=config_data["name"],
            version=config_data["version"],
            description=config_data["description"],
            auth=config_data.get("auth", {}),
            actions=actions,
            polling_triggers=polling_triggers
        )

        return cls(config)
Load an integration from its config.json.
Args:
config_path: Explicit path to config.json. When omitted the
SDK resolves the path relative to its own package location,
which works when the SDK is vendored via
pip install --target dependencies. Multi-file integrations
that use actions/ sub-packages should pass an explicit path
(e.g. Integration.load("config.json")).
Returns:
A fully initialised Integration ready for handler registration.
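Raises:
ConfigurationError: If the file is missing or contains invalid JSON, or if a polling trigger's polling_interval ends in a unit other than s, m, h, or d.
KeyError: If the parsed config lacks a required key (name, version, or description, or an action's or trigger's description, input_schema, or output_schema). The source above reads these keys directly and does not wrap the lookup.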
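The default path resolution can be easier to follow with a concrete path. The sketch below mirrors the three nested dirname calls that load() applies to the SDK module's own file; the install location and the helper name default_config_path are hypothetical and chosen only for illustration, and posixpath is used so the result does not depend on the host OS.

```python
import posixpath


def default_config_path(module_file: str) -> str:
    """Walk three directory levels up from the SDK module and append config.json."""
    package_dir = posixpath.dirname(module_file)            # .../autohive_integrations_sdk
    dependencies_dir = posixpath.dirname(package_dir)       # .../dependencies
    integration_root = posixpath.dirname(dependencies_dir)  # integration root
    return posixpath.join(integration_root, "config.json")


# Hypothetical vendored install location (pip install --target dependencies).
sdk_file = "/app/my_integration/dependencies/autohive_integrations_sdk/integration.py"
resolved = default_config_path(sdk_file)
print(resolved)
```

Under this assumed layout the default lands next to the dependencies directory. If the SDK is installed somewhere else, such as a shared site-packages directory, the same walk points at an unrelated directory, which is why passing config_path explicitly is the safer choice in that case.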
    def action(self, name: str):
        """Decorator to register an action handler.

        Args:
            name: Name of the action to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If action is not defined in config

        Example:
            ```python
            @integration.action("my_action")
            class MyActionHandler(ActionHandler):
                async def execute(self, inputs, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[ActionHandler]):
            if name not in self.config.actions:
                raise ConfigurationError(f"Action '{name}' not defined in config")
            self._action_handlers[name] = handler_class
            return handler_class
        return decorator
Decorator to register an action handler.
Args: name: Name of the action to register
Returns: Decorator function
Raises: ConfigurationError: If action is not defined in config
Example:
    @integration.action("my_action")
    class MyActionHandler(ActionHandler):
        async def execute(self, inputs, context):
            # Implementation
            return result
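The registration check runs when the decorator is applied, not when the action is executed, so a name that is missing from the config fails as soon as the handler module is imported. The following standalone sketch reproduces that pattern without the SDK; MiniRegistry and RegistrationError are hypothetical stand-ins for Integration and ConfigurationError.

```python
from typing import Callable, Dict, Set


class RegistrationError(Exception):
    """Stand-in for the SDK's ConfigurationError (hypothetical name)."""


class MiniRegistry:
    """Keeps a name -> handler class map, limited to names declared up front."""

    def __init__(self, declared_actions: Set[str]):
        self.declared_actions = declared_actions
        self.handlers: Dict[str, type] = {}

    def action(self, name: str) -> Callable[[type], type]:
        def decorator(handler_class: type) -> type:
            # The check happens at decoration time, mirroring Integration.action.
            if name not in self.declared_actions:
                raise RegistrationError(f"Action '{name}' not defined in config")
            self.handlers[name] = handler_class
            return handler_class  # the class itself is returned unchanged
        return decorator


registry = MiniRegistry({"my_action"})


@registry.action("my_action")
class MyActionHandler:
    pass


try:
    @registry.action("missing_action")
    class OtherHandler:
        pass
except RegistrationError as exc:
    registration_error = str(exc)
```

Because the decorator returns the class unchanged, the handler can still be instantiated and tested directly.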
    def polling_trigger(self, name: str):
        """Decorator to register a polling trigger handler

        Args:
            name: Name of the polling trigger to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If polling trigger is not defined in config

        Example:
            ```python
            @integration.polling_trigger("my_polling_trigger")
            class MyPollingTriggerHandler(PollingTriggerHandler):
                async def poll(self, inputs, last_poll_ts, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[PollingTriggerHandler]):
            if name not in self.config.polling_triggers:
                raise ConfigurationError(f"Polling trigger '{name}' not defined in config")
            self._polling_handlers[name] = handler_class
            return handler_class
        return decorator
Decorator to register a polling trigger handler.
Args: name: Name of the polling trigger to register
Returns: Decorator function
Raises: ConfigurationError: If polling trigger is not defined in config
Example:
    @integration.polling_trigger("my_polling_trigger")
    class MyPollingTriggerHandler(PollingTriggerHandler):
        async def poll(self, inputs, last_poll_ts, context):
            # Implementation
            return result
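Each polling trigger in config.json carries a polling_interval string, which the SDK's private _parse_interval helper turns into a timedelta: an integer followed by a one-letter unit (s, m, h, or d, case-insensitive). The sketch below is a table-driven variant of that logic for experimenting outside the SDK; parse_interval is a hypothetical name, and ValueError stands in for the SDK's ConfigurationError.

```python
from datetime import timedelta

# Unit suffix -> timedelta keyword argument.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_interval(interval: str) -> timedelta:
    """Convert strings such as '30s' or '2h' into a timedelta."""
    unit = interval[-1].lower()
    if unit not in _UNITS:
        # The SDK raises ConfigurationError here; ValueError is a stand-in.
        raise ValueError(f"Invalid interval format: {interval}")
    # int() itself raises ValueError for a non-numeric prefix such as 'abc'.
    return timedelta(**{_UNITS[unit]: int(interval[:-1])})


five_minutes = parse_interval("5m")
two_hours = parse_interval("2H")
```

Compound values such as "1h30m" are not supported by this format, because only the final character is read as the unit.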
    def connected_account(self):
        """Decorator to register a connected account handler

        Returns:
            Decorator function

        Example:
            ```python
            @integration.connected_account()
            class MyConnectedAccountHandler(ConnectedAccountHandler):
                async def get_account_info(self, context):
                    # Implementation
                    return {"email": "user@example.com", "name": "John Doe"}
            ```
        """
        def decorator(handler_class: Type[ConnectedAccountHandler]):
            self._connected_account_handler = handler_class
            return handler_class
        return decorator
Decorator to register a connected account handler.
Returns: Decorator function
Example:
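    @integration.connected_account()
    class MyConnectedAccountHandler(ConnectedAccountHandler):
        async def get_account_info(self, context):
            # Implementation. Note that get_connected_account() rejects any
            # return value that is not a ConnectedAccountInfo, so real handlers
            # should build one from these values instead of returning the dict.
            return {"email": "user@example.com", "name": "John Doe"}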
    async def execute_action(self,
                             name: str,
                             inputs: Dict[str, Any],
                             context: ExecutionContext) -> IntegrationResult:
        """Execute a registered action.

        Args:
            name: Name of the action to execute
            inputs: Action inputs
            context: Execution context

        Returns:
            IntegrationResult with action data (ResultType.ACTION),
            action error (ResultType.ACTION_ERROR) if the handler returned ActionError,
            or validation error (ResultType.VALIDATION_ERROR) if schema validation fails.
        """
        try:
            if name not in self._action_handlers:
                raise ValidationError(f"Action '{name}' not registered")

            # Validate inputs against action schema
            action_config = self.config.actions[name]
            validator = Draft7Validator(action_config.input_schema)
            errors = sorted(validator.iter_errors(inputs), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, action_config.input_schema, inputs, source="input")

            if "fields" in self.config.auth:
                auth_config = self.config.auth["fields"]
                validator = Draft7Validator(auth_config)
                errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
                if errors:
                    message = ""
                    for error in errors:
                        message += f"{list(error.schema_path)}, {error.message},\n "
                    raise ValidationError(message, auth_config, context.auth, source="input")

            # Create handler instance and execute
            handler = self._action_handlers[name]()
            result = await handler.execute(inputs, context)

            # Handle ActionError - skip output schema validation
            if isinstance(result, ActionError):
                return IntegrationResult(
                    version=__version__,
                    type=ResultType.ACTION_ERROR,
                    result=result
                )

            # Validate that result is ActionResult
            if not isinstance(result, ActionResult):
                raise ValidationError(
                    f"Action handler '{name}' must return ActionResult or ActionError, got {type(result).__name__}",
                    source="output"
                )

            # Validate output schema against the data inside ActionResult
            validator = Draft7Validator(action_config.output_schema)
            errors = sorted(validator.iter_errors(result.data), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, action_config.output_schema, result.data, source="output")

            # Return IntegrationResult with ActionResult directly
            return IntegrationResult(
                version=__version__,
                type=ResultType.ACTION,
                result=result
            )
        except ValidationError as e:
            return IntegrationResult(
                version=__version__,
                type=ResultType.VALIDATION_ERROR,
                result={
                    'message': str(e),
                    'property': None,
                    'value': None,
                    'source': getattr(e, 'source', 'legacy')
                }
            )
Execute a registered action.
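Args:
name: Name of the action to execute
inputs: Action inputs
context: Execution context
Returns:
IntegrationResult with action data (ResultType.ACTION), action error (ResultType.ACTION_ERROR) if the handler returned ActionError, or validation error (ResultType.VALIDATION_ERROR) if the action is not registered, if the inputs, auth fields, or output data fail schema validation, or if the handler returns anything other than ActionResult or ActionError. Validation failures are returned in the result rather than raised.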
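The order of the checks on the handler's return value matters: an ActionError short-circuits before any output schema validation, and only an ActionResult has its data validated. The sketch below models that branching with stand-in types so it runs without the SDK; FakeActionResult, FakeActionError, the message field, and classify are all hypothetical, and the returned strings simply mirror the ResultType member names used in the source.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeActionResult:
    """Stand-in for ActionResult; only the data attribute is modelled."""
    data: Any


@dataclass
class FakeActionError:
    """Stand-in for ActionError; the message field is an assumption."""
    message: str


def classify(result: Any, output_is_valid: Callable[[Any], bool]) -> str:
    """Return the ResultType member name the executor would pick."""
    if isinstance(result, FakeActionError):
        return "ACTION_ERROR"        # output schema validation is skipped
    if not isinstance(result, FakeActionResult):
        return "VALIDATION_ERROR"    # e.g. a bare dict returned by mistake
    if not output_is_valid(result.data):
        return "VALIDATION_ERROR"    # data does not match the output schema
    return "ACTION"


def has_id(data: Any) -> bool:
    # Toy replacement for the JSON Schema check: require an 'id' key.
    return isinstance(data, dict) and "id" in data


outcomes = [
    classify(FakeActionResult({"id": 1}), has_id),
    classify(FakeActionError("not found"), has_id),
    classify({"id": 1}, has_id),
    classify(FakeActionResult({}), has_id),
]
```

The third case is the one that tends to surprise: a handler that returns a plain dict is reported as a validation error even when the dict would satisfy the output schema.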
    async def execute_polling_trigger(self,
                                      name: str,
                                      inputs: Dict[str, Any],
                                      last_poll_ts: Optional[str],
                                      context: ExecutionContext) -> List[Dict[str, Any]]:
        """Execute a registered polling trigger

        Args:
            name: Name of the polling trigger to execute
            inputs: Trigger inputs
            last_poll_ts: Last poll timestamp
            context: Execution context

        Returns:
            List of records

        Raises:
            ValidationError: If inputs or outputs don't match schema
        """
        if name not in self._polling_handlers:
            raise ValidationError(f"Polling trigger '{name}' not registered")

        # Validate trigger configuration
        trigger_config = self.config.polling_triggers[name]
        try:
            validate(inputs, trigger_config.input_schema)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        try:
            auth_config = self.config.auth["fields"]
            validate(context.auth, auth_config)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        # Create handler instance and execute
        handler = self._polling_handlers[name]()
        records = await handler.poll(inputs, last_poll_ts, context)
        # Validate each record
        for record in records:
            if "id" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'id' field")
            if "data" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'data' field")

            # Validate record data against output schema
            try:
                validate(record["data"], trigger_config.output_schema)
            except Exception as e:
                raise ValidationError(e.message, e.schema, e.instance)

        return records
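Execute a registered polling trigger.
Args:
name: Name of the polling trigger to execute
inputs: Trigger inputs
last_poll_ts: Last poll timestamp
context: Execution context
Returns:
List of records. Each record must contain an id field and a data field; only data is validated against the trigger's output schema.
Raises:
ValidationError: If the trigger is not registered, if the inputs, auth fields, or a record's data don't match their schema, or if a record lacks the id or data field. Unlike execute_action, these errors are raised rather than returned in an IntegrationResult.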
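The record-shape requirement can be checked in a handler's own unit tests before the SDK ever sees the records. This is a minimal sketch of the same id and data checks, written without the SDK; check_record_shape is a hypothetical helper, and ValueError stands in for the SDK's ValidationError.

```python
from typing import Any, Dict, List


def check_record_shape(trigger_name: str, records: List[Dict[str, Any]]) -> None:
    """Raise if any record lacks the 'id' or 'data' key, checking 'id' first."""
    for record in records:
        for key in ("id", "data"):
            if key not in record:
                # The SDK raises ValidationError; ValueError is a stand-in.
                raise ValueError(
                    f"Polling trigger '{trigger_name}' returned record "
                    f"without required '{key}' field"
                )


good_records = [{"id": "evt-1", "data": {"subject": "hello"}}]
check_record_shape("my_polling_trigger", good_records)  # passes silently

try:
    check_record_shape("my_polling_trigger", [{"id": "evt-2"}])
except ValueError as exc:
    shape_error = str(exc)
```

Extra keys alongside id and data are not rejected by this check, which matches the source: it only tests for the presence of the two required keys.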
    async def get_connected_account(self, context: ExecutionContext) -> IntegrationResult:
        """Get connected account information

        Args:
            context: Execution context

        Returns:
            IntegrationResult containing connected account data

        Raises:
            ValidationError: If no connected account handler is registered or auth is invalid
        """
        if not self._connected_account_handler:
            raise ValidationError("No connected account handler registered")

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        handler = self._connected_account_handler()
        account_info = await handler.get_account_info(context)

        if not isinstance(account_info, ConnectedAccountInfo):
            raise ValidationError(
                f"Connected account handler must return ConnectedAccountInfo, got {type(account_info).__name__}"
            )

        # Return IntegrationResult with ConnectedAccountInfo object directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.CONNECTED_ACCOUNT,
            result=account_info
        )
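Get connected account information.
Args:
context: Execution context
Returns:
IntegrationResult of type ResultType.CONNECTED_ACCOUNT whose result is the ConnectedAccountInfo returned by the handler
Raises:
ValidationError: If no connected account handler is registered, if the auth fields fail schema validation, or if the handler returns anything other than a ConnectedAccountInfo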