Source code for subscriptionkore.services.customer_manager

"""Customer management service."""

from __future__ import annotations

from typing import TYPE_CHECKING

import structlog

from subscriptionkore.core.events import CustomerCreated, CustomerUpdated
from subscriptionkore.core.exceptions import DuplicateEntityError, EntityNotFoundError
from subscriptionkore.core.models import Customer, ProviderType
from subscriptionkore.core.models.customer import Address, TaxInfo

if TYPE_CHECKING:
    from subscriptionkore.ports.event_bus import EventBusPort
    from subscriptionkore.ports.provider import PaymentProviderPort
    from subscriptionkore.ports.repository import CustomerRepository

# Module-level structlog logger; the service methods below call
# ``logger.bind(...)`` to attach per-call context (IDs, email) before logging.
logger = structlog.get_logger()


class CustomerManager:
    """
    Manages customer lifecycle operations.

    Provides a unified interface for customer operations across providers.
    Every operation goes through three collaborators supplied at construction
    time: a customer repository (persistence), a payment provider port
    (remote customer records) and an event bus (domain events).
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        provider: PaymentProviderPort,
        event_bus: EventBusPort,
    ) -> None:
        """
        Store the collaborators used by all operations.

        Args:
            customer_repo: Repository used to load, save, delete and list customers
            provider: Payment provider port used for remote create/update/delete
            event_bus: Bus on which CustomerCreated / CustomerUpdated are published
        """
        self._customer_repo = customer_repo
        self._provider = provider
        self._event_bus = event_bus

    async def create(
        self,
        external_id: str,
        email: str,
        name: str | None = None,
        tax_info: TaxInfo | None = None,
        billing_address: Address | None = None,
        metadata: dict | None = None,
        sync_to_provider: bool = True,
    ) -> Customer:
        """
        Create a new customer.

        Args:
            external_id: Your application's user ID
            email: Customer email
            name: Customer name
            tax_info: Tax information
            billing_address: Billing address
            metadata: Additional metadata
            sync_to_provider: Whether to create in payment provider

        Returns:
            Created customer

        Raises:
            DuplicateEntityError: If customer with external_id exists
        """
        log = logger.bind(external_id=external_id, email=email)
        log.info("Creating customer")

        # Check for existing customer
        existing = await self._customer_repo.get_by_external_id(external_id)
        if existing is not None:
            raise DuplicateEntityError("Customer", external_id)

        # Create customer model
        customer = Customer(
            external_id=external_id,
            email=email,
            name=name,
            tax_info=tax_info,
            billing_address=billing_address,
            metadata=metadata or {},
        )

        # Sync to provider if requested.
        # NOTE(review): the provider record is created before the local save;
        # if the save below fails, the provider-side customer has no local
        # counterpart. Confirm whether callers need compensation here.
        if sync_to_provider:
            provider_ref = await self._provider.create_customer(customer)
            customer.add_provider_ref(provider_ref)

        # Save to repository
        customer = await self._customer_repo.save(customer)

        # Emit event
        await self._event_bus.publish(CustomerCreated(customer=customer))

        log.info("Customer created", customer_id=customer.id)
        return customer

    async def get(self, customer_id: str) -> Customer:
        """
        Get customer by internal ID.

        Raises:
            EntityNotFoundError: If the repository returns no customer
        """
        customer = await self._customer_repo.get(customer_id)
        if customer is None:
            raise EntityNotFoundError("Customer", customer_id)
        return customer

    async def get_by_external_id(self, external_id: str) -> Customer:
        """
        Get customer by external (application) ID.

        Raises:
            EntityNotFoundError: If the repository returns no customer
        """
        customer = await self._customer_repo.get_by_external_id(external_id)
        if customer is None:
            raise EntityNotFoundError("Customer", external_id)
        return customer

    async def get_or_create(
        self,
        external_id: str,
        email: str,
        name: str | None = None,
        sync_to_provider: bool = True,
        *,
        tax_info: TaxInfo | None = None,
        billing_address: Address | None = None,
        metadata: dict | None = None,
    ) -> tuple[Customer, bool]:
        """
        Get existing customer or create new one.

        Args:
            external_id: Your application's user ID
            email: Customer email (used only when creating)
            name: Customer name (used only when creating)
            sync_to_provider: Whether to create in payment provider
            tax_info: Tax information (keyword-only, used only when creating)
            billing_address: Billing address (keyword-only, used only when creating)
            metadata: Additional metadata (keyword-only, used only when creating)

        Returns:
            Tuple of (customer, created) where created is True if new
        """
        existing = await self._customer_repo.get_by_external_id(external_id)
        if existing is not None:
            return existing, False

        try:
            customer = await self.create(
                external_id=external_id,
                email=email,
                name=name,
                tax_info=tax_info,
                billing_address=billing_address,
                metadata=metadata,
                sync_to_provider=sync_to_provider,
            )
        except DuplicateEntityError:
            # The lookup above and create() are not atomic: another caller may
            # have created the same external_id in between. Honour the "get"
            # half of the contract instead of surfacing the duplicate error.
            existing = await self._customer_repo.get_by_external_id(external_id)
            if existing is None:
                raise
            return existing, False
        return customer, True

    async def update(
        self,
        customer_id: str,
        email: str | None = None,
        name: str | None = None,
        tax_info: TaxInfo | None = None,
        billing_address: Address | None = None,
        metadata: dict | None = None,
        sync_to_provider: bool = True,
    ) -> Customer:
        """
        Update customer information.

        Arguments left as None are treated as "not supplied". A
        CustomerUpdated event is published only when at least one supplied
        value differs from the value currently stored on the customer.

        Args:
            customer_id: Internal customer ID
            email: New email
            name: New name
            tax_info: New tax information
            billing_address: New billing address
            metadata: New metadata
            sync_to_provider: Whether to push the update to the payment
                provider (only done when the customer has provider refs)

        Returns:
            The customer as returned by the repository save

        Raises:
            EntityNotFoundError: If no customer has this ID
        """
        log = logger.bind(customer_id=customer_id)
        log.info("Updating customer")

        customer = await self.get(customer_id)

        requested = {
            "email": email,
            "name": name,
            "tax_info": tax_info,
            "billing_address": billing_address,
            "metadata": metadata,
        }

        # Track changed fields. Every field uses the same rule (supplied AND
        # different from the current value) so that re-sending an unchanged
        # tax_info / billing_address / metadata does not emit a spurious event.
        # The attribute names are the same ones create() passes to Customer(...).
        changed_fields = [
            field
            for field, value in requested.items()
            if value is not None and getattr(customer, field) != value
        ]

        # Update local model
        customer.update(**requested)

        # Sync to provider
        if sync_to_provider and customer.provider_refs:
            await self._provider.update_customer(customer)

        # Save to repository
        customer = await self._customer_repo.save(customer)

        # Emit event if changed
        if changed_fields:
            await self._event_bus.publish(
                CustomerUpdated(
                    customer=customer,
                    changed_fields=changed_fields,
                )
            )

        log.info("Customer updated", changed_fields=changed_fields)
        return customer

    async def delete(
        self,
        customer_id: str,
        delete_from_provider: bool = True,
    ) -> bool:
        """
        Delete a customer.

        Args:
            customer_id: Customer ID
            delete_from_provider: Whether to delete from payment provider

        Returns:
            True if deleted

        Raises:
            EntityNotFoundError: If no customer has this ID
        """
        log = logger.bind(customer_id=customer_id)
        log.info("Deleting customer")

        customer = await self.get(customer_id)

        # Delete from provider: one call per provider reference on the customer
        if delete_from_provider:
            for ref in customer.provider_refs:
                await self._provider.delete_customer(ref)

        # Delete from repository
        deleted = await self._customer_repo.delete(customer_id)

        log.info("Customer deleted", deleted=deleted)
        return deleted

    async def sync_to_provider(
        self,
        customer_id: str,
        provider: ProviderType | None = None,
    ) -> Customer:
        """
        Sync customer to payment provider.

        Creates customer in provider if not exists, or updates if exists.

        Args:
            customer_id: Internal customer ID
            provider: Provider whose reference is looked up on the customer;
                defaults to the configured provider's type

        Returns:
            The customer as returned by the repository save

        Raises:
            EntityNotFoundError: If no customer has this ID
        """
        customer = await self.get(customer_id)

        # NOTE(review): ``provider`` only selects which reference is checked;
        # the create/update call below always goes to the configured
        # ``self._provider``. Passing a different provider type here looks
        # unsupported - confirm intended behaviour.
        target_provider = provider or self._provider.provider_type
        existing_ref = customer.get_provider_ref(target_provider.value)

        if existing_ref is None:
            # Create in provider
            provider_ref = await self._provider.create_customer(customer)
            customer.add_provider_ref(provider_ref)
        else:
            # Update in provider
            await self._provider.update_customer(customer)

        return await self._customer_repo.save(customer)

    async def list(
        self,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Customer]:
        """
        List customers with pagination.

        Args:
            limit: Maximum number of customers to return
            offset: Number of customers to skip

        Returns:
            Whatever the repository returns for this limit/offset
        """
        return await self._customer_repo.list(limit=limit, offset=offset)