"""Main factory for creating SubscriptionKore instances."""
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from subscriptionkore.adapters.chargebee import ChargebeeAdapter
from subscriptionkore.adapters.lemonsqueezy import LemonSqueezyAdapter
from subscriptionkore.adapters.paddle import PaddleAdapter
from subscriptionkore.adapters.stripe import StripeAdapter
from subscriptionkore.config import SubscriptionKoreConfig
from subscriptionkore.core.models import ProviderType
from subscriptionkore.infrastructure.cache.memory import InMemoryCache
from subscriptionkore.infrastructure.cache.redis import RedisCache
from subscriptionkore.infrastructure.event_bus.memory import InMemoryEventBus
from subscriptionkore.infrastructure.repositories.sqlalchemy import (
SQLAlchemyCustomerRepository,
SQLAlchemyEntitlementOverrideRepository,
SQLAlchemyEntitlementRepository,
SQLAlchemyInvoiceRepository,
SQLAlchemyPaymentEventRepository,
SQLAlchemyPlanRepository,
SQLAlchemyProcessedEventRepository,
SQLAlchemyProductRepository,
SQLAlchemySubscriptionRepository,
)
from subscriptionkore.infrastructure.repositories.sqlalchemy.models import Base
from subscriptionkore.ports.cache import CachePort
from subscriptionkore.ports.event_bus import EventBusPort
from subscriptionkore.ports.provider import PaymentProviderPort
from subscriptionkore.services import (
CustomerManager,
EntitlementService,
SubscriptionManager,
WebhookProcessor,
)
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
from subscriptionkore.core.events import DomainEvent
[docs]
class SubscriptionKore:
"""
Main entry point for the SubscriptionKore library.
Provides access to all subscription management functionality.
Example:
.. code-block:: python
from subscriptionkore import SubscriptionKore, SubscriptionKoreConfig
from subscriptionkore.config import StripeConfig
config = SubscriptionKoreConfig(
database_url="postgresql+asyncpg://user:pass@localhost/db",
stripe=StripeConfig(
api_key="sk_test_...",
webhook_secret="whsec_...",
),
)
async with SubscriptionKore(config) as subscriptionkore:
# Create a customer
customer = await subscriptionkore.customers.create(
external_id="user_123",
email="user@example.com",
)
# Create a subscription
subscription = await subscriptionkore.subscriptions.create(
customer_id=customer.id,
plan_id="plan_abc",
)
# Check entitlements
has_access = await subscriptionkore.entitlements.has_access(
customer_id=customer.id,
entitlement_key="premium_features",
)
"""
def __init__(self, config: SubscriptionKoreConfig) -> None:
self._config = config
self._engine: any = None
self._session_factory: async_sessionmaker[AsyncSession] | None = None
self._providers: dict[ProviderType, PaymentProviderPort] = {}
self._event_bus: InMemoryEventBus = InMemoryEventBus()
self._cache: CachePort | None = None
self._initialized = False
async def __aenter__(self) -> "SubscriptionKore":
await self.initialize()
return self
async def __aexit__(self, exc_type: any, exc_val: any, exc_tb: any) -> None:
await self.close()
[docs]
async def initialize(self) -> None:
"""Initialize all components."""
if self._initialized:
return
# Create database engine
self._engine = create_async_engine(
self._config.database_url,
echo=False,
pool_pre_ping=True,
)
# Create tables
async with self._engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Create session factory
self._session_factory = async_sessionmaker(
self._engine,
class_=AsyncSession,
expire_on_commit=False,
)
# Initialize cache
if self._config.redis_url:
self._cache = RedisCache(self._config.redis_url)
else:
self._cache = InMemoryCache()
# Initialize all configured providers
if self._config.stripe:
self._providers[ProviderType.STRIPE] = StripeAdapter(self._config.stripe)
if self._config.paddle:
self._providers[ProviderType.PADDLE] = PaddleAdapter(self._config.paddle)
if self._config.lemonsqueezy:
self._providers[ProviderType.LEMONSQUEEZY] = LemonSqueezyAdapter(
self._config.lemonsqueezy
)
if self._config.chargebee:
self._providers[ProviderType.CHARGEBEE] = ChargebeeAdapter(self._config.chargebee)
self._initialized = True
[docs]
async def close(self) -> None:
"""Close all connections and clean up resources."""
# Close provider connections
for provider in self._providers.values():
if hasattr(provider, "close"):
await provider.close()
# Close cache
if self._cache and hasattr(self._cache, "close"):
await self._cache.close()
# Close database engine
if self._engine:
await self._engine.dispose()
self._initialized = False
def _ensure_initialized(self) -> None:
if not self._initialized:
raise RuntimeError(
"SubscriptionKore not initialized. Call initialize() first or use async context manager."
)
def _get_session(self) -> AsyncSession:
self._ensure_initialized()
if self._session_factory is None:
raise RuntimeError("Session factory not initialized")
return self._session_factory()
def _get_provider(self, provider_type: ProviderType | None = None) -> PaymentProviderPort:
self._ensure_initialized()
target = provider_type or self._config.default_provider
provider = self._providers.get(target)
if provider is None:
raise RuntimeError(f"Provider {target} not configured")
return provider
[docs]
def get_provider(self, provider_type: ProviderType) -> PaymentProviderPort:
"""Get a specific provider adapter."""
return self._get_provider(provider_type)
@property
def configured_providers(self) -> list[ProviderType]:
"""Get list of configured providers."""
return list(self._providers.keys())
@property
def event_bus(self) -> EventBusPort:
"""Access the event bus for subscribing to domain events."""
return self._event_bus
[docs]
def on_event(
self,
event_type: type["DomainEvent"],
) -> Callable[
[Callable[["DomainEvent"], Awaitable[None]]], Callable[["DomainEvent"], Awaitable[None]]
]:
"""
Decorator to subscribe to domain events.
Example:
.. code-block:: python
@subscriptionkore.on_event(SubscriptionActivated)
async def handle_activation(event: SubscriptionActivated):
await send_welcome_email(event.customer_id)
"""
def decorator(
handler: Callable[["DomainEvent"], Awaitable[None]],
) -> Callable[["DomainEvent"], Awaitable[None]]:
self._event_bus.subscribe(event_type, handler)
return handler
return decorator
[docs]
async def get_subscription_manager(
self, provider_type: ProviderType | None = None
) -> SubscriptionManager:
"""Get a subscription manager instance."""
session = self._get_session()
return SubscriptionManager(
subscription_repo=SQLAlchemySubscriptionRepository(session),
customer_repo=SQLAlchemyCustomerRepository(session),
plan_repo=SQLAlchemyPlanRepository(session),
provider=self._get_provider(provider_type),
event_bus=self._event_bus,
)
[docs]
async def get_customer_manager(
self, provider_type: ProviderType | None = None
) -> CustomerManager:
"""Get a customer manager instance."""
session = self._get_session()
return CustomerManager(
customer_repo=SQLAlchemyCustomerRepository(session),
provider=self._get_provider(provider_type),
event_bus=self._event_bus,
)
[docs]
async def get_entitlement_service(self) -> EntitlementService:
"""Get an entitlement service instance."""
session = self._get_session()
return EntitlementService(
entitlement_repo=SQLAlchemyEntitlementRepository(session),
override_repo=SQLAlchemyEntitlementOverrideRepository(session),
subscription_repo=SQLAlchemySubscriptionRepository(session),
plan_repo=SQLAlchemyPlanRepository(session),
cache=self._cache,
cache_ttl=self._config.entitlement_cache_ttl,
)
[docs]
async def get_webhook_processor(self) -> WebhookProcessor:
"""Get a webhook processor instance."""
session = self._get_session()
entitlement_service = await self.get_entitlement_service()
return WebhookProcessor(
providers=self._providers,
subscription_repo=SQLAlchemySubscriptionRepository(session),
customer_repo=SQLAlchemyCustomerRepository(session),
plan_repo=SQLAlchemyPlanRepository(session),
invoice_repo=SQLAlchemyInvoiceRepository(session),
payment_event_repo=SQLAlchemyPaymentEventRepository(session),
processed_event_repo=SQLAlchemyProcessedEventRepository(session),
event_bus=self._event_bus,
entitlement_service=entitlement_service,
)
# Convenience methods for common operations
[docs]
async def create_customer(
self,
external_id: str,
email: str,
name: str | None = None,
provider_type: ProviderType | None = None,
) -> "Customer":
"""Convenience method to create a customer."""
from subscriptionkore.core.models import Customer
manager = await self.get_customer_manager(provider_type)
return await manager.create(external_id=external_id, email=email, name=name)
[docs]
async def create_subscription(
self,
customer_id: str,
plan_id: str,
quantity: int = 1,
trial_period_days: int | None = None,
provider_type: ProviderType | None = None,
) -> "Subscription":
"""Convenience method to create a subscription."""
from subscriptionkore.core.models import Subscription
manager = await self.get_subscription_manager(provider_type)
return await manager.create(
customer_id=customer_id,
plan_id=plan_id,
quantity=quantity,
trial_period_days=trial_period_days,
)
[docs]
async def check_entitlement(
self,
customer_id: str,
entitlement_key: str,
) -> "CustomerEntitlement":
"""Convenience method to check an entitlement."""
from subscriptionkore.core.models import CustomerEntitlement
service = await self.get_entitlement_service()
return await service.check(customer_id, entitlement_key)
[docs]
async def has_access(
self,
customer_id: str,
entitlement_key: str,
) -> bool:
"""Convenience method to check feature access."""
service = await self.get_entitlement_service()
return await service.has_access(customer_id, entitlement_key)