Source code for subscriptionkore.adapters.paddle.adapter

"""Paddle payment provider adapter."""

from __future__ import annotations

import hashlib
import hmac
import json
from datetime import datetime
from decimal import Decimal
from typing import Any

import httpx
import structlog
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

from subscriptionkore.config import PaddleConfig
from subscriptionkore.core.exceptions import (
    ProviderAPIError,
    ProviderAuthenticationError,
    ProviderNetworkError,
    ProviderRateLimitError,
)
from subscriptionkore.core.models import (
    Customer,
    Invoice,
    InvoiceStatus,
    Plan,
    Product,
    ProviderReference,
    ProviderType,
    Subscription,
    SubscriptionStatus,
)
from subscriptionkore.core.models.customer import Address
from subscriptionkore.core.models.invoice import InvoiceLineItem
from subscriptionkore.core.models.subscription import (
    AppliedDiscount,
    PauseBehavior,
    PauseConfig,
)
from subscriptionkore.core.models.value_objects import (
    BillingPeriod,
    Currency,
    DateRange,
    Interval,
    Money,
)
from subscriptionkore.ports.provider import (
    ChangePlanRequest,
    ChangePreview,
    CheckoutRequest,
    CheckoutSession,
    CreateSubscriptionRequest,
    DiscountRequest,
    PaymentProviderPort,
    PortalSession,
    ProrationBehavior,
    ProviderCapabilities,
    ProviderWebhookEvent,
    UpdateSubscriptionRequest,
)

logger = structlog.get_logger()


