autohive_integrations_sdk.integration
Autohive Integrations SDK — core module.
Provides the building blocks for creating Autohive integrations:
- Integration: load config and register action/trigger/connected-account handlers
- ExecutionContext: authenticated HTTP client passed to every handler
- ActionHandler: base class for action implementations (return ActionResult)
- ConnectedAccountHandler: base class for connected-account lookups (return ConnectedAccountInfo)
- ActionResult: standard return type wrapping action output data and optional billing cost
- ConnectedAccountInfo: structured account info returned by connected-account handlers
Typical usage::
    from autohive_integrations_sdk import Integration, ActionHandler, ActionResult, ExecutionContext

    integration = Integration.load()

    @integration.action("my_action")
    class MyAction(ActionHandler):
        async def execute(self, inputs, context):
            data = await context.fetch("https://api.example.com/resource")
            return ActionResult(data=data)
class AuthType(Enum):
    """Authentication strategy used by an integration.

    The platform stores the auth type alongside credentials and passes both
    to ``ExecutionContext``. ``context.fetch()`` uses the type to decide
    whether to auto-inject an ``Authorization`` header.

    Members:
        PlatformOauth2: Platform-managed OAuth 2.0; the platform handles the
            token lifecycle and injects ``Bearer <access_token>`` automatically.
        PlatformTeams: Platform-managed Microsoft Teams auth.
        ApiKey: A single API key provided by the user.
        Basic: Username/password (HTTP Basic) credentials.
        Custom: Free-form credential fields defined by the integration's
            ``config.json`` auth schema. The integration is responsible for
            reading individual fields from ``context.auth``.
    """
    PlatformOauth2 = "PlatformOauth2"
    PlatformTeams = "PlatformTeams"
    ApiKey = "ApiKey"
    Basic = "Basic"
    Custom = "Custom"
Authentication strategy used by an integration.
The platform stores the auth type alongside credentials and passes both
to ExecutionContext. context.fetch() uses the type to decide
whether to auto-inject an Authorization header.
Members:
PlatformOauth2: Platform-managed OAuth 2.0 — the platform handles the
token lifecycle and injects Bearer <access_token> automatically.
PlatformTeams: Platform-managed Microsoft Teams auth.
ApiKey: A single API key provided by the user.
Basic: Username/password (HTTP Basic) credentials.
Custom: Free-form credential fields defined by the integration's
config.json auth schema. The integration is responsible for
reading individual fields from context.auth.
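The header-injection rule described above can be sketched as a standalone function. This is an illustration under stated assumptions, not the SDK's own code path: `build_auth_headers` is a hypothetical helper, the `AuthType` enum is a local stand-in so the sketch runs without the SDK installed, and the nested `{"auth_type": ..., "credentials": {...}}` layout is assumed to be the shape the platform passes in production.

```python
from enum import Enum
from typing import Any, Dict, Optional


class AuthType(Enum):
    # Local stand-in mirroring the SDK enum so the sketch runs on its own.
    PlatformOauth2 = "PlatformOauth2"
    PlatformTeams = "PlatformTeams"
    ApiKey = "ApiKey"
    Basic = "Basic"
    Custom = "Custom"


def build_auth_headers(
    auth: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical helper: add a Bearer token only for platform OAuth auth."""
    final_headers: Dict[str, str] = {}
    explicit = headers or {}

    # An explicit Authorization header always wins over auto-injection.
    if auth and "Authorization" not in explicit:
        auth_type = AuthType(auth.get("auth_type", "PlatformOauth2"))
        credentials = auth.get("credentials", {})
        if auth_type is AuthType.PlatformOauth2 and "access_token" in credentials:
            final_headers["Authorization"] = f"Bearer {credentials['access_token']}"

    # Caller-supplied headers are merged last.
    final_headers.update(explicit)
    return final_headers


oauth = {"auth_type": "PlatformOauth2", "credentials": {"access_token": "abc123"}}
api_key = {"auth_type": "ApiKey", "credentials": {"api_key": "k-1"}}

print(build_auth_headers(oauth))    # {'Authorization': 'Bearer abc123'}
print(build_auth_headers(api_key))  # {}
```

In this sketch nothing is injected for `ApiKey`, `Basic`, or `Custom`; a handler using those types would read the credential fields from `context.auth` and pass its own header.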
class ResultType(Enum):
    """Type of result being returned"""
    ACTION = "action"
    CONNECTED_ACCOUNT = "connected_account"
    ERROR = "error"
Type of result being returned
class ValidationError(Exception):
    """Raised when SDK validation fails.

    This covers several cases:

    - Action inputs don't match the ``input_schema`` in ``config.json``
    - Action outputs don't match the ``output_schema``
    - Auth credentials don't match the ``auth.fields`` schema
    - An action handler returns something other than ``ActionResult``
    - A handler name isn't registered
    """
    def __init__(self, message: str, schema: str = None, inputs: str = None):
        self.schema = schema
        """The schema that failed validation"""
        self.inputs = inputs
        """The data that failed validation"""
        self.message = message
        """The error message"""
        super().__init__(message)
Raised when SDK validation fails.
This covers several cases:
- Action inputs don't match the input_schema in config.json
- Action outputs don't match the output_schema
- Auth credentials don't match the auth.fields schema
- An action handler returns something other than ActionResult
- A handler name isn't registered
```python
class ConfigurationError(Exception):
    """Raised when integration configuration is invalid"""
    pass
```
Raised when integration configuration is invalid
```python
class HTTPError(Exception):
    """Raised by ``ExecutionContext.fetch()`` for non-2xx HTTP responses (except 429)."""
    def __init__(self, status: int, message: str, response_data: Any = None):
        self.status = status
        """Status code"""
        self.message = message
        """Error message"""
        self.response_data = response_data
        """Response data"""
        super().__init__(f"HTTP {status}: {message}")
```
Raised by ExecutionContext.fetch() for non-2xx HTTP responses (except 429).
```python
class RateLimitError(HTTPError):
    """Raised by ``ExecutionContext.fetch()`` on HTTP 429 (Too Many Requests).

    Attributes:
        retry_after: Seconds to wait before retrying, taken from the
            ``Retry-After`` response header (defaults to 60 if absent).
    """
    def __init__(self, retry_after: int, *args, **kwargs):
        self.retry_after = retry_after
        """Seconds to wait before retrying."""
        super().__init__(*args, **kwargs)
```
Raised by ExecutionContext.fetch() on HTTP 429 (Too Many Requests).
Attributes:
retry_after: Seconds to wait before retrying, taken from the
Retry-After response header (defaults to 60 if absent).
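Because `fetch()` raises on a 429 instead of retrying, a handler that wants to wait and try again has to do so itself. The following is a minimal sketch of one way to do that, not an SDK feature. The two exception classes are local stand-ins copied from the listings above so the snippet runs on its own, and `call_with_rate_limit_retry`, `max_attempts`, and the injectable `sleep` are all hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable


class HTTPError(Exception):
    """Stand-in mirroring the SDK class so the sketch runs without the SDK."""
    def __init__(self, status: int, message: str, response_data: Any = None):
        self.status = status
        self.message = message
        self.response_data = response_data
        super().__init__(f"HTTP {status}: {message}")


class RateLimitError(HTTPError):
    """Stand-in mirroring the SDK class."""
    def __init__(self, retry_after: int, *args, **kwargs):
        self.retry_after = retry_after
        super().__init__(*args, **kwargs)


async def call_with_rate_limit_retry(
    request: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Hypothetical helper: wait retry_after seconds after a 429, then try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await request()
        except RateLimitError as exc:
            if attempt == max_attempts:
                raise  # Give up and surface the last 429 to the caller.
            await sleep(exc.retry_after)


async def _demo() -> tuple:
    calls = {"n": 0}
    waits = []

    async def flaky_request():
        # Fails with a 429 once, then succeeds.
        calls["n"] += 1
        if calls["n"] == 1:
            raise RateLimitError(2, 429, "Rate limit exceeded")
        return {"ok": True}

    async def fake_sleep(seconds):
        waits.append(seconds)  # Record the wait instead of sleeping.

    result = await call_with_rate_limit_retry(flaky_request, sleep=fake_sleep)
    return result, waits


demo_result, demo_waits = asyncio.run(_demo())
print(demo_result, demo_waits)
```

In a real handler the `request` callable would wrap a `context.fetch(...)` call. Whether waiting inside the handler is acceptable depends on the platform's execution time limits, which this page does not state.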
````python
@dataclass
class ActionResult:
    """Result returned by action handlers.

    This class encapsulates the data returned by an action along with optional
    billing information for cost tracking.

    Args:
        data: The actual result data from the action
        cost_usd: Optional USD cost for billing purposes

    Example:
        ```python
        return ActionResult(
            data={"message": "Success", "result": 42},
            cost_usd=0.05
        )
        ```
    """
    data: Any
    cost_usd: Optional[float] = None
````
Result returned by action handlers.
This class encapsulates the data returned by an action along with optional billing information for cost tracking.
Args:
data: The actual result data from the action
cost_usd: Optional USD cost for billing purposes
Example:
    return ActionResult(
        data={"message": "Success", "result": 42},
        cost_usd=0.05
    )
```python
@dataclass
class ConnectedAccountInfo:
    """Account metadata returned by a ``ConnectedAccountHandler``.

    The platform calls the connected-account handler after a user links
    their external account. The returned info is displayed in the
    Autohive UI (avatar, name, email, etc.).

    All fields are optional — populate whichever ones the external API provides.
    """
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    username: Optional[str] = None
    user_id: Optional[str] = None
    avatar_url: Optional[str] = None
    organization: Optional[str] = None
```
Account metadata returned by a ConnectedAccountHandler.
The platform calls the connected-account handler after a user links their external account. The returned info is displayed in the Autohive UI (avatar, name, email, etc.).
All fields are optional — populate whichever ones the external API provides.
```python
@dataclass
class IntegrationResult:
    """Result format sent from lambda wrapper to backend.

    This class represents the standardized format that the lambda wrapper
    sends to the Autohive backend, including SDK version and type-specific data.

    Args:
        version: SDK version (auto-populated)
        type: Type of result payload (ResultType enum: ACTION, CONNECTED_ACCOUNT, ERROR)
        result: The result object - ActionResult for actions or
                ConnectedAccountInfo for connected accounts.
                The lambda wrapper serializes these to dicts using asdict().

    Note:
        This type is returned by Integration methods and serialized by the lambda wrapper.
        Integration developers should use ActionResult for action handlers.
    """
    version: str
    type: ResultType
    result: Union[ActionResult, ConnectedAccountInfo]
```
Result format sent from lambda wrapper to backend.
This class represents the standardized format that the lambda wrapper sends to the Autohive backend, including SDK version and type-specific data.
Args:
version: SDK version (auto-populated)
type: Type of result payload (ResultType enum: ACTION, CONNECTED_ACCOUNT, ERROR)
result: The result object: ActionResult for actions or ConnectedAccountInfo for connected accounts. The lambda wrapper serializes these to dicts using asdict().
Note:
This type is returned by Integration methods and serialized by the lambda wrapper. Integration developers should use ActionResult for action handlers.
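To make the `asdict()` step concrete, here is a rough sketch of how a result object could be turned into a plain dict. The dataclasses are local copies of the ones documented above. The lambda wrapper itself is not shown on this page, so the outer envelope layout (`version`, `type`, `result` keys) and the `"0.0.0"` version string are assumptions for illustration only.

```python
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Optional


class ResultType(Enum):
    ACTION = "action"
    CONNECTED_ACCOUNT = "connected_account"
    ERROR = "error"


@dataclass
class ActionResult:
    data: Any
    cost_usd: Optional[float] = None


@dataclass
class IntegrationResult:
    version: str
    type: ResultType
    result: Any


envelope = IntegrationResult(
    version="0.0.0",  # placeholder; the SDK fills in its own __version__
    type=ResultType.ACTION,
    result=ActionResult(data={"message": "Success", "result": 42}, cost_usd=0.05),
)

# asdict() converts the result dataclass into a plain dict. The surrounding
# layout is an assumed shape, not the wrapper's documented wire format.
serialized = {
    "version": envelope.version,
    "type": envelope.type.value,  # Enum member -> plain string
    "result": asdict(envelope.result),
}
print(serialized)
```

The point for integration authors is that `ActionResult.data` must contain only values that survive this conversion and a later JSON encoding.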
```python
@dataclass
class Parameter:
    """Definition of a parameter"""
    name: str
    type: str
    description: str
    enum: Optional[List[str]] = None
    required: bool = True
    default: Any = None
```
Definition of a parameter
```python
@dataclass
class SchemaDefinition:
    """Base class for components that have input/output schemas"""
    name: str
    description: str
    input_schema: List[Parameter]
    output_schema: Optional[Dict[str, Any]] = None
```
Base class for components that have input/output schemas
```python
@dataclass
class Action(SchemaDefinition):
    """Empty dataclass that inherits from SchemaDefinition"""
    pass
```
Definition of an action. An empty dataclass that adds no fields to SchemaDefinition.
```python
@dataclass
class PollingTrigger(SchemaDefinition):
    """Definition of a polling trigger"""
    polling_interval: timedelta = field(default_factory=timedelta)
```
Definition of a polling trigger
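The `Integration` source listing later on this page fills `polling_interval` by parsing a string from `config.json` with a private `_parse_interval` helper. The snippet below is a stand-alone sketch of that parsing, written from the listing. `parse_interval` and the `_UNITS` table are illustrative names, and `ConfigurationError` is a local stand-in for the SDK exception.

```python
from datetime import timedelta


class ConfigurationError(Exception):
    """Stand-in for the SDK exception of the same name."""


# Maps the single-letter unit suffix to the matching timedelta keyword.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_interval(interval_str: str) -> timedelta:
    """Convert strings such as '30s' or '5m' into a timedelta."""
    unit = interval_str[-1].lower()
    value = int(interval_str[:-1])  # A non-numeric prefix raises ValueError.
    if unit not in _UNITS:
        raise ConfigurationError(f"Invalid interval format: {interval_str}")
    return timedelta(**{_UNITS[unit]: value})


print(parse_interval("30s"))
print(parse_interval("5m"))
```

The accepted format is an integer followed by one of `s`, `m`, `h`, or `d`. Compound values such as `1h30m` are not handled by this logic.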
```python
@dataclass
class IntegrationConfig:
    """Configuration for an integration"""
    name: str
    version: str
    description: str
    auth: Dict[str, Any]
    actions: Dict[str, Action]
    polling_triggers: Dict[str, PollingTrigger]
```
Configuration for an integration
```python
class ActionHandler(ABC):
    """Base class for action handlers.

    Subclass this and implement ``execute()`` to handle a specific action.
    Register it with the ``@integration.action("action_name")`` decorator.

    Example::

        @integration.action("get_user")
        class GetUser(ActionHandler):
            async def execute(self, inputs, context):
                user = await context.fetch(f"https://api.example.com/users/{inputs['id']}")
                return ActionResult(data=user)
    """
    @abstractmethod
    async def execute(self, inputs: Dict[str, Any], context: 'ExecutionContext') -> Any:
        """Run the action logic.

        Args:
            inputs: Validated action inputs matching the ``input_schema`` from ``config.json``.
            context: Execution context providing ``fetch()``, ``auth``, and logging.

        Returns:
            An ``ActionResult`` containing the output data and optional ``cost_usd``.
        """
        pass
```
Base class for action handlers.
Subclass this and implement execute() to handle a specific action.
Register it with the @integration.action("action_name") decorator.
Example::
    @integration.action("get_user")
    class GetUser(ActionHandler):
        async def execute(self, inputs, context):
            user = await context.fetch(f"https://api.example.com/users/{inputs['id']}")
            return ActionResult(data=user)
```python
    @abstractmethod
    async def execute(self, inputs: Dict[str, Any], context: 'ExecutionContext') -> Any:
        """Run the action logic.

        Args:
            inputs: Validated action inputs matching the ``input_schema`` from ``config.json``.
            context: Execution context providing ``fetch()``, ``auth``, and logging.

        Returns:
            An ``ActionResult`` containing the output data and optional ``cost_usd``.
        """
        pass
```
Run the action logic.
Args:
inputs: Validated action inputs matching the input_schema from config.json.
context: Execution context providing fetch(), auth, and logging.
Returns:
An ActionResult containing the output data and optional cost_usd.
```python
class PollingTriggerHandler(ABC):
    """Base class for polling trigger handlers"""
    @abstractmethod
    async def poll(self, inputs: Dict[str, Any], last_poll_ts: Optional[str], context: 'ExecutionContext') -> List[Dict[str, Any]]:
        """Execute the polling trigger"""
        pass
```
Base class for polling trigger handlers
```python
    @abstractmethod
    async def poll(self, inputs: Dict[str, Any], last_poll_ts: Optional[str], context: 'ExecutionContext') -> List[Dict[str, Any]]:
        """Execute the polling trigger"""
        pass
```
Execute the polling trigger
```python
class ConnectedAccountHandler(ABC):
    """Base class for connected-account handlers.

    The platform calls this after a user links their external account.
    The returned ``ConnectedAccountInfo`` is shown in the Autohive UI.

    Register with the ``@integration.connected_account()`` decorator.

    Example::

        @integration.connected_account()
        class MyAccountHandler(ConnectedAccountHandler):
            async def get_account_info(self, context):
                me = await context.fetch("https://api.example.com/me")
                return ConnectedAccountInfo(
                    email=me["email"],
                    first_name=me["first_name"],
                    last_name=me["last_name"],
                )
    """
    @abstractmethod
    async def get_account_info(self, context: 'ExecutionContext') -> ConnectedAccountInfo:
        """Fetch account metadata from the external service.

        For platform OAuth integrations, ``context.fetch()`` auto-injects
        the Bearer token — no manual auth handling needed.

        Returns:
            A ``ConnectedAccountInfo`` with whichever fields the API provides.
        """
        pass
```
Base class for connected-account handlers.
The platform calls this after a user links their external account.
The returned ConnectedAccountInfo is shown in the Autohive UI.
Register with the @integration.connected_account() decorator.
Example::
    @integration.connected_account()
    class MyAccountHandler(ConnectedAccountHandler):
        async def get_account_info(self, context):
            me = await context.fetch("https://api.example.com/me")
            return ConnectedAccountInfo(
                email=me["email"],
                first_name=me["first_name"],
                last_name=me["last_name"],
            )
```python
    @abstractmethod
    async def get_account_info(self, context: 'ExecutionContext') -> ConnectedAccountInfo:
        """Fetch account metadata from the external service.

        For platform OAuth integrations, ``context.fetch()`` auto-injects
        the Bearer token — no manual auth handling needed.

        Returns:
            A ``ConnectedAccountInfo`` with whichever fields the API provides.
        """
        pass
```
Fetch account metadata from the external service.
For platform OAuth integrations, context.fetch() auto-injects
the Bearer token — no manual auth handling needed.
Returns:
A ConnectedAccountInfo with whichever fields the API provides.
```python
class ExecutionContext:
    """Context provided to integration handlers for making authenticated HTTP requests.

    Manages an ``aiohttp`` session with automatic retries, error handling, and
    optional Bearer-token injection for platform OAuth integrations.

    Use as an async context manager::

        async with ExecutionContext(auth=auth) as context:
            result = await integration.execute_action("my_action", inputs, context)

    Args:
        auth: Authentication data. In **local tests** this is a flat dict
            matching the ``auth.fields`` schema in ``config.json``
            (e.g. ``{"api_key": "..."}``). In **production** the platform
            wraps credentials as ``{"auth_type": "...", "credentials": {...}}``.
        request_config: Override default ``max_retries`` (3) and ``timeout`` (30 s).
        metadata: Arbitrary metadata forwarded to handlers.
        logger: Custom logger; falls back to ``logging.getLogger(__name__)``.
    """
    def __init__(
        self,
        auth: Dict[str, Any] = {},
        request_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None
    ):
        self.auth = auth
        """Authentication configuration"""
        self.config = request_config or {"max_retries": 3, "timeout": 30}
        """Request configuration"""
        self.metadata = metadata or {}
        """Additional metadata"""
        self.logger = logger or logging.getLogger(__name__)
        """Logger instance"""
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if not self._session:
            self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
            self._session = None

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Any = None,
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> Any:
        """Make an HTTP request with automatic retries and error handling.

        For **platform OAuth** integrations (``auth_type == "PlatformOauth2"``),
        a ``Bearer`` token is auto-injected from ``auth.credentials.access_token``
        unless an ``Authorization`` header is explicitly provided.

        Retries up to ``max_retries`` (default 3) on transient network errors
        with exponential back-off. HTTP 429 responses raise ``RateLimitError``
        immediately (no automatic retry).

        Args:
            url: The URL to request.
            method: HTTP method (``"GET"``, ``"POST"``, ``"PUT"``, etc.).
            params: Query parameters appended to the URL. Nested dicts/lists
                are JSON-serialized automatically.
            data: Raw request body. Encoding depends on ``content_type``.
            json: JSON-serializable payload. Sets ``content_type`` to
                ``application/json`` automatically.
            headers: Additional HTTP headers. Merged *after* any auto-injected
                auth header, so an explicit ``Authorization`` takes precedence.
            content_type: ``Content-Type`` header value.
            timeout: Per-request timeout in seconds (overrides ``request_config``).
            retry_count: Internal — current retry attempt number.

        Returns:
            Parsed JSON (``dict``/``list``) when the response is
            ``application/json``, otherwise the raw response text.
            Returns ``None`` for empty 200/201/204 responses.

        Raises:
            RateLimitError: On HTTP 429 with the ``Retry-After`` value.
            HTTPError: On any other non-2xx status.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # Prepare request
        if json is not None:
            data = json
            content_type = "application/json"

        final_headers = {}

        if self.auth and "Authorization" not in (headers or {}):
            auth_type = AuthType(self.auth.get("auth_type", "PlatformOauth2"))
            credentials = self.auth.get("credentials", {})

            if auth_type == AuthType.PlatformOauth2 and "access_token" in credentials:
                final_headers["Authorization"] = f"Bearer {credentials['access_token']}"

        if content_type:
            final_headers["Content-Type"] = content_type
        if headers:
            final_headers.update(headers)

        if params:
            # Handle nested dictionary parameters
            flat_params = {}
            for key, value in params.items():
                if isinstance(value, (dict, list)):
                    flat_params[key] = jsonX.dumps(value)
                elif value is not None:
                    flat_params[key] = str(value)
            query_string = urlencode(flat_params)
            url = f"{url}{'&' if '?' in url else '?'}{query_string}"

        # Prepare body
        if data is not None:
            if content_type == "application/json":
                data = jsonX.dumps(data)
            elif content_type == "application/x-www-form-urlencoded":
                data = urlencode(data) if isinstance(data, dict) else data

        # Store the original timeout numeric value
        original_timeout = timeout or self.config["timeout"]

        # Convert the numeric timeout to a ClientTimeout instance for this request
        client_timeout = aiohttp.ClientTimeout(total=original_timeout)

        try:
            async with self._session.request(
                method=method,
                url=url,
                data=data,
                headers=final_headers,
                timeout=client_timeout,
                ssl=True
            ) as response:
                content_type = response.headers.get("Content-Type", "")

                if response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    raise RateLimitError(
                        retry_after,
                        response.status,
                        "Rate limit exceeded",
                        await response.text()
                    )

                try:
                    if "application/json" in content_type:
                        result = await response.json()
                    else:
                        result = await response.text()
                        if not result and response.status in {200, 201, 204}:
                            return None
                except Exception as e:
                    self.logger.error(f"Error parsing response: {e}")
                    result = await response.text()

                if not response.ok:
                    print(f"HTTP error encountered. Status: {response.status}. Result: {result}")
                    raise HTTPError(response.status, str(result), result)

                return result

        except RateLimitError:
            raise
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Don't want to send this to Raygun here because this will be retried.
            print(f"Error encountered: {e}. Retry count: {retry_count}. Backing off.")
            if retry_count < self.config["max_retries"]:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
                print("Retrying request...")
                # Use original_timeout (numeric) for recursive calls
                return await self.fetch(
                    url, method, params, data, json,
                    headers, content_type, original_timeout, retry_count + 1
                )
            else:
                print("Max retries reached. Raising error.")
                raise
        except Exception as e:
            self.logger.error(f"Unexpected error during {method} {url}: {e}")
            print(f"Unexpected error encountered: {e}")
            raise
```
Context provided to integration handlers for making authenticated HTTP requests.
Manages an aiohttp session with automatic retries, error handling, and
optional Bearer-token injection for platform OAuth integrations.
Use as an async context manager::
    async with ExecutionContext(auth=auth) as context:
        result = await integration.execute_action("my_action", inputs, context)
Args:
auth: Authentication data. In local tests this is a flat dict
matching the auth.fields schema in config.json
(e.g. {"api_key": "..."}). In production the platform
wraps credentials as {"auth_type": "...", "credentials": {...}}.
request_config: Replaces the default {"max_retries": 3, "timeout": 30}. The dict is used as given and is not merged with the defaults, so supply both keys.
metadata: Arbitrary metadata forwarded to handlers.
logger: Custom logger; falls back to logging.getLogger(__name__).
```python
    def __init__(
        self,
        auth: Dict[str, Any] = {},
        request_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None
    ):
        self.auth = auth
        """Authentication configuration"""
        self.config = request_config or {"max_retries": 3, "timeout": 30}
        """Request configuration"""
        self.metadata = metadata or {}
        """Additional metadata"""
        self.logger = logger or logging.getLogger(__name__)
        """Logger instance"""
        self._session: Optional[aiohttp.ClientSession] = None
```
```python
    async def fetch(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Any = None,
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> Any:
        """Make an HTTP request with automatic retries and error handling.

        For **platform OAuth** integrations (``auth_type == "PlatformOauth2"``),
        a ``Bearer`` token is auto-injected from ``auth.credentials.access_token``
        unless an ``Authorization`` header is explicitly provided.

        Retries up to ``max_retries`` (default 3) on transient network errors
        with exponential back-off. HTTP 429 responses raise ``RateLimitError``
        immediately (no automatic retry).

        Args:
            url: The URL to request.
            method: HTTP method (``"GET"``, ``"POST"``, ``"PUT"``, etc.).
            params: Query parameters appended to the URL. Nested dicts/lists
                are JSON-serialized automatically.
            data: Raw request body. Encoding depends on ``content_type``.
            json: JSON-serializable payload. Sets ``content_type`` to
                ``application/json`` automatically.
            headers: Additional HTTP headers. Merged *after* any auto-injected
                auth header, so an explicit ``Authorization`` takes precedence.
            content_type: ``Content-Type`` header value.
            timeout: Per-request timeout in seconds (overrides ``request_config``).
            retry_count: Internal — current retry attempt number.

        Returns:
            Parsed JSON (``dict``/``list``) when the response is
            ``application/json``, otherwise the raw response text.
            Returns ``None`` for empty 200/201/204 responses.

        Raises:
            RateLimitError: On HTTP 429 with the ``Retry-After`` value.
            HTTPError: On any other non-2xx status.
        """
        if not self._session:
            self._session = aiohttp.ClientSession()

        # Prepare request
        if json is not None:
            data = json
            content_type = "application/json"

        final_headers = {}

        if self.auth and "Authorization" not in (headers or {}):
            auth_type = AuthType(self.auth.get("auth_type", "PlatformOauth2"))
            credentials = self.auth.get("credentials", {})

            if auth_type == AuthType.PlatformOauth2 and "access_token" in credentials:
                final_headers["Authorization"] = f"Bearer {credentials['access_token']}"

        if content_type:
            final_headers["Content-Type"] = content_type
        if headers:
            final_headers.update(headers)

        if params:
            # Handle nested dictionary parameters
            flat_params = {}
            for key, value in params.items():
                if isinstance(value, (dict, list)):
                    flat_params[key] = jsonX.dumps(value)
                elif value is not None:
                    flat_params[key] = str(value)
            query_string = urlencode(flat_params)
            url = f"{url}{'&' if '?' in url else '?'}{query_string}"

        # Prepare body
        if data is not None:
            if content_type == "application/json":
                data = jsonX.dumps(data)
            elif content_type == "application/x-www-form-urlencoded":
                data = urlencode(data) if isinstance(data, dict) else data

        # Store the original timeout numeric value
        original_timeout = timeout or self.config["timeout"]

        # Convert the numeric timeout to a ClientTimeout instance for this request
        client_timeout = aiohttp.ClientTimeout(total=original_timeout)

        try:
            async with self._session.request(
                method=method,
                url=url,
                data=data,
                headers=final_headers,
                timeout=client_timeout,
                ssl=True
            ) as response:
                content_type = response.headers.get("Content-Type", "")

                if response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    raise RateLimitError(
                        retry_after,
                        response.status,
                        "Rate limit exceeded",
                        await response.text()
                    )

                try:
                    if "application/json" in content_type:
                        result = await response.json()
                    else:
                        result = await response.text()
                        if not result and response.status in {200, 201, 204}:
                            return None
                except Exception as e:
                    self.logger.error(f"Error parsing response: {e}")
                    result = await response.text()

                if not response.ok:
                    print(f"HTTP error encountered. Status: {response.status}. Result: {result}")
                    raise HTTPError(response.status, str(result), result)

                return result

        except RateLimitError:
            raise
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Don't want to send this to Raygun here because this will be retried.
            print(f"Error encountered: {e}. Retry count: {retry_count}. Backing off.")
            if retry_count < self.config["max_retries"]:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
                print("Retrying request...")
                # Use original_timeout (numeric) for recursive calls
                return await self.fetch(
                    url, method, params, data, json,
                    headers, content_type, original_timeout, retry_count + 1
                )
            else:
                print("Max retries reached. Raising error.")
                raise
        except Exception as e:
            self.logger.error(f"Unexpected error during {method} {url}: {e}")
            print(f"Unexpected error encountered: {e}")
            raise
```
Make an HTTP request with automatic retries and error handling.
For platform OAuth integrations (auth_type == "PlatformOauth2"),
a Bearer token is auto-injected from auth.credentials.access_token
unless an Authorization header is explicitly provided.
Retries up to max_retries (default 3) on transient network errors
with exponential back-off. HTTP 429 responses raise RateLimitError
immediately (no automatic retry).
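The back-off schedule follows directly from the `asyncio.sleep(2 ** retry_count)` call in the source listing. As a small worked example (the function name `backoff_delays` is illustrative, not part of the SDK):

```python
from typing import List


def backoff_delays(max_retries: int = 3) -> List[int]:
    """Seconds slept before each retry: 2 ** retry_count for counts 0..max_retries-1."""
    return [2 ** retry_count for retry_count in range(max_retries)]


print(backoff_delays())   # [1, 2, 4]
print(backoff_delays(5))  # [1, 2, 4, 8, 16]
```

With the default configuration a request is therefore attempted up to four times in total (the initial call plus three retries), with 7 seconds of cumulative sleep before the final error is raised.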
Args:
url: The URL to request.
method: HTTP method ("GET", "POST", "PUT", etc.).
params: Query parameters appended to the URL. Nested dicts/lists
are JSON-serialized automatically.
data: Raw request body. Encoding depends on content_type.
json: JSON-serializable payload. Sets content_type to
application/json automatically.
headers: Additional HTTP headers. Merged after any auto-injected
auth header, so an explicit Authorization takes precedence.
content_type: Content-Type header value.
timeout: Per-request timeout in seconds (overrides request_config).
retry_count: Internal — current retry attempt number.
Returns:
Parsed JSON (dict/list) when the response is
application/json, otherwise the raw response text.
Returns None for empty 200/201/204 responses.
Raises:
RateLimitError: On HTTP 429 with the Retry-After value.
HTTPError: On any other non-2xx status.
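The handling of `params` is easiest to see in isolation. The snippet below is a stand-alone sketch of the flattening step shown in the source listing, using only the standard library. `append_query` is a hypothetical name, and the URL is a placeholder.

```python
import json
from typing import Any, Dict
from urllib.parse import parse_qs, urlencode, urlsplit


def append_query(url: str, params: Dict[str, Any]) -> str:
    """Flatten params the way the listing does and append them to url."""
    flat: Dict[str, str] = {}
    for key, value in params.items():
        if isinstance(value, (dict, list)):
            flat[key] = json.dumps(value)  # Nested values become JSON text.
        elif value is not None:
            flat[key] = str(value)         # None values are dropped entirely.
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}{urlencode(flat)}"


url = append_query(
    "https://api.example.com/items?page=1",
    {"limit": 10, "cursor": None, "filter": {"status": "open"}},
)
print(url)
print(parse_qs(urlsplit(url).query))
```

Two consequences are worth keeping in mind: a parameter whose value is `None` never reaches the server, and a list is sent as a single JSON-encoded value rather than as repeated `key=value` pairs, which some APIs expect instead.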
499class Integration: 500 """Base integration class with handler registration and execution. 501 502 This class manages the integration configuration, handler registration, 503 and provides methods to execute actions and triggers. 504 505 Args: 506 config: Integration configuration 507 508 Attributes: 509 config: Integration configuration 510 """ 511 512 def __init__(self, config: IntegrationConfig): 513 self.config = config 514 """Integration configuration""" 515 self._action_handlers: Dict[str, Type[ActionHandler]] = {} 516 """Action handlers""" 517 self._polling_handlers: Dict[str, Type[PollingTriggerHandler]] = {} 518 """Polling handlers""" 519 self._connected_account_handler: Optional[Type[ConnectedAccountHandler]] = None 520 """Connected account handler""" 521 522 @classmethod 523 def load(cls, config_path: Union[str, Path] = None) -> 'Integration': 524 """Load an integration from its ``config.json``. 525 526 Args: 527 config_path: Explicit path to ``config.json``. When omitted the 528 SDK resolves the path relative to its own package location, 529 which works when the SDK is vendored via 530 ``pip install --target dependencies``. Multi-file integrations 531 that use ``actions/`` sub-packages should pass an explicit path 532 (e.g. ``Integration.load("config.json")``). 533 534 Returns: 535 A fully initialised ``Integration`` ready for handler registration. 536 537 Raises: 538 ConfigurationError: If the file is missing or contains invalid JSON. 
539 """ 540 if config_path is None: 541 config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.json') 542 543 config_path = Path(config_path) 544 545 if not config_path.exists(): 546 raise ConfigurationError(f"Configuration file not found: {config_path}") 547 548 try: 549 with open(config_path, 'r') as f: 550 config_data = json.load(f) 551 except json.JSONDecodeError as e: 552 raise ConfigurationError(f"Invalid JSON configuration: {e}") 553 554 # Parse configuration sections 555 actions = cls._parse_actions(config_data.get("actions", {})) 556 polling_triggers = cls._parse_polling_triggers(config_data.get("polling_triggers", {})) 557 558 config = IntegrationConfig( 559 name=config_data["name"], 560 version=config_data["version"], 561 description=config_data["description"], 562 auth=config_data.get("auth", {}), 563 actions=actions, 564 polling_triggers=polling_triggers 565 ) 566 567 return cls(config) 568 569 @staticmethod 570 def _parse_interval(interval_str: str) -> timedelta: 571 """Parse interval string into timedelta""" 572 unit = interval_str[-1].lower() 573 value = int(interval_str[:-1]) 574 575 if unit == 's': 576 return timedelta(seconds=value) 577 elif unit == 'm': 578 return timedelta(minutes=value) 579 elif unit == 'h': 580 return timedelta(hours=value) 581 elif unit == 'd': 582 return timedelta(days=value) 583 else: 584 raise ConfigurationError(f"Invalid interval format: {interval_str}") 585 586 @classmethod 587 def _parse_actions(cls, actions_config: Dict[str, Any]) -> Dict[str, Action]: 588 """Parse action configurations""" 589 actions = {} 590 for name, data in actions_config.items(): 591 actions[name] = Action( 592 name=name, 593 description=data["description"], 594 input_schema=data["input_schema"], 595 output_schema=data["output_schema"] 596 ) 597 598 return actions 599 600 @classmethod 601 def _parse_polling_triggers(cls, triggers_config: Dict[str, Any]) -> Dict[str, PollingTrigger]: 602 """Parse polling 
trigger configurations"""
        triggers = {}
        for name, data in triggers_config.items():
            interval = cls._parse_interval(data["polling_interval"])

            triggers[name] = PollingTrigger(
                name=name,
                description=data["description"],
                polling_interval=interval,
                input_schema=data["input_schema"],
                output_schema=data["output_schema"]
            )

        return triggers

    def action(self, name: str):
        """Decorator to register an action handler.

        Args:
            name: Name of the action to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If action is not defined in config

        Example:
            ```python
            @integration.action("my_action")
            class MyActionHandler(ActionHandler):
                async def execute(self, inputs, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[ActionHandler]):
            if name not in self.config.actions:
                raise ConfigurationError(f"Action '{name}' not defined in config")
            self._action_handlers[name] = handler_class
            return handler_class
        return decorator

    def polling_trigger(self, name: str):
        """Decorator to register a polling trigger handler

        Args:
            name: Name of the polling trigger to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If polling trigger is not defined in config

        Example:
            ```python
            @integration.polling_trigger("my_polling_trigger")
            class MyPollingTriggerHandler(PollingTriggerHandler):
                async def poll(self, inputs, last_poll_ts, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[PollingTriggerHandler]):
            if name not in self.config.polling_triggers:
                raise ConfigurationError(f"Polling trigger '{name}' not defined in config")
            self._polling_handlers[name] = handler_class
            return handler_class
        return decorator

    def connected_account(self):
        """Decorator to register a connected account handler

        Returns:
            Decorator function

        Example:
            ```python
            @integration.connected_account()
            class MyConnectedAccountHandler(ConnectedAccountHandler):
                async def get_account_info(self, context):
                    # Implementation
                    return {"email": "user@example.com", "name": "John Doe"}
            ```
        """
        def decorator(handler_class: Type[ConnectedAccountHandler]):
            self._connected_account_handler = handler_class
            return handler_class
        return decorator

    async def execute_action(self,
                             name: str,
                             inputs: Dict[str, Any],
                             context: ExecutionContext) -> IntegrationResult:
        """Execute a registered action.

        Args:
            name: Name of the action to execute
            inputs: Action inputs
            context: Execution context

        Returns:
            IntegrationResult with action data and optional billing information

        Raises:
            ValidationError: If inputs or outputs don't match schema, or if handler doesn't return ActionResult
        """
        if name not in self._action_handlers:
            raise ValidationError(f"Action '{name}' not registered")

        # Validate inputs against action schema
        action_config = self.config.actions[name]
        validator = Draft7Validator(action_config.input_schema)
        errors = sorted(validator.iter_errors(inputs), key=lambda e: e.path)
        if errors:
            message = ""
            for error in errors:
                message += f"{list(error.schema_path)}, {error.message},\n "
            raise ValidationError(message, action_config.input_schema, inputs)

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        # Create handler instance and execute
        handler = self._action_handlers[name]()
        result = await handler.execute(inputs, context)

        # Validate that result is ActionResult
        if not isinstance(result, ActionResult):
            raise ValidationError(
                f"Action handler '{name}' must return ActionResult, got {type(result).__name__}"
            )

        # Validate output schema against the data inside ActionResult
        validator = Draft7Validator(action_config.output_schema)
        errors = sorted(validator.iter_errors(result.data), key=lambda e: e.path)
        if errors:
            message = ""
            for error in errors:
                message += f"{list(error.schema_path)}, {error.message},\n "
            raise ValidationError(message, action_config.output_schema, result.data)

        # Return IntegrationResult with ActionResult directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.ACTION,
            result=result
        )

    async def execute_polling_trigger(self,
                                      name: str,
                                      inputs: Dict[str, Any],
                                      last_poll_ts: Optional[str],
                                      context: ExecutionContext) -> List[Dict[str, Any]]:
        """Execute a registered polling trigger

        Args:
            name: Name of the polling trigger to execute
            inputs: Trigger inputs
            last_poll_ts: Last poll timestamp
            context: Execution context

        Returns:
            List of records

        Raises:
            ValidationError: If inputs or outputs don't match schema
        """
        if name not in self._polling_handlers:
            raise ValidationError(f"Polling trigger '{name}' not registered")

        # Validate trigger configuration
        trigger_config = self.config.polling_triggers[name]
        try:
            validate(inputs, trigger_config.input_schema)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        try:
            auth_config = self.config.auth["fields"]
            validate(context.auth, auth_config)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        # Create handler instance and execute
        handler = self._polling_handlers[name]()
        records = await handler.poll(inputs, last_poll_ts, context)
        # Validate each record
        for record in records:
            if "id" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'id' field")
            if "data" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'data' field")

            # Validate record data against output schema
            try:
                validate(record["data"], trigger_config.output_schema)
            except Exception as e:
                raise ValidationError(e.message, e.schema, e.instance)

        return records

    async def get_connected_account(self, context: ExecutionContext) -> IntegrationResult:
        """Get connected account information

        Args:
            context: Execution context

        Returns:
            IntegrationResult containing connected account data

        Raises:
            ValidationError: If no connected account handler is registered or auth is invalid
        """
        if not self._connected_account_handler:
            raise ValidationError("No connected account handler registered")

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        handler = self._connected_account_handler()
        account_info = await handler.get_account_info(context)

        if not isinstance(account_info, ConnectedAccountInfo):
            raise ValidationError(
                f"Connected account handler must return ConnectedAccountInfo, got {type(account_info).__name__}"
            )

        # Return IntegrationResult with ConnectedAccountInfo object directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.CONNECTED_ACCOUNT,
            result=account_info
        )
Base integration class with handler registration and execution.
This class manages the integration configuration and handler registration, and provides methods to execute actions, polling triggers, and connected-account lookups.
Args: config: Integration configuration
Attributes: config: Integration configuration
    def __init__(self, config: IntegrationConfig):
        self.config = config
        """Integration configuration"""
        self._action_handlers: Dict[str, Type[ActionHandler]] = {}
        """Action handlers"""
        self._polling_handlers: Dict[str, Type[PollingTriggerHandler]] = {}
        """Polling handlers"""
        self._connected_account_handler: Optional[Type[ConnectedAccountHandler]] = None
        """Connected account handler"""
    @classmethod
    def load(cls, config_path: Union[str, Path] = None) -> 'Integration':
        """Load an integration from its ``config.json``.

        Args:
            config_path: Explicit path to ``config.json``.  When omitted the
                SDK resolves the path relative to its own package location,
                which works when the SDK is vendored via
                ``pip install --target dependencies``.  Multi-file integrations
                that use ``actions/`` sub-packages should pass an explicit path
                (e.g. ``Integration.load("config.json")``).

        Returns:
            A fully initialised ``Integration`` ready for handler registration.

        Raises:
            ConfigurationError: If the file is missing or contains invalid JSON.
        """
        if config_path is None:
            config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.json')

        config_path = Path(config_path)

        if not config_path.exists():
            raise ConfigurationError(f"Configuration file not found: {config_path}")

        try:
            with open(config_path, 'r') as f:
                config_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ConfigurationError(f"Invalid JSON configuration: {e}")

        # Parse configuration sections
        actions = cls._parse_actions(config_data.get("actions", {}))
        polling_triggers = cls._parse_polling_triggers(config_data.get("polling_triggers", {}))

        config = IntegrationConfig(
            name=config_data["name"],
            version=config_data["version"],
            description=config_data["description"],
            auth=config_data.get("auth", {}),
            actions=actions,
            polling_triggers=polling_triggers
        )

        return cls(config)
Load an integration from its config.json.
Args:
config_path: Explicit path to config.json. When omitted, the
SDK resolves the path relative to its own package location
(three directory levels above the SDK module file),
which works when the SDK is vendored via
pip install --target dependencies. Multi-file integrations
that use actions/ sub-packages should pass an explicit path
(e.g. Integration.load("config.json")).
Returns:
A fully initialised Integration ready for handler registration.
Raises: ConfigurationError: If the file is missing or contains invalid JSON.
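The default lookup can be traced with a small sketch. It reproduces only the three nested `os.path.dirname` calls from the source listing above; the vendored install path `/app/dependencies/...` and the helper name `default_config_path` are assumptions made for illustration, and `posixpath` is used so the result is the same on every platform.

```python
import posixpath


def default_config_path(sdk_module_file: str) -> str:
    """Mimic the fallback in Integration.load(): go up three directories."""
    package_dir = posixpath.dirname(sdk_module_file)   # .../autohive_integrations_sdk
    dependencies_dir = posixpath.dirname(package_dir)  # .../dependencies
    integration_root = posixpath.dirname(dependencies_dir)
    return posixpath.join(integration_root, "config.json")


# Hypothetical location of the SDK module after `pip install --target dependencies`.
vendored_module = "/app/dependencies/autohive_integrations_sdk/integration.py"
resolved = default_config_path(vendored_module)
print(resolved)
```

Under that assumed layout the lookup lands next to the `dependencies` directory. With any other layout the default probably points elsewhere, which is why the docstring recommends passing an explicit path.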
    def action(self, name: str):
        """Decorator to register an action handler.

        Args:
            name: Name of the action to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If action is not defined in config

        Example:
            ```python
            @integration.action("my_action")
            class MyActionHandler(ActionHandler):
                async def execute(self, inputs, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[ActionHandler]):
            if name not in self.config.actions:
                raise ConfigurationError(f"Action '{name}' not defined in config")
            self._action_handlers[name] = handler_class
            return handler_class
        return decorator
Decorator to register an action handler.
Args: name: Name of the action to register
Returns: Decorator function
Raises: ConfigurationError: If action is not defined in config
Example:
    @integration.action("my_action")
    class MyActionHandler(ActionHandler):
        async def execute(self, inputs, context):
            # Implementation
            return result
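The registration pattern can be modelled without the SDK. The sketch below is a simplified stand-in, not the SDK's own class: `MiniRegistry` and its attributes are hypothetical names, and `ConfigurationError` is redefined locally so the block runs on its own. It shows that the name check happens when the decorator is applied to the class, so a typo in the action name surfaces at import time rather than at execution time.

```python
from typing import Dict, Set, Type


class ConfigurationError(Exception):
    """Local stand-in for the SDK exception of the same name."""


class MiniRegistry:
    """Hypothetical, simplified model of Integration.action()."""

    def __init__(self, configured_actions: Set[str]):
        self.configured_actions = configured_actions
        self.handlers: Dict[str, Type] = {}

    def action(self, name: str):
        def decorator(handler_class: Type) -> Type:
            # Checked when the decorator is applied, not when action() is called.
            if name not in self.configured_actions:
                raise ConfigurationError(f"Action '{name}' not defined in config")
            self.handlers[name] = handler_class
            return handler_class  # the class itself is returned unchanged
        return decorator


registry = MiniRegistry({"my_action"})


@registry.action("my_action")
class MyAction:
    pass


try:
    @registry.action("missing_action")
    class Missing:
        pass
except ConfigurationError as exc:
    error_text = str(exc)
    print(error_text)
```

Because the decorator returns the handler class unchanged, the class can still be instantiated directly, for example in unit tests.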
    def polling_trigger(self, name: str):
        """Decorator to register a polling trigger handler

        Args:
            name: Name of the polling trigger to register

        Returns:
            Decorator function

        Raises:
            ConfigurationError: If polling trigger is not defined in config

        Example:
            ```python
            @integration.polling_trigger("my_polling_trigger")
            class MyPollingTriggerHandler(PollingTriggerHandler):
                async def poll(self, inputs, last_poll_ts, context):
                    # Implementation
                    return result
            ```
        """
        def decorator(handler_class: Type[PollingTriggerHandler]):
            if name not in self.config.polling_triggers:
                raise ConfigurationError(f"Polling trigger '{name}' not defined in config")
            self._polling_handlers[name] = handler_class
            return handler_class
        return decorator
Decorator to register a polling trigger handler
Args: name: Name of the polling trigger to register
Returns: Decorator function
Raises: ConfigurationError: If polling trigger is not defined in config
Example:
    @integration.polling_trigger("my_polling_trigger")
    class MyPollingTriggerHandler(PollingTriggerHandler):
        async def poll(self, inputs, last_poll_ts, context):
            # Implementation
            return result
    def connected_account(self):
        """Decorator to register a connected account handler

        Returns:
            Decorator function

        Example:
            ```python
            @integration.connected_account()
            class MyConnectedAccountHandler(ConnectedAccountHandler):
                async def get_account_info(self, context):
                    # Implementation
                    return {"email": "user@example.com", "name": "John Doe"}
            ```
        """
        def decorator(handler_class: Type[ConnectedAccountHandler]):
            self._connected_account_handler = handler_class
            return handler_class
        return decorator
Decorator to register a connected account handler
Returns: Decorator function
Example:
    @integration.connected_account()
    class MyConnectedAccountHandler(ConnectedAccountHandler):
        async def get_account_info(self, context):
            # Implementation: build a ConnectedAccountInfo instance.
            # get_connected_account raises ValidationError for a plain dict.
            return account_info
    async def execute_action(self,
                             name: str,
                             inputs: Dict[str, Any],
                             context: ExecutionContext) -> IntegrationResult:
        """Execute a registered action.

        Args:
            name: Name of the action to execute
            inputs: Action inputs
            context: Execution context

        Returns:
            IntegrationResult with action data and optional billing information

        Raises:
            ValidationError: If inputs or outputs don't match schema, or if handler doesn't return ActionResult
        """
        if name not in self._action_handlers:
            raise ValidationError(f"Action '{name}' not registered")

        # Validate inputs against action schema
        action_config = self.config.actions[name]
        validator = Draft7Validator(action_config.input_schema)
        errors = sorted(validator.iter_errors(inputs), key=lambda e: e.path)
        if errors:
            message = ""
            for error in errors:
                message += f"{list(error.schema_path)}, {error.message},\n "
            raise ValidationError(message, action_config.input_schema, inputs)

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        # Create handler instance and execute
        handler = self._action_handlers[name]()
        result = await handler.execute(inputs, context)

        # Validate that result is ActionResult
        if not isinstance(result, ActionResult):
            raise ValidationError(
                f"Action handler '{name}' must return ActionResult, got {type(result).__name__}"
            )

        # Validate output schema against the data inside ActionResult
        validator = Draft7Validator(action_config.output_schema)
        errors = sorted(validator.iter_errors(result.data), key=lambda e: e.path)
        if errors:
            message = ""
            for error in errors:
                message += f"{list(error.schema_path)}, {error.message},\n "
            raise ValidationError(message, action_config.output_schema, result.data)

        # Return IntegrationResult with ActionResult directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.ACTION,
            result=result
        )
Execute a registered action.
Args:
name: Name of the action to execute
inputs: Action inputs
context: Execution context
Returns: IntegrationResult with action data and optional billing information
Raises: ValidationError: If the action is not registered, if the inputs, auth fields, or output data don't match their schemas, or if the handler doesn't return an ActionResult
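When schema validation fails, every error is folded into a single message string. The sketch below reproduces only that formatting loop from the source listing, so the resulting `ValidationError` text is easier to read in logs. `FakeSchemaError` is a hypothetical stand-in exposing the two attributes the loop reads (`schema_path` and `message`), and the two sample errors are invented for illustration. The sort by `e.path` that the SDK applies first is omitted here.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List


@dataclass
class FakeSchemaError:
    """Hypothetical stand-in for a jsonschema validation error."""
    schema_path: Deque[str]
    message: str


def build_message(errors: List[FakeSchemaError]) -> str:
    # Same concatenation as in execute_action: one "path, message," entry
    # per error, each followed by a newline and a single space.
    message = ""
    for error in errors:
        message += f"{list(error.schema_path)}, {error.message},\n "
    return message


# Invented sample errors, not real validator output.
sample_errors = [
    FakeSchemaError(deque(["required"]), "'query' is a required property"),
    FakeSchemaError(deque(["properties", "limit", "type"]),
                    "'ten' is not of type 'integer'"),
]
combined = build_message(sample_errors)
print(combined)
```

Each entry ends with a comma, a newline, and one space, so the combined message carries trailing whitespace after the last error.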
    async def execute_polling_trigger(self,
                                      name: str,
                                      inputs: Dict[str, Any],
                                      last_poll_ts: Optional[str],
                                      context: ExecutionContext) -> List[Dict[str, Any]]:
        """Execute a registered polling trigger

        Args:
            name: Name of the polling trigger to execute
            inputs: Trigger inputs
            last_poll_ts: Last poll timestamp
            context: Execution context

        Returns:
            List of records

        Raises:
            ValidationError: If inputs or outputs don't match schema
        """
        if name not in self._polling_handlers:
            raise ValidationError(f"Polling trigger '{name}' not registered")

        # Validate trigger configuration
        trigger_config = self.config.polling_triggers[name]
        try:
            validate(inputs, trigger_config.input_schema)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        try:
            auth_config = self.config.auth["fields"]
            validate(context.auth, auth_config)
        except Exception as e:
            raise ValidationError(e.message, e.schema, e.instance)

        # Create handler instance and execute
        handler = self._polling_handlers[name]()
        records = await handler.poll(inputs, last_poll_ts, context)
        # Validate each record
        for record in records:
            if "id" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'id' field")
            if "data" not in record:
                raise ValidationError(
                    f"Polling trigger '{name}' returned record without required 'data' field")

            # Validate record data against output schema
            try:
                validate(record["data"], trigger_config.output_schema)
            except Exception as e:
                raise ValidationError(e.message, e.schema, e.instance)

        return records
Execute a registered polling trigger
Args:
name: Name of the polling trigger to execute
inputs: Trigger inputs
last_poll_ts: Last poll timestamp
context: Execution context
Returns: List of records
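Raises: ValidationError: If the trigger is not registered, if the inputs, auth fields, or a record's data don't match their schemas, or if a record lacks the 'id' or 'data' key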
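Every record returned by a `poll` implementation must carry both an `id` and a `data` key. The sketch below isolates that shape check from the source listing so it can be run without the SDK. `check_record_shape` is a hypothetical helper name, `ValidationError` is a local stand-in for the SDK exception, and the schema validation of `record["data"]` is deliberately left out.

```python
from typing import Any, Dict, List


class ValidationError(Exception):
    """Local stand-in for the SDK exception of the same name."""


def check_record_shape(trigger_name: str,
                       records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Reject records lacking 'id' or 'data', in the same order as the SDK loop."""
    for record in records:
        if "id" not in record:
            raise ValidationError(
                f"Polling trigger '{trigger_name}' returned record without required 'id' field")
        if "data" not in record:
            raise ValidationError(
                f"Polling trigger '{trigger_name}' returned record without required 'data' field")
    return records


# Invented sample records for illustration.
good_records = [{"id": "evt-1", "data": {"title": "First event"}}]
bad_records = [{"id": "evt-2"}]  # no 'data' key

accepted = check_record_shape("my_polling_trigger", good_records)

try:
    check_record_shape("my_polling_trigger", bad_records)
except ValidationError as exc:
    shape_error = str(exc)
    print(shape_error)
```

In the SDK the first offending record aborts the whole poll, so one malformed record prevents the valid ones from being returned.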
    async def get_connected_account(self, context: ExecutionContext) -> IntegrationResult:
        """Get connected account information

        Args:
            context: Execution context

        Returns:
            IntegrationResult containing connected account data

        Raises:
            ValidationError: If no connected account handler is registered or auth is invalid
        """
        if not self._connected_account_handler:
            raise ValidationError("No connected account handler registered")

        if "fields" in self.config.auth:
            auth_config = self.config.auth["fields"]
            validator = Draft7Validator(auth_config)
            errors = sorted(validator.iter_errors(context.auth), key=lambda e: e.path)
            if errors:
                message = ""
                for error in errors:
                    message += f"{list(error.schema_path)}, {error.message},\n "
                raise ValidationError(message, auth_config, context.auth)

        handler = self._connected_account_handler()
        account_info = await handler.get_account_info(context)

        if not isinstance(account_info, ConnectedAccountInfo):
            raise ValidationError(
                f"Connected account handler must return ConnectedAccountInfo, got {type(account_info).__name__}"
            )

        # Return IntegrationResult with ConnectedAccountInfo object directly
        return IntegrationResult(
            version=__version__,
            type=ResultType.CONNECTED_ACCOUNT,
            result=account_info
        )
Get connected account information
Args: context: Execution context
Returns: IntegrationResult containing connected account data
Raises: ValidationError: If no connected account handler is registered or auth is invalid