Source code for subscriptionkore.adapters.lemonsqueezy.adapter

"""LemonSqueezy payment provider adapter."""

from __future__ import annotations

import hashlib
import hmac
import json
from datetime import datetime
from decimal import Decimal
from typing import Any

import httpx
import structlog
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

from subscriptionkore.config import LemonSqueezyConfig
from subscriptionkore.core.exceptions import (
    ProviderAPIError,
    ProviderAuthenticationError,
    ProviderNetworkError,
    ProviderRateLimitError,
)
from subscriptionkore.core.models import (
    Customer,
    Invoice,
    InvoiceStatus,
    Plan,
    Product,
    ProviderReference,
    ProviderType,
    Subscription,
    SubscriptionStatus,
)
from subscriptionkore.core.models.invoice import InvoiceLineItem
from subscriptionkore.core.models.subscription import (
    AppliedDiscount,
    PauseBehavior,
    PauseConfig,
)
from subscriptionkore.core.models.value_objects import (
    BillingPeriod,
    Currency,
    DateRange,
    Interval,
    Money,
)
from subscriptionkore.ports.provider import (
    ChangePlanRequest,
    ChangePreview,
    CheckoutRequest,
    CheckoutSession,
    CreateSubscriptionRequest,
    DiscountRequest,
    PaymentProviderPort,
    PortalSession,
    ProviderCapabilities,
    ProviderWebhookEvent,
    UpdateSubscriptionRequest,
)

logger = structlog.get_logger()


