# Source code for subscriptionkore.services.webhook_processor

"""Webhook processing service."""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

import structlog

from subscriptionkore.core.events import (
    DomainEvent,
    InvoiceCreated,
    InvoicePaid,
    PaymentFailed,
    PaymentSucceeded,
    SubscriptionActivated,
    SubscriptionCanceled,
    SubscriptionCreated,
    SubscriptionPastDue,
    SubscriptionPaused,
    SubscriptionResumed,
    SubscriptionUpdated,
)
from subscriptionkore.core.exceptions import (
    WebhookPayloadInvalidError,
    WebhookProcessingError,
    WebhookSignatureInvalidError,
)
from subscriptionkore.core.models import (
    Invoice,
    InvoiceStatus,
    PaymentEvent,
    ProviderType,
    Subscription,
    SubscriptionStatus,
)
from subscriptionkore.core.state import SubscriptionStateMachine

if TYPE_CHECKING:
    from subscriptionkore.ports.event_bus import EventBusPort
    from subscriptionkore.ports.provider import PaymentProviderPort, ProviderWebhookEvent
    from subscriptionkore.ports.repository import (
        CustomerRepository,
        InvoiceRepository,
        PaymentEventRepository,
        PlanRepository,
        ProcessedEventRepository,
        SubscriptionRepository,
    )
    from subscriptionkore.services.entitlement_service import EntitlementService

# Module-level structlog logger; WebhookProcessor binds per-webhook context
# (provider, event id, event type) onto it before logging.
logger = structlog.get_logger()


class CanonicalEventType:
    """Canonical event types for normalized webhooks.

    Each attribute is a plain string of the form ``"<namespace>.<event>"``.
    The values are used as dictionary keys when routing a normalized webhook
    to its handler and are echoed in log output, so they must not contain
    whitespace.
    """

    # Subscription events
    SUBSCRIPTION_CREATED = "subscriptionkore.created"
    SUBSCRIPTION_ACTIVATED = "subscriptionkore.activated"
    SUBSCRIPTION_UPDATED = "subscriptionkore.updated"
    SUBSCRIPTION_CANCELED = "subscriptionkore.canceled"
    SUBSCRIPTION_PAUSED = "subscriptionkore.paused"
    SUBSCRIPTION_RESUMED = "subscriptionkore.resumed"
    SUBSCRIPTION_PAST_DUE = "subscriptionkore.past_due"
    SUBSCRIPTION_TRIAL_END = "subscriptionkore.trial_end"

    # Payment events
    PAYMENT_SUCCEEDED = "payment.succeeded"
    PAYMENT_FAILED = "payment.failed"
    PAYMENT_REFUNDED = "payment.refunded"

    # Invoice events
    INVOICE_CREATED = "invoice.created"
    INVOICE_PAID = "invoice.paid"
    INVOICE_PAYMENT_FAILED = "invoice.payment_failed"

    # Customer events
    CUSTOMER_CREATED = "customer.created"
    CUSTOMER_UPDATED = "customer.updated"