[docs] class PaddleAdapter(PaymentProviderPort): """ Paddle Billing payment provider implementation. Implements the Paddle Billing API (v1) for subscriptionkore management. Reference: https://developer.paddle.com/api-reference/overview """ def __init__(self, config: PaddleConfig) -> None: self._config = config self._client: httpx.AsyncClient | None = None # Set base URL based on environment if config.environment == "sandbox": self._base_url = "https://sandbox-api.paddle.com" else: self._base_url = "https://api.paddle.com" async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient( base_url=self._base_url, headers={ "Authorization": f"Bearer {self._config.api_key}", "Content-Type": "application/json", }, timeout=30.0, ) return self._client
[docs] async def close(self) -> None: if self._client: await self._client.aclose() self._client = None
@property def provider_type(self) -> ProviderType: return ProviderType.PADDLE @property def capabilities(self) -> ProviderCapabilities: return ProviderCapabilities( supports_pausing=True, supports_trials=True, supports_quantity=True, supports_immediate_cancel=True, supports_proration=True, supports_coupons=True, supports_metered_billing=True, supports_customer_portal=True, supports_checkout_sessions=True, ) def _handle_error(self, response: httpx.Response) -> None: """Handle Paddle API errors.""" if response.status_code == 429: retry_after = response.headers.get("Retry-After") raise ProviderRateLimitError( provider="paddle", retry_after=int(retry_after) if retry_after else None, ) if response.status_code == 401: raise ProviderAuthenticationError(provider="paddle") if response.status_code >= 400: try: error_data = response.json().get("error", {}) raise ProviderAPIError( message=error_data.get("detail", "Unknown Paddle error"), provider="paddle", status_code=response.status_code, provider_message=error_data.get("detail"), provider_code=error_data.get("code"), ) except json.JSONDecodeError: raise ProviderAPIError( message=f"Paddle API error: {response.status_code}", provider="paddle", status_code=response.status_code, ) @retry( retry=retry_if_exception_type((ProviderRateLimitError, ProviderNetworkError)), stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=60), ) async def _request( self, method: str, endpoint: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None, ) -> dict[str, Any]: """Make authenticated request to Paddle API.""" client = await self._get_client() try: response = await client.request( method=method, url=endpoint, json=data, params=params, ) except httpx.NetworkError as e: raise ProviderNetworkError(provider="paddle", original_error=e) from e self._handle_error(response) return response.json() # Status mapping STATUS_MAP = { "active": SubscriptionStatus.ACTIVE, "trialing": SubscriptionStatus.TRIALING, "past_due": SubscriptionStatus.PAST_DUE, "paused": SubscriptionStatus.PAUSED, "canceled": SubscriptionStatus.CANCELED, } # Customer Operations
[docs] async def create_customer(self, customer: Customer) -> ProviderReference: data: dict[str, Any] = { "email": customer.email, "custom_data": { "subscriptionkore_id": customer.id, "external_id": customer.external_id, }, } if customer.name: data["name"] = customer.name result = await self._request("POST", "/customers", data=data) customer_data = result.get("data", {}) return ProviderReference( provider=ProviderType.PADDLE, external_id=customer_data["id"], metadata={"email": customer_data.get("email")}, )
[docs] async def update_customer(self, customer: Customer) -> None: provider_ref = customer.get_provider_ref("paddle") if provider_ref is None: raise ProviderAPIError( message="Customer has no Paddle reference", provider="paddle", status_code=400, ) data: dict[str, Any] = {"email": customer.email} if customer.name: data["name"] = customer.name await self._request("PATCH", f"/customers/{provider_ref.external_id}", data=data)
[docs] async def delete_customer(self, provider_ref: ProviderReference) -> None: # Paddle doesn't support customer deletion, but we can update to anonymize await self._request( "PATCH", f"/customers/{provider_ref.external_id}", data={ "name": "Deleted Customer", "email": f"deleted_{provider_ref.external_id}@example.com", }, )
[docs] async def get_customer(self, provider_ref: ProviderReference) -> Customer: result = await self._request("GET", f"/customers/{provider_ref.external_id}") data = result.get("data", {}) custom_data = data.get("custom_data", {}) return Customer( id=custom_data.get("subscriptionkore_id", ""), external_id=custom_data.get("external_id", ""), email=data.get("email", ""), name=data.get("name"), provider_refs=[provider_ref], created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")), )
# Subscription Operations
[docs] async def create_subscriptionkore( self, request: CreateSubscriptionRequest, customer_provider_ref: ProviderReference, plan_provider_ref: ProviderReference, ) -> Subscription: """ Create a subscriptionkore in Paddle. Note: Paddle subscriptionkores are typically created through checkout. This method creates a subscriptionkore directly via API. """ data: dict[str, Any] = { "customer_id": customer_provider_ref.external_id, "items": [ { "price_id": plan_provider_ref.external_id, "quantity": request.quantity, } ], "custom_data": { "subscriptionkore_customer_id": request.customer_id, "subscriptionkore_plan_id": request.plan_id, }, } if request.trial_period_days: # Calculate trial end date from datetime import timedelta trial_end = datetime.utcnow() + timedelta(days=request.trial_period_days) data["billing_cycle"] = { "interval": "month", "frequency": 1, } # Paddle handles trials through the price configuration if request.coupon_code: data["discount_id"] = request.coupon_code for key, value in request.metadata.items(): data["custom_data"][key] = str(value) result = await self._request("POST", "/subscriptionkores", data=data) return self._map_subscriptionkore( result.get("data", {}), request.customer_id, request.plan_id, )
[docs] async def update_subscriptionkore( self, request: UpdateSubscriptionRequest, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: data: dict[str, Any] = {} if request.quantity is not None: # Get current subscriptionkore to find item sub = await self._request( "GET", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", ) items = sub.get("data", {}).get("items", []) if items: data["items"] = [ { "price_id": items[0]["price"]["id"], "quantity": request.quantity, } ] if request.cancel_at_period_end is not None: if request.cancel_at_period_end: data["scheduled_change"] = {"action": "cancel"} else: # Remove scheduled cancellation pass if request.metadata: data["custom_data"] = request.metadata result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def cancel_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, immediate: bool = False, ) -> Subscription: if immediate: data = {"effective_from": "immediately"} else: data = {"effective_from": "next_billing_period"} result = await self._request( "POST", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/cancel", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def pause_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, resumes_at: datetime | None = None, ) -> Subscription: data: dict[str, Any] = {"effective_from": "next_billing_period"} if resumes_at: data["resume_at"] = resumes_at.isoformat() + "Z" result = await self._request( "POST", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/pause", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def resume_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: result = await self._request( "POST", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/resume", data={"effective_from": "immediately"}, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def get_subscriptionkore( self, provider_ref: ProviderReference, ) -> Subscription: result = await self._request("GET", f"/subscriptionkores/{provider_ref.external_id}") return self._map_subscriptionkore(result.get("data", {}), "", "")
# Plan Change Operations
[docs] async def change_plan( self, request: ChangePlanRequest, subscriptionkore_provider_ref: ProviderReference, new_plan_provider_ref: ProviderReference, ) -> Subscription: proration_map = { ProrationBehavior.CREATE_PRORATIONS: "prorated_immediately", ProrationBehavior.NONE: "full_immediately", ProrationBehavior.ALWAYS_INVOICE: "prorated_immediately", } data = { "items": [ { "price_id": new_plan_provider_ref.external_id, "quantity": 1, } ], "proration_billing_mode": proration_map.get( request.proration_behavior, "prorated_immediately" ), } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", request.new_plan_id)
[docs] async def preview_plan_change( self, request: ChangePlanRequest, subscriptionkore_provider_ref: ProviderReference, new_plan_provider_ref: ProviderReference, ) -> ChangePreview: data = { "items": [ { "price_id": new_plan_provider_ref.external_id, "quantity": 1, } ], "proration_billing_mode": "prorated_immediately", } result = await self._request( "POST", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/preview", data=data, ) preview_data = result.get("data", {}) immediate_transaction = preview_data.get("immediate_transaction", {}) currency = Currency( immediate_transaction.get("details", {}).get("totals", {}).get("currency_code", "USD") ) totals = immediate_transaction.get("details", {}).get("totals", {}) return ChangePreview( immediate_charge=Money( amount=Decimal(totals.get("grand_total", "0")) / 100, currency=currency, ), next_invoice_amount=Money( amount=Decimal( preview_data.get("recurring_transaction_details", {}) .get("totals", {}) .get("grand_total", "0") ) / 100, currency=currency, ), proration_amount=Money( amount=Decimal(totals.get("proration", "0")) / 100, currency=currency, ), credit_amount=Money( amount=Decimal(totals.get("credit", "0")) / 100, currency=currency, ), next_billing_date=datetime.fromisoformat( preview_data.get("next_billed_at", datetime.utcnow().isoformat()).replace( "Z", "+00:00" ) ), )
# Discount Operations
[docs] async def apply_discount( self, subscriptionkore_provider_ref: ProviderReference, discount: DiscountRequest, ) -> Subscription: data: dict[str, Any] = {} if discount.coupon_code: data["discount_id"] = discount.coupon_code elif discount.promotion_code: data["discount_id"] = discount.promotion_code result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def remove_discount( self, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data={"discount_id": None}, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
# Billing Operations
[docs] async def get_invoice(self, provider_ref: ProviderReference) -> Invoice: result = await self._request("GET", f"/transactions/{provider_ref.external_id}") return self._map_transaction_to_invoice(result.get("data", {}))
[docs] async def list_invoices( self, customer_provider_ref: ProviderReference, limit: int = 10, starting_after: str | None = None, ) -> list[Invoice]: params: dict[str, Any] = { "customer_id": customer_provider_ref.external_id, "per_page": limit, "status": "completed,billed", } if starting_after: params["after"] = starting_after result = await self._request("GET", "/transactions", params=params) return [self._map_transaction_to_invoice(txn) for txn in result.get("data", [])]
[docs] async def get_upcoming_invoice( self, subscriptionkore_provider_ref: ProviderReference, ) -> Invoice | None: try: result = await self._request( "GET", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/upcoming-invoice", ) return self._map_transaction_to_invoice(result.get("data", {})) except ProviderAPIError as e: if e.status_code == 404: return None raise
# Checkout / Portal
[docs] async def create_checkout_session( self, request: CheckoutRequest, plan_provider_ref: ProviderReference, customer_provider_ref: ProviderReference | None = None, ) -> CheckoutSession: data: dict[str, Any] = { "items": [ { "price_id": plan_provider_ref.external_id, "quantity": request.quantity, } ], "settings": { "success_url": request.success_url, }, "custom_data": request.metadata, } if customer_provider_ref: data["customer_id"] = customer_provider_ref.external_id elif request.customer_email: data["customer"] = {"email": request.customer_email} if request.trial_period_days: data["items"][0]["trial_period"] = { "interval": "day", "frequency": request.trial_period_days, } result = await self._request("POST", "/transactions", data=data) txn_data = result.get("data", {}) checkout_url = txn_data.get("checkout", {}).get("url", "") return CheckoutSession( id=txn_data["id"], url=checkout_url, expires_at=datetime.utcnow(), # Paddle doesn't provide expiration )
[docs] async def create_portal_session( self, customer_provider_ref: ProviderReference, return_url: str, ) -> PortalSession: # Paddle uses a different approach - customer portal URL # The portal URL is static per environment if self._config.environment == "sandbox": portal_base = "https://sandbox-customer-portal.paddle.com" else: portal_base = "https://customer-portal.paddle. com" # Generate a portal session result = await self._request( "POST", f"/customers/{customer_provider_ref.external_id}/portal-sessions", data={}, ) session_data = result.get("data", {}) return PortalSession( id=session_data.get("id", ""), url=session_data.get("urls", {}).get("general", {}).get("overview", portal_base), )
# Webhook Handling
[docs] async def verify_webhook( self, payload: bytes, headers: dict[str, str], ) -> bool: """Verify Paddle webhook signature.""" signature = headers.get("paddle-signature", "") if not signature: return False # Parse signature header # Format: ts=timestamp;h1=hash parts = dict(part.split("=", 1) for part in signature.split(";") if "=" in part) timestamp = parts.get("ts", "") received_hash = parts.get("h1", "") if not timestamp or not received_hash: return False # Build signed payload signed_payload = f"{timestamp}:{payload.decode('utf-8')}" # Compute expected signature expected_hash = hmac.new( self._config.webhook_secret.encode("utf-8"), signed_payload.encode("utf-8"), hashlib.sha256, ).hexdigest() return hmac.compare_digest(expected_hash, received_hash)
[docs] async def parse_webhook( self, payload: bytes, headers: dict[str, str], ) -> ProviderWebhookEvent: """Parse Paddle webhook payload.""" data = json.loads(payload) return ProviderWebhookEvent( provider=ProviderType.PADDLE, event_id=data.get("event_id", data.get("notification_id", "")), event_type=data.get("event_type", ""), occurred_at=datetime.fromisoformat( data.get("occurred_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), data=data.get("data", {}), raw_payload=payload, )
# Sync Operations
[docs] async def sync_products(self) -> list[Product]: result = await self._request("GET", "/products", params={"status": "active"}) products = [] for prod in result.get("data", []): products.append( Product( id="", name=prod["name"], description=prod.get("description"), provider_refs=[ ProviderReference( provider=ProviderType.PADDLE, external_id=prod["id"], ) ], active=prod["status"] == "active", metadata=prod.get("custom_data", {}), created_at=datetime.fromisoformat(prod["created_at"].replace("Z", "+00:00")), ) ) return products
[docs] async def sync_plans(self, product_provider_ref: ProviderReference) -> list[Plan]: result = await self._request( "GET", "/prices", params={ "product_id": product_provider_ref.external_id, "status": "active", }, ) plans = [] for price in result.get("data", []): billing_cycle = price.get("billing_cycle", {}) interval = billing_cycle.get("interval", "month") interval_count = billing_cycle.get("frequency", 1) # Map Paddle intervals to our intervals interval_map = { "day": Interval.DAY, "week": Interval.WEEK, "month": Interval.MONTH, "year": Interval.YEAR, } unit_price = price.get("unit_price", {}) plans.append( Plan( id="", product_id="", name=price.get("description") or price["id"], provider_refs=[ ProviderReference( provider=ProviderType.PADDLE, external_id=price["id"], ) ], price=Money( amount=Decimal(unit_price.get("amount", "0")) / 100, currency=Currency(unit_price.get("currency_code", "USD")), ), billing_period=BillingPeriod( interval=interval_map.get(interval, Interval.MONTH), interval_count=interval_count, ), trial_period_days=price.get("trial_period", {}).get("frequency"), active=price["status"] == "active", metadata=price.get("custom_data", {}), created_at=datetime.fromisoformat(price["created_at"].replace("Z", "+00:00")), ) ) return plans
# Mapping Helpers def _map_subscriptionkore( self, data: dict[str, Any], customer_id: str, plan_id: str, ) -> Subscription: """Map Paddle subscriptionkore to domain model.""" status = self.STATUS_MAP.get(data.get("status", "active"), SubscriptionStatus.ACTIVE) # Handle pause pause_config = None if data.get("paused_at"): pause_config = PauseConfig( resumes_at=datetime.fromisoformat( data["scheduled_change"]["resume_at"].replace("Z", "+00:00") ) if data.get("scheduled_change", {}).get("resume_at") else None, behavior=PauseBehavior.VOID, ) # Handle discount discount = None if data.get("discount"): disc = data["discount"] discount = AppliedDiscount( discount_id=disc["id"], coupon_code=disc.get("code"), amount_off=Money( amount=Decimal(disc["amount"]) / 100, currency=Currency(data.get("currency_code", "USD")), ) if disc.get("amount") else None, percent_off=Decimal(str(disc["percentage"])) if disc.get("percentage") else None, valid_until=datetime.fromisoformat(disc["ends_at"].replace("Z", "+00:00")) if disc.get("ends_at") else None, ) # Get quantity from items quantity = 1 items = data.get("items", []) if items: quantity = items[0].get("quantity", 1) # Get trial end trial_end = None if data.get("current_billing_period", {}).get("trial_ends_at"): trial_end = datetime.fromisoformat( data["current_billing_period"]["trial_ends_at"].replace("Z", "+00:00") ) # Use custom_data for internal IDs custom_data = data.get("custom_data", {}) resolved_customer_id = customer_id or custom_data.get("subscriptionkore_customer_id", "") resolved_plan_id = plan_id or custom_data.get("subscriptionkore_plan_id", "") # Parse billing period current_period_start = datetime.utcnow() current_period_end = None if data.get("current_billing_period"): period = data["current_billing_period"] current_period_start = datetime.fromisoformat( period["starts_at"].replace("Z", "+00:00") ) current_period_end = datetime.fromisoformat(period["ends_at"].replace("Z", "+00:00")) return Subscription( id="", customer_id=resolved_customer_id, plan_id=resolved_plan_id, provider_ref=ProviderReference( provider=ProviderType.PADDLE, external_id=data["id"], metadata={"customer_id": data.get("customer_id")}, ), status=status, current_period=DateRange( start=current_period_start, end=current_period_end, ), trial_end=trial_end, cancel_at_period_end=data.get("scheduled_change", {}).get("action") == "cancel", canceled_at=datetime.fromisoformat(data["canceled_at"].replace("Z", "+00:00")) if data.get("canceled_at") else None, pause_collection=pause_config, discount=discount, quantity=quantity, metadata=custom_data, created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")), ) def _map_transaction_to_invoice(self, data: dict[str, Any]) -> Invoice: """Map Paddle transaction to Invoice domain model.""" details = data.get("details", {}) totals = details.get("totals", {}) currency = Currency(totals.get("currency_code", "USD")) status_map = { "draft": InvoiceStatus.DRAFT, "ready": InvoiceStatus.OPEN, "billed": InvoiceStatus.OPEN, "paid": InvoiceStatus.PAID, "completed": InvoiceStatus.PAID, "canceled": InvoiceStatus.VOID, } # Map line items line_items = [] for item in details.get("line_items", []): line_items.append( InvoiceLineItem( id=item.get("id", ""), description=item.get("product", {}).get("name", ""), quantity=item.get("quantity", 1), unit_amount=Money( amount=Decimal(item.get("unit_totals", {}).get("subtotal", "0")) / 100, currency=currency, ), amount=Money( amount=Decimal(item.get("totals", {}).get("subtotal", "0")) / 100, currency=currency, ), proration=item.get("proration", False), ) ) return Invoice( id="", customer_id=data.get("customer_id", ""), subscriptionkore_id=data.get("subscriptionkore_id"), provider_ref=ProviderReference( provider=ProviderType.PADDLE, external_id=data.get("id", ""), ), status=status_map.get(data.get("status", "draft"), InvoiceStatus.DRAFT), subtotal=Money( amount=Decimal(totals.get("subtotal", "0")) / 100, currency=currency, ), tax=Money( amount=Decimal(totals.get("tax", "0")) / 100, currency=currency, ), discount_amount=Money( amount=Decimal(totals.get("discount", "0")) / 100, currency=currency, ), total=Money( amount=Decimal(totals.get("grand_total", "0")) / 100, currency=currency, ), amount_paid=Money( amount=Decimal(totals.get("grand_total", "0")) / 100 if data.get("status") in ("paid", "completed") else Decimal("0"), currency=currency, ), amount_due=Money( amount=Decimal(totals.get("grand_total", "0")) / 100 if data.get("status") not in ("paid", "completed") else Decimal("0"), currency=currency, ), currency=currency, line_items=line_items, invoice_pdf_url=data.get("checkout", {}).get("url"), metadata=data.get("custom_data", {}), created_at=datetime.fromisoformat( data.get("created_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), )