"""Subscription management service."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
import structlog
from subscriptionkore.core.events import (
SubscriptionCreated,
SubscriptionPlanChanged,
SubscriptionUpdated,
)
from subscriptionkore.core.exceptions import EntityNotFoundError, InvalidStateTransitionError
from subscriptionkore.core.models import (
Plan,
ProviderReference,
Subscription,
SubscriptionStatus,
)
from subscriptionkore.core.state import SubscriptionStateMachine
from subscriptionkore.ports.provider import (
ChangePlanRequest,
ChangePreview,
CreateSubscriptionRequest,
DiscountRequest,
PaymentProviderPort,
ProrationBehavior,
UpdateSubscriptionRequest,
)
if TYPE_CHECKING:
from subscriptionkore.ports.event_bus import EventBusPort
from subscriptionkore.ports.repository import (
CustomerRepository,
PlanRepository,
SubscriptionRepository,
)
logger = structlog.get_logger()
[docs]
class SubscriptionManager:
"""
Manages subscriptionkore lifecycle operations.
Provides a unified interface for subscriptionkore operations across providers.
"""
def __init__(
self,
subscriptionkore_repo: SubscriptionRepository,
customer_repo: CustomerRepository,
plan_repo: PlanRepository,
provider: PaymentProviderPort,
event_bus: EventBusPort,
) -> None:
self._subscriptionkore_repo = subscriptionkore_repo
self._customer_repo = customer_repo
self._plan_repo = plan_repo
self._provider = provider
self._event_bus = event_bus
self._state_machine = SubscriptionStateMachine()
[docs]
async def create(
self,
customer_id: str,
plan_id: str,
quantity: int = 1,
trial_period_days: int | None = None,
coupon_code: str | None = None,
metadata: dict | None = None,
) -> Subscription:
"""
Create a new subscriptionkore.
Args:
customer_id: Internal customer ID
plan_id: Internal plan ID
quantity: Subscription quantity
trial_period_days: Override trial period (None uses plan default)
coupon_code: Optional coupon code to apply
metadata: Additional metadata
Returns:
Created subscriptionkore
Raises:
EntityNotFoundError: If customer or plan not found
"""
log = logger.bind(customer_id=customer_id, plan_id=plan_id)
log.info("Creating subscriptionkore")
# Get customer and plan
customer = await self._customer_repo.get(customer_id)
if customer is None:
raise EntityNotFoundError("Customer", customer_id)
plan = await self._plan_repo.get(plan_id)
if plan is None:
raise EntityNotFoundError("Plan", plan_id)
# Get provider references
customer_ref = customer.get_provider_ref(self._provider.provider_type.value)
if customer_ref is None:
raise EntityNotFoundError(
"CustomerProviderRef",
f"{customer_id}:{self._provider.provider_type.value}",
)
plan_ref = plan.get_provider_ref(self._provider.provider_type.value)
if plan_ref is None:
raise EntityNotFoundError(
"PlanProviderRef",
f"{plan_id}:{self._provider.provider_type.value}",
)
# Determine trial period
effective_trial = trial_period_days
if effective_trial is None and plan.trial_period_days:
effective_trial = plan.trial_period_days
# Create subscriptionkore in provider
request = CreateSubscriptionRequest(
customer_id=customer_id,
plan_id=plan_id,
quantity=quantity,
trial_period_days=effective_trial,
coupon_code=coupon_code,
metadata=metadata or {},
)
subscriptionkore = await self._provider.create_subscriptionkore(
request=request,
customer_provider_ref=customer_ref,
plan_provider_ref=plan_ref,
)
# Set internal IDs
subscriptionkore.customer_id = customer_id
subscriptionkore.plan_id = plan_id
# Save to repository
subscriptionkore = await self._subscriptionkore_repo.save(subscriptionkore)
# Emit event
await self._event_bus.publish(
SubscriptionCreated(
subscriptionkore=subscriptionkore,
customer_id=customer_id,
plan_id=plan_id,
)
)
log.info(
"Subscription created",
subscriptionkore_id=subscriptionkore.id,
status=subscriptionkore.status,
)
return subscriptionkore
[docs]
async def get(self, subscriptionkore_id: str) -> Subscription:
"""Get subscriptionkore by ID."""
subscriptionkore = await self._subscriptionkore_repo.get(subscriptionkore_id)
if subscriptionkore is None:
raise EntityNotFoundError("Subscription", subscriptionkore_id)
return subscriptionkore
[docs]
async def get_by_customer(
self,
customer_id: str,
include_canceled: bool = False,
) -> list[Subscription]:
"""Get all subscriptionkores for a customer."""
return await self._subscriptionkore_repo.list_by_customer(
customer_id=customer_id,
include_canceled=include_canceled,
)
[docs]
async def get_active_by_customer(self, customer_id: str) -> list[Subscription]:
"""Get active subscriptionkores for a customer."""
return await self._subscriptionkore_repo.list_active_by_customer(customer_id)
[docs]
async def update(
self,
subscriptionkore_id: str,
quantity: int | None = None,
metadata: dict | None = None,
) -> Subscription:
"""Update subscriptionkore quantity or metadata."""
log = logger.bind(subscriptionkore_id=subscriptionkore_id)
log.info("Updating subscriptionkore")
subscriptionkore = await self.get(subscriptionkore_id)
request = UpdateSubscriptionRequest(
subscriptionkore_id=subscriptionkore_id,
quantity=quantity,
metadata=metadata,
)
updated = await self._provider.update_subscriptionkore(
request=request,
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
)
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
# Determine changed fields
changed_fields: list[str] = []
if quantity is not None and subscriptionkore.quantity != quantity:
changed_fields.append("quantity")
if metadata is not None:
changed_fields.append("metadata")
updated = await self._subscriptionkore_repo.save(updated)
if changed_fields:
await self._event_bus.publish(
SubscriptionUpdated(
subscriptionkore=updated,
customer_id=updated.customer_id,
changed_fields=changed_fields,
)
)
log.info("Subscription updated", changed_fields=changed_fields)
return updated
[docs]
async def cancel(
self,
subscriptionkore_id: str,
immediate: bool = False,
reason: str | None = None,
) -> Subscription:
"""
Cancel a subscriptionkore.
Args:
subscriptionkore_id: Subscription ID
immediate: If True, cancel immediately. If False, cancel at period end.
reason: Optional cancellation reason
Returns:
Updated subscriptionkore
"""
log = logger.bind(subscriptionkore_id=subscriptionkore_id, immediate=immediate)
log.info("Canceling subscriptionkore")
subscriptionkore = await self.get(subscriptionkore_id)
# Cancel in provider
updated = await self._provider.cancel_subscriptionkore(
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
immediate=immediate,
)
# Apply state transition
if immediate:
target_status = SubscriptionStatus.CANCELED
else:
target_status = subscriptionkore.status # Stay in current status
updated.cancel_at_period_end = True
updated.canceled_at = datetime.utcnow()
result = self._state_machine.transition(
subscriptionkore=updated,
new_status=target_status,
reason=reason,
immediate=immediate,
)
if not result.success and result.error:
raise result.error
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated = await self._subscriptionkore_repo.save(updated)
# Publish events
await self._event_bus.publish_many(result.events)
log.info(
"Subscription canceled",
status=updated.status,
cancel_at_period_end=updated.cancel_at_period_end,
)
return updated
[docs]
async def pause(
self,
subscriptionkore_id: str,
resumes_at: datetime | None = None,
) -> Subscription:
"""Pause a subscriptionkore."""
log = logger.bind(subscriptionkore_id=subscriptionkore_id)
log.info("Pausing subscriptionkore")
subscriptionkore = await self.get(subscriptionkore_id)
if not self._provider.capabilities.supports_pausing:
raise InvalidStateTransitionError(
from_state=subscriptionkore.status,
to_state=SubscriptionStatus.PAUSED,
reason=f"Provider {self._provider.provider_type} does not support pausing",
)
# Pause in provider
updated = await self._provider.pause_subscriptionkore(
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
resumes_at=resumes_at,
)
# Apply state transition
result = self._state_machine.transition(
subscriptionkore=updated,
new_status=SubscriptionStatus.PAUSED,
)
if not result.success and result.error:
raise result.error
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated = await self._subscriptionkore_repo.save(updated)
await self._event_bus.publish_many(result.events)
log.info("Subscription paused", resumes_at=resumes_at)
return updated
[docs]
async def resume(self, subscriptionkore_id: str) -> Subscription:
"""Resume a paused subscriptionkore."""
log = logger.bind(subscriptionkore_id=subscriptionkore_id)
log.info("Resuming subscriptionkore")
subscriptionkore = await self.get(subscriptionkore_id)
if subscriptionkore.status != SubscriptionStatus.PAUSED:
raise InvalidStateTransitionError(
from_state=subscriptionkore.status,
to_state=SubscriptionStatus.ACTIVE,
reason="Can only resume paused subscriptionkores",
)
# Resume in provider
updated = await self._provider.resume_subscriptionkore(
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
)
# Apply state transition
result = self._state_machine.transition(
subscriptionkore=updated,
new_status=SubscriptionStatus.ACTIVE,
)
if not result.success and result.error:
raise result.error
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated = await self._subscriptionkore_repo.save(updated)
await self._event_bus.publish_many(result.events)
log.info("Subscription resumed")
return updated
[docs]
async def change_plan(
self,
subscriptionkore_id: str,
new_plan_id: str,
proration_behavior: ProrationBehavior = ProrationBehavior.CREATE_PRORATIONS,
) -> Subscription:
"""
Change subscriptionkore plan (upgrade/downgrade).
Args:
subscriptionkore_id: Subscription ID
new_plan_id: New plan ID
proration_behavior: How to handle proration
Returns:
Updated subscriptionkore
"""
log = logger.bind(subscriptionkore_id=subscriptionkore_id, new_plan_id=new_plan_id)
log.info("Changing subscriptionkore plan")
subscriptionkore = await self.get(subscriptionkore_id)
old_plan_id = subscriptionkore.plan_id
# Get new plan
new_plan = await self._plan_repo.get(new_plan_id)
if new_plan is None:
raise EntityNotFoundError("Plan", new_plan_id)
new_plan_ref = new_plan.get_provider_ref(self._provider.provider_type.value)
if new_plan_ref is None:
raise EntityNotFoundError(
"PlanProviderRef",
f"{new_plan_id}:{self._provider.provider_type.value}",
)
# Get old plan for comparison
old_plan = await self._plan_repo.get(old_plan_id)
is_upgrade = new_plan.is_upgrade_from(old_plan) if old_plan else False
# Change plan in provider
request = ChangePlanRequest(
subscriptionkore_id=subscriptionkore_id,
new_plan_id=new_plan_id,
proration_behavior=proration_behavior,
)
updated = await self._provider.change_plan(
request=request,
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
new_plan_provider_ref=new_plan_ref,
)
# Preserve internal IDs and update plan
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = new_plan_id
updated = await self._subscriptionkore_repo.save(updated)
# Emit event
await self._event_bus.publish(
SubscriptionPlanChanged(
subscriptionkore=updated,
customer_id=updated.customer_id,
previous_plan_id=old_plan_id,
new_plan_id=new_plan_id,
is_upgrade=is_upgrade,
)
)
log.info(
"Subscription plan changed",
old_plan_id=old_plan_id,
is_upgrade=is_upgrade,
)
return updated
[docs]
async def preview_plan_change(
self,
subscriptionkore_id: str,
new_plan_id: str,
proration_behavior: ProrationBehavior = ProrationBehavior.CREATE_PRORATIONS,
) -> ChangePreview:
"""Preview costs for a plan change."""
subscriptionkore = await self.get(subscriptionkore_id)
new_plan = await self._plan_repo.get(new_plan_id)
if new_plan is None:
raise EntityNotFoundError("Plan", new_plan_id)
new_plan_ref = new_plan.get_provider_ref(self._provider.provider_type.value)
if new_plan_ref is None:
raise EntityNotFoundError(
"PlanProviderRef",
f"{new_plan_id}:{self._provider.provider_type.value}",
)
request = ChangePlanRequest(
subscriptionkore_id=subscriptionkore_id,
new_plan_id=new_plan_id,
proration_behavior=proration_behavior,
)
return await self._provider.preview_plan_change(
request=request,
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
new_plan_provider_ref=new_plan_ref,
)
[docs]
async def apply_discount(
self,
subscriptionkore_id: str,
coupon_code: str,
) -> Subscription:
"""Apply a discount to a subscriptionkore."""
log = logger.bind(subscriptionkore_id=subscriptionkore_id, coupon_code=coupon_code)
log.info("Applying discount")
subscriptionkore = await self.get(subscriptionkore_id)
updated = await self._provider.apply_discount(
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
discount=DiscountRequest(coupon_code=coupon_code),
)
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated = await self._subscriptionkore_repo.save(updated)
log.info("Discount applied")
return updated
[docs]
async def remove_discount(self, subscriptionkore_id: str) -> Subscription:
"""Remove discount from a subscriptionkore."""
log = logger.bind(subscriptionkore_id=subscriptionkore_id)
log.info("Removing discount")
subscriptionkore = await self.get(subscriptionkore_id)
updated = await self._provider.remove_discount(
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
)
# Preserve internal IDs
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated = await self._subscriptionkore_repo.save(updated)
log.info("Discount removed")
return updated
[docs]
async def reactivate(self, subscriptionkore_id: str) -> Subscription:
"""
Reactivate a subscriptionkore that was scheduled to cancel.
Removes the cancel_at_period_end flag.
"""
log = logger.bind(subscriptionkore_id=subscriptionkore_id)
log.info("Reactivating subscriptionkore")
subscriptionkore = await self.get(subscriptionkore_id)
if not subscriptionkore.cancel_at_period_end:
log.info("Subscription not scheduled to cancel, no action needed")
return subscriptionkore
# Update in provider
request = UpdateSubscriptionRequest(
subscriptionkore_id=subscriptionkore_id,
cancel_at_period_end=False,
)
updated = await self._provider.update_subscriptionkore(
request=request,
subscriptionkore_provider_ref=subscriptionkore.provider_ref,
)
# Update local state
updated.id = subscriptionkore.id
updated.customer_id = subscriptionkore.customer_id
updated.plan_id = subscriptionkore.plan_id
updated.cancel_at_period_end = False
updated.canceled_at = None
updated = await self._subscriptionkore_repo.save(updated)
await self._event_bus.publish(
SubscriptionUpdated(
subscriptionkore=updated,
customer_id=updated.customer_id,
changed_fields=["cancel_at_period_end"],
)
)
log.info("Subscription reactivated")
return updated