class WebhookProcessor:
    """
    Processes webhooks from payment providers.

    Responsibilities:
    - Verify webhook signatures
    - Parse provider-specific payloads
    - Normalize to canonical event format
    - Apply state changes
    - Emit domain events
    - Ensure idempotency
    """

    def __init__(
        self,
        providers: dict[ProviderType, PaymentProviderPort],
        subscriptionkore_repo: SubscriptionRepository,
        customer_repo: CustomerRepository,
        plan_repo: PlanRepository,
        invoice_repo: InvoiceRepository,
        payment_event_repo: PaymentEventRepository,
        processed_event_repo: ProcessedEventRepository,
        event_bus: EventBusPort,
        entitlement_service: EntitlementService,
    ) -> None:
        """
        Store the collaborators used while processing webhooks.

        Args:
            providers: Provider adapters keyed by provider type.
            subscriptionkore_repo: Repository for subscriptions.
            customer_repo: Repository for customers.
            plan_repo: Repository for plans.
            invoice_repo: Repository for invoices.
            payment_event_repo: Repository for payment events.
            processed_event_repo: Store of already-processed webhook event IDs.
            event_bus: Bus on which resulting domain events are published.
            entitlement_service: Service whose per-customer entitlements are
                invalidated after subscription changes.
        """
        self._providers = providers
        self._subscriptionkore_repo = subscriptionkore_repo
        self._customer_repo = customer_repo
        self._plan_repo = plan_repo
        self._invoice_repo = invoice_repo
        self._payment_event_repo = payment_event_repo
        self._processed_event_repo = processed_event_repo
        self._event_bus = event_bus
        self._entitlement_service = entitlement_service
        self._state_machine = SubscriptionStateMachine()

    async def process(
        self,
        provider: str,
        payload: bytes,
        headers: dict[str, str],
    ) -> WebhookResult:
        """
        Process a webhook from a payment provider.

        Args:
            provider: Provider name (e.g., "stripe", "paddle")
            payload: Raw webhook payload bytes
            headers: HTTP headers from webhook request

        Returns:
            WebhookResult with processing outcome

        Raises:
            WebhookSignatureInvalidError: If signature verification fails
            WebhookPayloadInvalidError: If the provider is unknown or not configured
            WebhookProcessingError: If processing fails
        """
        log = logger.bind(provider=provider)
        log.info("Processing webhook")

        # Get provider adapter
        try:
            provider_type = ProviderType(provider)
        except ValueError as e:
            raise WebhookPayloadInvalidError(provider, f"Unknown provider: {provider}") from e

        adapter = self._providers.get(provider_type)
        if adapter is None:
            raise WebhookPayloadInvalidError(provider, f"Provider not configured: {provider}")

        # Verify signature before touching the payload contents
        is_valid = await adapter.verify_webhook(payload, headers)
        if not is_valid:
            log.warning("Webhook signature verification failed")
            raise WebhookSignatureInvalidError(provider)

        # Parse webhook
        provider_event = await adapter.parse_webhook(payload, headers)
        log = log.bind(
            event_id=provider_event.event_id,
            event_type=provider_event.event_type,
        )

        # Idempotency check
        already_processed = await self._processed_event_repo.exists(
            provider=provider,
            event_id=provider_event.event_id,
        )
        if already_processed:
            log.info("Webhook already processed, skipping")
            return WebhookResult(
                event_id=provider_event.event_id,
                status="duplicate",
                message="Event already processed",
            )

        # Normalize and process
        try:
            canonical_type = self._normalize_event_type(provider_type, provider_event.event_type)
            log = log.bind(canonical_type=canonical_type)

            domain_events = await self._handle_event(
                provider_type=provider_type,
                canonical_type=canonical_type,
                event=provider_event,
            )

            # Mark as processed
            # NOTE(review): the event is marked processed before its domain
            # events are published, so a publish failure is reported as an
            # error but a provider retry would then be skipped as a
            # duplicate. Confirm this ordering is intended.
            await self._processed_event_repo.mark_processed(
                provider=provider,
                event_id=provider_event.event_id,
            )

            # Publish domain events
            await self._event_bus.publish_many(domain_events)

            log.info("Webhook processed successfully", events_emitted=len(domain_events))
            return WebhookResult(
                event_id=provider_event.event_id,
                status="processed",
                message=f"Processed {canonical_type}",
                events_emitted=len(domain_events),
            )
        except Exception as e:
            # Boundary handler: any failure is logged and surfaced to the
            # caller as a single error type with the original cause chained.
            log.error("Webhook processing failed", error=str(e))
            raise WebhookProcessingError(provider_event.event_id, str(e)) from e

    def _normalize_event_type(self, provider: ProviderType, event_type: str) -> str:
        """
        Normalize provider-specific event type to canonical type.

        Args:
            provider: Provider that sent the webhook.
            event_type: Event type string exactly as the provider names it.

        Returns:
            The matching ``CanonicalEventType`` value, or
            ``"unknown.<event_type>"`` when the provider or event type has no
            mapping.
        """
        # Keys below are the event names the providers put on the wire.

        # Stripe mappings
        stripe_map = {
            "customer.subscription.created": CanonicalEventType.SUBSCRIPTION_CREATED,
            "customer.subscription.updated": CanonicalEventType.SUBSCRIPTION_UPDATED,
            "customer.subscription.deleted": CanonicalEventType.SUBSCRIPTION_CANCELED,
            "customer.subscription.paused": CanonicalEventType.SUBSCRIPTION_PAUSED,
            "customer.subscription.resumed": CanonicalEventType.SUBSCRIPTION_RESUMED,
            "customer.subscription.trial_will_end": CanonicalEventType.SUBSCRIPTION_TRIAL_END,
            "invoice.created": CanonicalEventType.INVOICE_CREATED,
            "invoice.paid": CanonicalEventType.INVOICE_PAID,
            "invoice.payment_failed": CanonicalEventType.INVOICE_PAYMENT_FAILED,
            "payment_intent.succeeded": CanonicalEventType.PAYMENT_SUCCEEDED,
            "payment_intent.payment_failed": CanonicalEventType.PAYMENT_FAILED,
            "charge.refunded": CanonicalEventType.PAYMENT_REFUNDED,
        }

        # Paddle mappings
        paddle_map = {
            "subscription.created": CanonicalEventType.SUBSCRIPTION_CREATED,
            "subscription.activated": CanonicalEventType.SUBSCRIPTION_ACTIVATED,
            "subscription.updated": CanonicalEventType.SUBSCRIPTION_UPDATED,
            "subscription.canceled": CanonicalEventType.SUBSCRIPTION_CANCELED,
            "subscription.paused": CanonicalEventType.SUBSCRIPTION_PAUSED,
            "subscription.resumed": CanonicalEventType.SUBSCRIPTION_RESUMED,
            "subscription.past_due": CanonicalEventType.SUBSCRIPTION_PAST_DUE,
            "transaction.completed": CanonicalEventType.PAYMENT_SUCCEEDED,
            "transaction.payment_failed": CanonicalEventType.PAYMENT_FAILED,
        }

        # LemonSqueezy mappings
        lemonsqueezy_map = {
            "subscription_created": CanonicalEventType.SUBSCRIPTION_CREATED,
            "subscription_updated": CanonicalEventType.SUBSCRIPTION_UPDATED,
            "subscription_cancelled": CanonicalEventType.SUBSCRIPTION_CANCELED,
            "subscription_resumed": CanonicalEventType.SUBSCRIPTION_RESUMED,
            "subscription_paused": CanonicalEventType.SUBSCRIPTION_PAUSED,
            "subscription_payment_success": CanonicalEventType.PAYMENT_SUCCEEDED,
            "subscription_payment_failed": CanonicalEventType.PAYMENT_FAILED,
        }

        # Chargebee mappings
        chargebee_map = {
            "subscription_created": CanonicalEventType.SUBSCRIPTION_CREATED,
            "subscription_activated": CanonicalEventType.SUBSCRIPTION_ACTIVATED,
            "subscription_changed": CanonicalEventType.SUBSCRIPTION_UPDATED,
            "subscription_cancelled": CanonicalEventType.SUBSCRIPTION_CANCELED,
            "subscription_paused": CanonicalEventType.SUBSCRIPTION_PAUSED,
            "subscription_resumed": CanonicalEventType.SUBSCRIPTION_RESUMED,
            "subscription_renewal_reminder": CanonicalEventType.SUBSCRIPTION_PAST_DUE,
            "payment_succeeded": CanonicalEventType.PAYMENT_SUCCEEDED,
            "payment_failed": CanonicalEventType.PAYMENT_FAILED,
            "invoice_generated": CanonicalEventType.INVOICE_CREATED,
        }

        mappings = {
            ProviderType.STRIPE: stripe_map,
            ProviderType.PADDLE: paddle_map,
            ProviderType.LEMONSQUEEZY: lemonsqueezy_map,
            ProviderType.CHARGEBEE: chargebee_map,
        }

        provider_map = mappings.get(provider, {})
        return provider_map.get(event_type, f"unknown.{event_type}")

    async def _handle_event(
        self,
        provider_type: ProviderType,
        canonical_type: str,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle normalized event and return domain events to emit.

        Canonical types without a registered handler are logged at debug
        level and produce no domain events.
        """
        handlers = {
            CanonicalEventType.SUBSCRIPTION_CREATED: self._handle_subscriptionkore_created,
            CanonicalEventType.SUBSCRIPTION_ACTIVATED: self._handle_subscriptionkore_activated,
            CanonicalEventType.SUBSCRIPTION_UPDATED: self._handle_subscriptionkore_updated,
            CanonicalEventType.SUBSCRIPTION_CANCELED: self._handle_subscriptionkore_canceled,
            CanonicalEventType.SUBSCRIPTION_PAUSED: self._handle_subscriptionkore_paused,
            CanonicalEventType.SUBSCRIPTION_RESUMED: self._handle_subscriptionkore_resumed,
            CanonicalEventType.SUBSCRIPTION_PAST_DUE: self._handle_subscriptionkore_past_due,
            CanonicalEventType.PAYMENT_SUCCEEDED: self._handle_payment_succeeded,
            CanonicalEventType.PAYMENT_FAILED: self._handle_payment_failed,
            CanonicalEventType.INVOICE_CREATED: self._handle_invoice_created,
            CanonicalEventType.INVOICE_PAID: self._handle_invoice_paid,
        }

        handler = handlers.get(canonical_type)
        if handler is None:
            logger.debug("No handler for event type", canonical_type=canonical_type)
            return []

        return await handler(provider_type, event)

    async def _handle_subscriptionkore_created(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription created webhook.

        Fetches the subscription from the provider adapter, attaches the
        internal customer and plan IDs when they can be resolved, saves it,
        invalidates the customer's entitlements and returns a
        ``SubscriptionCreated`` event.
        """
        adapter = self._providers[provider_type]

        # Get subscription from provider
        provider_sub_id = self._extract_subscriptionkore_id(provider_type, event.data)

        from subscriptionkore.core.models.value_objects import ProviderReference

        provider_ref = ProviderReference(
            provider=provider_type,
            external_id=provider_sub_id,
        )

        subscriptionkore = await adapter.get_subscriptionkore(provider_ref)

        # Look up internal customer and plan IDs
        customer = await self._resolve_customer(provider_type, event.data)
        plan = await self._resolve_plan(provider_type, event.data)

        if customer:
            subscriptionkore.customer_id = customer.id
        if plan:
            subscriptionkore.plan_id = plan.id

        # Save subscription
        subscriptionkore = await self._subscriptionkore_repo.save(subscriptionkore)

        # Invalidate entitlements
        await self._entitlement_service.invalidate(subscriptionkore.customer_id)

        return [
            SubscriptionCreated(
                subscriptionkore=subscriptionkore,
                customer_id=subscriptionkore.customer_id,
                plan_id=subscriptionkore.plan_id,
            )
        ]

    async def _handle_subscriptionkore_activated(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription activated webhook.

        Returns the state machine's events for the transition to ACTIVE, or
        an empty list when the subscription is not known locally. The
        subscription is saved and entitlements invalidated only when the
        transition succeeds.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        result = self._state_machine.transition(
            subscriptionkore=subscriptionkore,
            new_status=SubscriptionStatus.ACTIVE,
        )

        if result.success:
            await self._subscriptionkore_repo.save(subscriptionkore)
            await self._entitlement_service.invalidate(subscriptionkore.customer_id)

        return result.events

    async def _handle_subscriptionkore_updated(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription updated webhook.

        Re-fetches the subscription from the provider, keeps the internal
        IDs of the stored record, saves the refreshed copy and invalidates
        the customer's entitlements. Returns state-machine events when the
        status differs from the stored one, otherwise a single
        ``SubscriptionUpdated`` event.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        # Update from provider data
        adapter = self._providers[provider_type]
        updated = await adapter.get_subscriptionkore(subscriptionkore.provider_ref)

        # Preserve internal IDs
        updated.id = subscriptionkore.id
        updated.customer_id = subscriptionkore.customer_id
        updated.plan_id = subscriptionkore.plan_id

        # Check for status change
        events: list[DomainEvent] = []
        if updated.status != subscriptionkore.status:
            # NOTE(review): `updated` already carries the new status when it
            # is handed to the state machine, so the transition is requested
            # from an object whose status equals the target. Confirm the
            # state machine handles this as intended.
            result = self._state_machine.transition(
                subscriptionkore=updated,
                new_status=updated.status,
            )
            events.extend(result.events)
        else:
            events.append(
                SubscriptionUpdated(
                    subscriptionkore=updated,
                    customer_id=updated.customer_id,
                    changed_fields=["updated_from_webhook"],
                )
            )

        await self._subscriptionkore_repo.save(updated)
        await self._entitlement_service.invalidate(updated.customer_id)

        return events

    async def _handle_subscriptionkore_canceled(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription canceled webhook.

        Requests an immediate transition to CANCELED. The subscription is
        saved and entitlements invalidated only when the transition
        succeeds. Returns an empty list when the subscription is unknown.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        result = self._state_machine.transition(
            subscriptionkore=subscriptionkore,
            new_status=SubscriptionStatus.CANCELED,
            immediate=True,
        )

        if result.success:
            await self._subscriptionkore_repo.save(subscriptionkore)
            await self._entitlement_service.invalidate(subscriptionkore.customer_id)

        return result.events

    async def _handle_subscriptionkore_paused(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription paused webhook.

        Requests a transition to PAUSED. The subscription is saved and
        entitlements invalidated only when the transition succeeds. Returns
        an empty list when the subscription is unknown.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        result = self._state_machine.transition(
            subscriptionkore=subscriptionkore,
            new_status=SubscriptionStatus.PAUSED,
        )

        if result.success:
            await self._subscriptionkore_repo.save(subscriptionkore)
            await self._entitlement_service.invalidate(subscriptionkore.customer_id)

        return result.events

    async def _handle_subscriptionkore_resumed(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription resumed webhook.

        Requests a transition back to ACTIVE. The subscription is saved and
        entitlements invalidated only when the transition succeeds. Returns
        an empty list when the subscription is unknown.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        result = self._state_machine.transition(
            subscriptionkore=subscriptionkore,
            new_status=SubscriptionStatus.ACTIVE,
        )

        if result.success:
            await self._subscriptionkore_repo.save(subscriptionkore)
            await self._entitlement_service.invalidate(subscriptionkore.customer_id)

        return result.events

    async def _handle_subscriptionkore_past_due(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle subscription past due webhook.

        Requests a transition to PAST_DUE and saves the subscription when it
        succeeds. Unlike the other status handlers, entitlements are not
        invalidated here.
        """
        subscriptionkore = await self._get_subscriptionkore_from_event(provider_type, event)
        if subscriptionkore is None:
            return []

        result = self._state_machine.transition(
            subscriptionkore=subscriptionkore,
            new_status=SubscriptionStatus.PAST_DUE,
        )

        if result.success:
            await self._subscriptionkore_repo.save(subscriptionkore)

        return result.events

    async def _handle_payment_succeeded(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle payment succeeded webhook.

        Saves the extracted payment event and returns a ``PaymentSucceeded``
        event, or an empty list when no payment ID can be extracted.
        """
        payment_event = self._extract_payment_event(provider_type, event)
        if payment_event is None:
            return []

        # Save payment event
        payment_event = await self._payment_event_repo.save(payment_event)

        return [
            PaymentSucceeded(
                payment_event=payment_event,
                customer_id=payment_event.customer_id,
                subscriptionkore_id=payment_event.subscriptionkore_id,
                invoice_id=payment_event.invoice_id,
                amount=payment_event.amount,
            )
        ]

    async def _handle_payment_failed(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle payment failed webhook.

        Saves the extracted payment event and returns a ``PaymentFailed``
        event, or an empty list when no payment ID can be extracted.
        """
        # NOTE(review): _extract_payment_event always builds the record with
        # PaymentEventType.PAYMENT_SUCCEEDED / PaymentStatus.SUCCEEDED and
        # never sets failure_reason or failure_code, so failed payments are
        # currently stored with success markers. Needs a failure-aware
        # extractor.
        payment_event = self._extract_payment_event(provider_type, event)
        if payment_event is None:
            return []

        # Save payment event
        payment_event = await self._payment_event_repo.save(payment_event)

        return [
            PaymentFailed(
                payment_event=payment_event,
                customer_id=payment_event.customer_id,
                subscriptionkore_id=payment_event.subscriptionkore_id,
                invoice_id=payment_event.invoice_id,
                amount=payment_event.amount,
                failure_reason=payment_event.failure_reason,
                failure_code=payment_event.failure_code,
            )
        ]

    async def _handle_invoice_created(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle invoice created webhook.

        Builds an invoice from the event, attaches the internal customer ID
        when it can be resolved, saves it and returns an ``InvoiceCreated``
        event. Returns an empty list when no invoice ID can be extracted.
        """
        invoice = self._extract_invoice(provider_type, event)
        if invoice is None:
            return []

        # Resolve customer
        customer = await self._resolve_customer(provider_type, event.data)
        if customer:
            invoice.customer_id = customer.id

        invoice = await self._invoice_repo.save(invoice)

        return [
            InvoiceCreated(
                invoice=invoice,
                customer_id=invoice.customer_id,
                subscriptionkore_id=invoice.subscriptionkore_id,
            )
        ]

    async def _handle_invoice_paid(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> list[DomainEvent]:
        """
        Handle invoice paid webhook.

        Marks the stored invoice as paid, creating it from the event first
        when it is not stored yet, and returns an ``InvoicePaid`` event.
        Returns an empty list when no invoice ID can be extracted.
        """
        provider_invoice_id = self._extract_invoice_id(provider_type, event.data)
        if provider_invoice_id is None:
            # Nothing to look up or create without an invoice ID.
            return []

        invoice = await self._invoice_repo.get_by_provider_ref(
            provider=provider_type.value,
            external_id=provider_invoice_id,
        )

        if invoice is None:
            # Create invoice if not exists
            invoice = self._extract_invoice(provider_type, event)
            if invoice is None:
                return []

            # A freshly built invoice has an empty customer_id; resolve it
            # the same way the invoice-created handler does.
            customer = await self._resolve_customer(provider_type, event.data)
            if customer:
                invoice.customer_id = customer.id

        invoice.status = InvoiceStatus.PAID
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12; kept because the timestamp convention used by
        # the Invoice model is not visible here.
        invoice.paid_at = datetime.utcnow()

        invoice = await self._invoice_repo.save(invoice)

        return [
            InvoicePaid(
                invoice=invoice,
                customer_id=invoice.customer_id,
                subscriptionkore_id=invoice.subscriptionkore_id,
                amount_paid=invoice.amount_paid,
            )
        ]

    # Helper methods

    @staticmethod
    def _optional_str(value: Any) -> str | None:
        """Return ``str(value)``, or ``None`` when the value is missing or empty."""
        if value is None or value == "":
            return None
        return str(value)

    async def _get_subscriptionkore_from_event(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> Subscription | None:
        """
        Get subscription from webhook event data.

        Returns ``None`` when the event carries no subscription ID;
        otherwise returns whatever the repository lookup yields.
        """
        provider_sub_id = self._extract_subscriptionkore_id(provider_type, event.data)
        if provider_sub_id is None:
            return None

        return await self._subscriptionkore_repo.get_by_provider_ref(
            provider=provider_type.value,
            external_id=provider_sub_id,
        )

    async def _resolve_customer(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> Any:
        """
        Resolve internal customer from webhook data.

        Returns ``None`` when the payload carries no customer ID; otherwise
        returns whatever the repository lookup yields.
        """
        customer_id = self._extract_customer_id(provider_type, data)
        if customer_id is None:
            return None

        return await self._customer_repo.get_by_provider_ref(
            provider=provider_type.value,
            external_id=customer_id,
        )

    async def _resolve_plan(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> Any:
        """
        Resolve internal plan from webhook data.

        Returns ``None`` when the payload carries no plan/price ID;
        otherwise returns whatever the repository lookup yields.
        """
        plan_id = self._extract_plan_id(provider_type, data)
        if plan_id is None:
            return None

        return await self._plan_repo.get_by_provider_ref(
            provider=provider_type.value,
            external_id=plan_id,
        )

    def _extract_subscriptionkore_id(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> str | None:
        """
        Extract subscription ID from provider webhook data.

        Returns ``None`` when the payload has no subscription ID or the
        provider is not handled.
        """
        if provider_type == ProviderType.STRIPE:
            obj = data.get("object", {})
            # The event object is either the subscription itself or an
            # object that references one.
            if obj.get("object") == "subscription":
                return obj.get("id")
            return obj.get("subscription")

        if provider_type == ProviderType.PADDLE:
            return data.get("data", {}).get("id") or data.get("subscription_id")

        if provider_type == ProviderType.LEMONSQUEEZY:
            # A missing id must surface as None (not ""), so callers'
            # `is None` guards work.
            return self._optional_str(data.get("data", {}).get("id"))

        if provider_type == ProviderType.CHARGEBEE:
            content = data.get("content", {})
            return content.get("subscription", {}).get("id")

        return None

    def _extract_customer_id(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> str | None:
        """
        Extract customer ID from provider webhook data.

        Returns ``None`` when the payload has no customer ID or the provider
        is not handled.
        """
        if provider_type == ProviderType.STRIPE:
            obj = data.get("object", {})
            return obj.get("customer")

        if provider_type == ProviderType.PADDLE:
            return data.get("data", {}).get("customer_id")

        if provider_type == ProviderType.LEMONSQUEEZY:
            attrs = data.get("data", {}).get("attributes", {})
            return self._optional_str(attrs.get("customer_id"))

        if provider_type == ProviderType.CHARGEBEE:
            content = data.get("content", {})
            return content.get("customer", {}).get("id")

        return None

    def _extract_plan_id(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> str | None:
        """
        Extract plan/price ID from provider webhook data.

        For Stripe and Paddle only the first subscription item is
        inspected. Returns ``None`` when nothing can be extracted.
        """
        if provider_type == ProviderType.STRIPE:
            obj = data.get("object", {})
            items = obj.get("items", {}).get("data", [])
            if items:
                return items[0].get("price", {}).get("id")

        if provider_type == ProviderType.PADDLE:
            items = data.get("data", {}).get("items", [])
            if items:
                return items[0].get("price", {}).get("id")

        if provider_type == ProviderType.LEMONSQUEEZY:
            attrs = data.get("data", {}).get("attributes", {})
            return self._optional_str(attrs.get("variant_id"))

        if provider_type == ProviderType.CHARGEBEE:
            content = data.get("content", {})
            return content.get("subscription", {}).get("plan_id")

        return None

    def _extract_invoice_id(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> str | None:
        """
        Extract invoice ID from provider webhook data.

        Handles Stripe, Paddle and Chargebee payloads; returns ``None`` for
        any other provider or when the payload has no invoice ID.
        """
        if provider_type == ProviderType.STRIPE:
            obj = data.get("object", {})
            # The event object is either the invoice itself or an object
            # that references one.
            if obj.get("object") == "invoice":
                return obj.get("id")
            return obj.get("invoice")

        if provider_type == ProviderType.PADDLE:
            return data.get("data", {}).get("id")

        if provider_type == ProviderType.CHARGEBEE:
            content = data.get("content", {})
            return content.get("invoice", {}).get("id")

        return None

    def _extract_payment_event(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> PaymentEvent | None:
        """
        Extract payment event from webhook data.

        Simplified implementation: the amount is always zero and the record
        is always typed as a succeeded payment. Returns ``None`` when no
        payment ID can be extracted.
        """
        # This would be fully implemented per provider
        # Simplified implementation for now
        from subscriptionkore.core.models import PaymentEventType, PaymentStatus
        from subscriptionkore.core.models.value_objects import Money, ProviderReference

        data = event.data
        payment_id = self._extract_payment_id(provider_type, data)
        if payment_id is None:
            return None

        customer_id = self._extract_customer_id(provider_type, data) or ""

        return PaymentEvent(
            provider_ref=ProviderReference(
                provider=provider_type,
                external_id=payment_id,
            ),
            customer_id=customer_id,
            subscriptionkore_id=self._extract_subscriptionkore_id(provider_type, data),
            invoice_id=self._extract_invoice_id(provider_type, data),
            event_type=PaymentEventType.PAYMENT_SUCCEEDED,
            amount=Money.zero(),  # Would extract from data
            status=PaymentStatus.SUCCEEDED,
            occurred_at=event.occurred_at,
        )

    def _extract_payment_id(
        self,
        provider_type: ProviderType,
        data: dict[str, Any],
    ) -> str | None:
        """
        Extract payment ID from webhook data.

        Only Stripe and Paddle payloads are handled; other providers yield
        ``None``.
        """
        if provider_type == ProviderType.STRIPE:
            obj = data.get("object", {})
            return obj.get("id")

        if provider_type == ProviderType.PADDLE:
            return data.get("data", {}).get("id")

        return None

    def _extract_invoice(
        self,
        provider_type: ProviderType,
        event: ProviderWebhookEvent,
    ) -> Invoice | None:
        """
        Extract invoice from webhook data.

        Simplified implementation: monetary fields are zero and
        ``customer_id`` is left empty for the caller to fill in. Returns
        ``None`` when no invoice ID can be extracted.
        """
        # Simplified - would be fully implemented per provider
        from subscriptionkore.core.models.value_objects import Money, ProviderReference

        invoice_id = self._extract_invoice_id(provider_type, event.data)
        if invoice_id is None:
            return None

        return Invoice(
            customer_id="",  # Would be resolved
            provider_ref=ProviderReference(
                provider=provider_type,
                external_id=invoice_id,
            ),
            subtotal=Money.zero(),
            total=Money.zero(),
            amount_due=Money.zero(),
        )

class WebhookResult:
    """Result of webhook processing.

    Attributes:
        event_id: Provider event ID the result refers to.
        status: Outcome label supplied by the caller (for example
            ``"processed"`` or ``"duplicate"``).
        message: Human-readable description of the outcome.
        events_emitted: Number of domain events emitted; defaults to 0.
    """

    def __init__(
        self,
        event_id: str,
        status: str,
        message: str,
        events_emitted: int = 0,
    ) -> None:
        self.event_id = event_id
        self.status = status
        self.message = message
        self.events_emitted = events_emitted

    def __repr__(self) -> str:
        # Readable representation for logs and assertion failures.
        return (
            f"{type(self).__name__}("
            f"event_id={self.event_id!r}, "
            f"status={self.status!r}, "
            f"message={self.message!r}, "
            f"events_emitted={self.events_emitted!r})"
        )