[docs] class LemonSqueezyAdapter(PaymentProviderPort): """ LemonSqueezy payment provider implementation. Implements the LemonSqueezy API for subscriptionkore management. Reference: https://docs.lemonsqueezy.com/api """ BASE_URL = "https://api.lemonsqueezy.com/v1" STATUS_MAP = { "on_trial": SubscriptionStatus.TRIALING, "active": SubscriptionStatus.ACTIVE, "paused": SubscriptionStatus.PAUSED, "past_due": SubscriptionStatus.PAST_DUE, "unpaid": SubscriptionStatus.UNPAID, "cancelled": SubscriptionStatus.CANCELED, "expired": SubscriptionStatus.EXPIRED, } def __init__(self, config: LemonSqueezyConfig) -> None: self._config = config self._client: httpx.AsyncClient | None = None async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient( base_url=self.BASE_URL, headers={ "Authorization": f"Bearer {self._config.api_key}", "Accept": "application/vnd.api+json", "Content-Type": "application/vnd.api+json", }, timeout=30.0, ) return self._client
[docs] async def close(self) -> None: if self._client: await self._client.aclose() self._client = None
@property def provider_type(self) -> ProviderType: return ProviderType.LEMONSQUEEZY @property def capabilities(self) -> ProviderCapabilities: return ProviderCapabilities( supports_pausing=True, supports_trials=True, supports_quantity=True, supports_immediate_cancel=True, supports_proration=True, supports_coupons=True, supports_metered_billing=False, supports_customer_portal=True, supports_checkout_sessions=True, ) def _handle_error(self, response: httpx.Response) -> None: """Handle LemonSqueezy API errors.""" if response.status_code == 429: retry_after = response.headers.get("Retry-After") raise ProviderRateLimitError( provider="lemonsqueezy", retry_after=int(retry_after) if retry_after else None, ) if response.status_code == 401: raise ProviderAuthenticationError(provider="lemonsqueezy") if response.status_code >= 400: try: error_data = response.json() errors = error_data.get("errors", [{}]) first_error = errors[0] if errors else {} raise ProviderAPIError( message=first_error.get("detail", "Unknown LemonSqueezy error"), provider="lemonsqueezy", status_code=response.status_code, provider_message=first_error.get("detail"), provider_code=first_error.get("code"), ) except json.JSONDecodeError: raise ProviderAPIError( message=f"LemonSqueezy API error: {response.status_code}", provider="lemonsqueezy", status_code=response.status_code, ) @retry( retry=retry_if_exception_type((ProviderRateLimitError, ProviderNetworkError)), stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=60), ) async def _request( self, method: str, endpoint: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None, ) -> dict[str, Any]: """Make authenticated request to LemonSqueezy API.""" client = await self._get_client() try: response = await client.request( method=method, url=endpoint, json=data, params=params, ) except httpx.NetworkError as e: raise ProviderNetworkError(provider="lemonsqueezy", original_error=e) from e self._handle_error(response) return response.json() # Customer Operations
[docs] async def create_customer(self, customer: Customer) -> ProviderReference: data = { "data": { "type": "customers", "attributes": { "name": customer.name or customer.email.split("@")[0], "email": customer.email, }, "relationships": { "store": { "data": { "type": "stores", "id": self._config.store_id, } } }, } } result = await self._request("POST", "/customers", data=data) customer_data = result.get("data", {}) return ProviderReference( provider=ProviderType.LEMONSQUEEZY, external_id=str(customer_data["id"]), metadata={ "email": customer_data.get("attributes", {}).get("email"), "subscriptionkore_id": customer.id, "external_id": customer.external_id, }, )
[docs] async def update_customer(self, customer: Customer) -> None: provider_ref = customer.get_provider_ref("lemonsqueezy") if provider_ref is None: raise ProviderAPIError( message="Customer has no LemonSqueezy reference", provider="lemonsqueezy", status_code=400, ) data = { "data": { "type": "customers", "id": provider_ref.external_id, "attributes": { "name": customer.name, "email": customer.email, }, } } await self._request("PATCH", f"/customers/{provider_ref.external_id}", data=data)
[docs] async def delete_customer(self, provider_ref: ProviderReference) -> None: logger.warning( "LemonSqueezy does not support customer deletion", customer_id=provider_ref.external_id, )
[docs] async def get_customer(self, provider_ref: ProviderReference) -> Customer: result = await self._request("GET", f"/customers/{provider_ref.external_id}") data = result.get("data", {}) attrs = data.get("attributes", {}) return Customer( id=provider_ref.metadata.get("subscriptionkore_id", ""), external_id=provider_ref.metadata.get("external_id", ""), email=attrs.get("email", ""), name=attrs.get("name"), provider_refs=[provider_ref], created_at=datetime.fromisoformat( attrs.get("created_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), )
# Subscription Operations
[docs] async def create_subscriptionkore( self, request: CreateSubscriptionRequest, customer_provider_ref: ProviderReference, plan_provider_ref: ProviderReference, ) -> Subscription: raise ProviderAPIError( message="LemonSqueezy requires checkout for subscriptionkore creation. Use create_checkout_session instead.", provider="lemonsqueezy", status_code=400, )
[docs] async def update_subscriptionkore( self, request: UpdateSubscriptionRequest, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: data: dict[str, Any] = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "attributes": {}, } } if request.cancel_at_period_end is not None: data["data"]["attributes"]["cancelled"] = request.cancel_at_period_end result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def cancel_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, immediate: bool = False, ) -> Subscription: if immediate: await self._request( "DELETE", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", ) result = await self._request( "GET", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", ) else: data = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "attributes": { "cancelled": True, }, } } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def pause_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, resumes_at: datetime | None = None, ) -> Subscription: pause_data: dict[str, Any] = {"mode": "void"} if resumes_at: pause_data["resumes_at"] = resumes_at.isoformat() + "Z" data = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "attributes": { "pause": pause_data, }, } } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def resume_subscriptionkore( self, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: data = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "attributes": { "pause": None, }, } } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "")
[docs] async def get_subscriptionkore( self, provider_ref: ProviderReference, ) -> Subscription: result = await self._request("GET", f"/subscriptionkores/{provider_ref.external_id}") return self._map_subscriptionkore(result.get("data", {}), "", "")
# Plan Change Operations
[docs] async def change_plan( self, request: ChangePlanRequest, subscriptionkore_provider_ref: ProviderReference, new_plan_provider_ref: ProviderReference, ) -> Subscription: data = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "attributes": { "variant_id": int(new_plan_provider_ref.external_id), "invoice_immediately": request.proration_behavior != "none", }, } } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", request.new_plan_id)
[docs] async def preview_plan_change( self, request: ChangePlanRequest, subscriptionkore_provider_ref: ProviderReference, new_plan_provider_ref: ProviderReference, ) -> ChangePreview: variant_result = await self._request( "GET", f"/variants/{new_plan_provider_ref.external_id}", ) variant = variant_result.get("data", {}).get("attributes", {}) price = Decimal(str(variant.get("price", 0))) / 100 currency = Currency("USD") return ChangePreview( immediate_charge=Money(amount=price, currency=currency), next_invoice_amount=Money(amount=price, currency=currency), proration_amount=Money.zero(currency), credit_amount=Money.zero(currency), next_billing_date=datetime.utcnow(), )
# Discount Operations
[docs] async def apply_discount( self, subscriptionkore_provider_ref: ProviderReference, discount: DiscountRequest, ) -> Subscription: if discount.coupon_code: discounts_result = await self._request( "GET", "/discounts", params={"filter[code]": discount.coupon_code}, ) discounts = discounts_result.get("data", []) if not discounts: raise ProviderAPIError( message=f"Discount code '{discount.coupon_code}' not found", provider="lemonsqueezy", status_code=404, ) discount_id = discounts[0]["id"] data = { "data": { "type": "subscriptionkores", "id": subscriptionkore_provider_ref.external_id, "relationships": { "discount": { "data": { "type": "discounts", "id": discount_id, } } }, } } result = await self._request( "PATCH", f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}", data=data, ) return self._map_subscriptionkore(result.get("data", {}), "", "") return await self.get_subscriptionkore(subscriptionkore_provider_ref)
[docs] async def remove_discount( self, subscriptionkore_provider_ref: ProviderReference, ) -> Subscription: logger.warning( "LemonSqueezy does not support removing discounts from active subscriptionkores", subscriptionkore_id=subscriptionkore_provider_ref.external_id, ) return await self.get_subscriptionkore(subscriptionkore_provider_ref)
# Billing Operations
[docs] async def get_invoice(self, provider_ref: ProviderReference) -> Invoice: result = await self._request( "GET", f"/subscriptionkore-invoices/{provider_ref.external_id}", ) return self._map_invoice(result.get("data", {}))
[docs] async def list_invoices( self, customer_provider_ref: ProviderReference, limit: int = 10, starting_after: str | None = None, ) -> list[Invoice]: subs_result = await self._request( "GET", "/subscriptionkores", params={"filter[customer_id]": customer_provider_ref.external_id}, ) all_invoices: list[Invoice] = [] for sub in subs_result.get("data", []): invoices_result = await self._request( "GET", "/subscriptionkore-invoices", params={"filter[subscriptionkore_id]": sub["id"], "page[size]": limit}, ) all_invoices.extend([self._map_invoice(inv) for inv in invoices_result.get("data", [])]) return all_invoices[:limit]
[docs] async def get_upcoming_invoice( self, subscriptionkore_provider_ref: ProviderReference, ) -> Invoice | None: return None
# Checkout / Portal
[docs] async def create_checkout_session( self, request: CheckoutRequest, plan_provider_ref: ProviderReference, customer_provider_ref: ProviderReference | None = None, ) -> CheckoutSession: data: dict[str, Any] = { "data": { "type": "checkouts", "attributes": { "checkout_data": { "custom": request.metadata, }, "product_options": { "redirect_url": request.success_url, }, }, "relationships": { "store": { "data": { "type": "stores", "id": self._config.store_id, } }, "variant": { "data": { "type": "variants", "id": plan_provider_ref.external_id, } }, }, } } if customer_provider_ref: data["data"]["attributes"]["checkout_data"]["email"] = ( customer_provider_ref.metadata.get("email") ) elif request.customer_email: data["data"]["attributes"]["checkout_data"]["email"] = request.customer_email result = await self._request("POST", "/checkouts", data=data) checkout_data = result.get("data", {}) attrs = checkout_data.get("attributes", {}) return CheckoutSession( id=str(checkout_data["id"]), url=attrs.get("url", ""), expires_at=datetime.fromisoformat( attrs.get("expires_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), )
[docs] async def create_portal_session( self, customer_provider_ref: ProviderReference, return_url: str, ) -> PortalSession: result = await self._request( "GET", "/subscriptionkores", params={"filter[customer_id]": customer_provider_ref.external_id}, ) subscriptionkores = result.get("data", []) if not subscriptionkores: raise ProviderAPIError( message="Customer has no subscriptionkores", provider="lemonsqueezy", status_code=404, ) sub = subscriptionkores[0] urls = sub.get("attributes", {}).get("urls", {}) portal_url = urls.get("customer_portal", "") return PortalSession( id=str(sub["id"]), url=portal_url, )
# Webhook Handling
[docs] async def verify_webhook( self, payload: bytes, headers: dict[str, str], ) -> bool: """Verify LemonSqueezy webhook signature.""" signature = headers.get("x-signature", "") if not signature: return False expected_sig = hmac.new( self._config.webhook_secret.encode("utf-8"), payload, hashlib.sha256, ).hexdigest() return hmac.compare_digest(expected_sig, signature)
[docs] async def parse_webhook( self, payload: bytes, headers: dict[str, str], ) -> ProviderWebhookEvent: """Parse LemonSqueezy webhook payload.""" data = json.loads(payload) meta = data.get("meta", {}) return ProviderWebhookEvent( provider=ProviderType.LEMONSQUEEZY, event_id=meta.get("event_id", ""), event_type=meta.get("event_name", ""), occurred_at=datetime.fromisoformat( meta.get("created_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), data=data.get("data", {}), raw_payload=payload, )
# Sync Operations
[docs] async def sync_products(self) -> list[Product]: result = await self._request( "GET", "/products", params={"filter[store_id]": self._config.store_id}, ) products = [] for prod in result.get("data", []): attrs = prod.get("attributes", {}) products.append( Product( id="", name=attrs.get("name", ""), description=attrs.get("description"), provider_refs=[ ProviderReference( provider=ProviderType.LEMONSQUEEZY, external_id=str(prod["id"]), ) ], active=attrs.get("status") == "published", metadata={}, created_at=datetime.fromisoformat( attrs.get("created_at", datetime.utcnow().isoformat()).replace( "Z", "+00:00" ) ), ) ) return products
[docs] async def sync_plans(self, product_provider_ref: ProviderReference) -> list[Plan]: result = await self._request( "GET", "/variants", params={"filter[product_id]": product_provider_ref.external_id}, ) plans = [] for variant in result.get("data", []): attrs = variant.get("attributes", {}) interval_str = attrs.get("interval", "month") interval_map = { "day": Interval.DAY, "week": Interval.WEEK, "month": Interval.MONTH, "year": Interval.YEAR, } plans.append( Plan( id="", product_id="", name=attrs.get("name", ""), description=attrs.get("description"), provider_refs=[ ProviderReference( provider=ProviderType.LEMONSQUEEZY, external_id=str(variant["id"]), ) ], price=Money( amount=Decimal(str(attrs.get("price", 0))) / 100, currency=Currency("USD"), ), billing_period=BillingPeriod( interval=interval_map.get(interval_str, Interval.MONTH), interval_count=attrs.get("interval_count", 1), ), trial_period_days=attrs.get("trial_interval_count"), active=attrs.get("status") == "published", metadata={}, created_at=datetime.fromisoformat( attrs.get("created_at", datetime.utcnow().isoformat()).replace( "Z", "+00:00" ) ), ) ) return plans
# Mapping Helpers def _map_subscriptionkore( self, data: dict[str, Any], customer_id: str, plan_id: str, ) -> Subscription: """Map LemonSqueezy subscriptionkore to domain model.""" attrs = data.get("attributes", {}) status = self.STATUS_MAP.get(attrs.get("status", "active"), SubscriptionStatus.ACTIVE) # Handle pause pause_config = None pause_data = attrs.get("pause") if pause_data: pause_config = PauseConfig( resumes_at=datetime.fromisoformat(pause_data["resumes_at"].replace("Z", "+00:00")) if pause_data.get("resumes_at") else None, behavior=PauseBehavior.VOID, ) # Handle trial trial_end = None if attrs.get("trial_ends_at"): trial_end = datetime.fromisoformat(attrs["trial_ends_at"].replace("Z", "+00:00")) # Parse dates renews_at = attrs.get("renews_at") current_period_end = None if renews_at: current_period_end = datetime.fromisoformat(renews_at.replace("Z", "+00:00")) created_at = datetime.fromisoformat( attrs.get("created_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ) return Subscription( id="", customer_id=customer_id, plan_id=plan_id, provider_ref=ProviderReference( provider=ProviderType.LEMONSQUEEZY, external_id=str(data["id"]), metadata={ "customer_id": str(attrs.get("customer_id", "")), "variant_id": str(attrs.get("variant_id", "")), }, ), status=status, current_period=DateRange( start=created_at, end=current_period_end, ), trial_end=trial_end, cancel_at_period_end=attrs.get("cancelled", False), canceled_at=datetime.fromisoformat(attrs["cancelled_at"].replace("Z", "+00:00")) if attrs.get("cancelled_at") else None, ended_at=datetime.fromisoformat(attrs["ends_at"].replace("Z", "+00:00")) if attrs.get("ends_at") else None, pause_collection=pause_config, discount=None, quantity=1, metadata={}, created_at=created_at, ) def _map_invoice(self, data: dict[str, Any]) -> Invoice: """Map LemonSqueezy subscriptionkore invoice to domain model.""" attrs = data.get("attributes", {}) currency = Currency("USD") status_map = { "pending": InvoiceStatus.OPEN, "paid": InvoiceStatus.PAID, "void": InvoiceStatus.VOID, "refunded": InvoiceStatus.VOID, } total_amount = Decimal(str(attrs.get("total", 0))) / 100 subtotal_amount = Decimal(str(attrs.get("subtotal", 0))) / 100 tax_amount = Decimal(str(attrs.get("tax", 0))) / 100 discount_amount = Decimal(str(attrs.get("discount_total", 0))) / 100 return Invoice( id="", customer_id="", subscriptionkore_id=str(attrs.get("subscriptionkore_id", "")), provider_ref=ProviderReference( provider=ProviderType.LEMONSQUEEZY, external_id=str(data.get("id", "")), ), status=status_map.get(attrs.get("status", "pending"), InvoiceStatus.OPEN), subtotal=Money(amount=subtotal_amount, currency=currency), tax=Money(amount=tax_amount, currency=currency), discount_amount=Money(amount=discount_amount, currency=currency), total=Money(amount=total_amount, currency=currency), amount_paid=Money(amount=total_amount, currency=currency) if attrs.get("status") == "paid" else Money.zero(currency), amount_due=Money(amount=total_amount, currency=currency) if attrs.get("status") != "paid" else Money.zero(currency), currency=currency, line_items=[], invoice_pdf_url=attrs.get("urls", {}).get("invoice_url"), metadata={}, created_at=datetime.fromisoformat( attrs.get("created_at", datetime.utcnow().isoformat()).replace("Z", "+00:00") ), )