"""Chargebee payment provider adapter."""
from __future__ import annotations
import base64
import json
from datetime import datetime
from decimal import Decimal
from typing import Any
from urllib.parse import urlencode
import httpx
import structlog
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from subscriptionkore.config import ChargebeeConfig
from subscriptionkore.core.exceptions import (
ProviderAPIError,
ProviderAuthenticationError,
ProviderNetworkError,
ProviderRateLimitError,
)
from subscriptionkore.core.models import (
Customer,
Invoice,
InvoiceStatus,
Plan,
Product,
ProviderReference,
ProviderType,
Subscription,
SubscriptionStatus,
)
from subscriptionkore.core.models.customer import Address
from subscriptionkore.core.models.invoice import InvoiceLineItem
from subscriptionkore.core.models.subscription import (
AppliedDiscount,
PauseBehavior,
PauseConfig,
)
from subscriptionkore.core.models.value_objects import (
BillingPeriod,
Currency,
DateRange,
Interval,
Money,
)
from subscriptionkore.ports.provider import (
ChangePlanRequest,
ChangePreview,
CheckoutRequest,
CheckoutSession,
CreateSubscriptionRequest,
DiscountRequest,
PaymentProviderPort,
PortalSession,
ProrationBehavior,
ProviderCapabilities,
ProviderWebhookEvent,
UpdateSubscriptionRequest,
)
logger = structlog.get_logger()
[docs]
class ChargebeeAdapter(PaymentProviderPort):
"""
Chargebee payment provider implementation.
Implements the Chargebee API for subscriptionkore management.
Reference: https://apidocs.eu.chargebee. com/docs/api/
"""
STATUS_MAP = {
"in_trial": SubscriptionStatus.TRIALING,
"active": SubscriptionStatus.ACTIVE,
"non_renewing": SubscriptionStatus.ACTIVE,
"paused": SubscriptionStatus.PAUSED,
"cancelled": SubscriptionStatus.CANCELED,
"future": SubscriptionStatus.INCOMPLETE,
}
INVOICE_STATUS_MAP = {
"paid": InvoiceStatus.PAID,
"posted": InvoiceStatus.OPEN,
"payment_due": InvoiceStatus.OPEN,
"not_paid": InvoiceStatus.OPEN,
"voided": InvoiceStatus.VOID,
"pending": InvoiceStatus.DRAFT,
}
def __init__(self, config: ChargebeeConfig) -> None:
self._config = config
self._client: httpx.AsyncClient | None = None
self._base_url = f"https://{config.site}. chargebee.com/api/v2"
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
auth_string = base64.b64encode(f"{self._config.api_key}:".encode("utf-8")).decode(
"utf-8"
)
self._client = httpx.AsyncClient(
base_url=self._base_url,
headers={
"Authorization": f"Basic {auth_string}",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=30.0,
)
return self._client
[docs]
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
@property
def provider_type(self) -> ProviderType:
return ProviderType.CHARGEBEE
@property
def capabilities(self) -> ProviderCapabilities:
return ProviderCapabilities(
supports_pausing=True,
supports_trials=True,
supports_quantity=True,
supports_immediate_cancel=True,
supports_proration=True,
supports_coupons=True,
supports_metered_billing=True,
supports_customer_portal=True,
supports_checkout_sessions=True,
)
def _handle_error(self, response: httpx.Response) -> None:
"""Handle Chargebee API errors."""
if response.status_code == 429:
retry_after = response.headers.get("Retry-After")
raise ProviderRateLimitError(
provider="chargebee",
retry_after=int(retry_after) if retry_after else None,
)
if response.status_code == 401:
raise ProviderAuthenticationError(provider="chargebee")
if response.status_code >= 400:
try:
error_data = response.json()
raise ProviderAPIError(
message=error_data.get("message", "Unknown Chargebee error"),
provider="chargebee",
status_code=response.status_code,
provider_message=error_data.get("message"),
provider_code=error_data.get("error_code"),
)
except json.JSONDecodeError:
raise ProviderAPIError(
message=f"Chargebee API error: {response.status_code}",
provider="chargebee",
status_code=response.status_code,
)
def _flatten_params(self, data: dict[str, Any], prefix: str = "") -> dict[str, str]:
"""Flatten nested dict to Chargebee's format: customer[email] = value."""
result: dict[str, str] = {}
for key, value in data.items():
full_key = f"{prefix}[{key}]" if prefix else key
if isinstance(value, dict):
result.update(self._flatten_params(value, full_key))
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, dict):
result.update(self._flatten_params(item, f"{full_key}[{i}]"))
else:
result[f"{full_key}[{i}]"] = str(item)
elif value is not None:
result[full_key] = str(value)
return result
@retry(
retry=retry_if_exception_type((ProviderRateLimitError, ProviderNetworkError)),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=60),
)
async def _request(
self,
method: str,
endpoint: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make authenticated request to Chargebee API."""
client = await self._get_client()
try:
encoded_data = None
if data:
flat_data = self._flatten_params(data)
encoded_data = urlencode(flat_data)
response = await client.request(
method=method,
url=endpoint,
content=encoded_data,
params=params,
)
except httpx.NetworkError as e:
raise ProviderNetworkError(provider="chargebee", original_error=e) from e
self._handle_error(response)
return response.json()
# Customer Operations
[docs]
async def create_customer(self, customer: Customer) -> ProviderReference:
data: dict[str, Any] = {
"email": customer.email,
"cf_subscriptionkore_id": customer.id,
"cf_external_id": customer.external_id,
}
if customer.name:
parts = customer.name.split(" ", 1)
data["first_name"] = parts[0]
if len(parts) > 1:
data["last_name"] = parts[1]
if customer.billing_address:
addr = customer.billing_address
data["billing_address"] = {
"line1": addr.line1,
"line2": addr.line2,
"city": addr.city,
"state": addr.state,
"zip": addr.postal_code,
"country": addr.country,
}
result = await self._request("POST", "/customers", data=data)
customer_data = result.get("customer", {})
return ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id=customer_data["id"],
metadata={"email": customer_data.get("email")},
)
[docs]
async def update_customer(self, customer: Customer) -> None:
provider_ref = customer.get_provider_ref("chargebee")
if provider_ref is None:
raise ProviderAPIError(
message="Customer has no Chargebee reference",
provider="chargebee",
status_code=400,
)
data: dict[str, Any] = {"email": customer.email}
if customer.name:
parts = customer.name.split(" ", 1)
data["first_name"] = parts[0]
if len(parts) > 1:
data["last_name"] = parts[1]
await self._request("POST", f"/customers/{provider_ref.external_id}", data=data)
[docs]
async def delete_customer(self, provider_ref: ProviderReference) -> None:
await self._request("POST", f"/customers/{provider_ref.external_id}/delete")
[docs]
async def get_customer(self, provider_ref: ProviderReference) -> Customer:
result = await self._request("GET", f"/customers/{provider_ref.external_id}")
data = result.get("customer", {})
name = None
if data.get("first_name") or data.get("last_name"):
name = f"{data.get('first_name', '')} {data.get('last_name', '')}".strip()
address = None
billing = data.get("billing_address", {})
if billing:
address = Address(
line1=billing.get("line1"),
line2=billing.get("line2"),
city=billing.get("city"),
state=billing.get("state"),
postal_code=billing.get("zip"),
country=billing.get("country"),
)
return Customer(
id=data.get("cf_subscriptionkore_id", ""),
external_id=data.get("cf_external_id", ""),
email=data.get("email", ""),
name=name,
billing_address=address,
provider_refs=[provider_ref],
created_at=datetime.fromtimestamp(data.get("created_at", 0)),
)
# Subscription Operations
[docs]
async def create_subscriptionkore(
self,
request: CreateSubscriptionRequest,
customer_provider_ref: ProviderReference,
plan_provider_ref: ProviderReference,
) -> Subscription:
data: dict[str, Any] = {
"plan_id": plan_provider_ref.external_id,
"plan_quantity": request.quantity,
"cf_subscriptionkore_customer_id": request.customer_id,
"cf_subscriptionkore_plan_id": request.plan_id,
}
if request.trial_period_days:
data["trial_end"] = int(
(datetime.utcnow().timestamp()) + (request.trial_period_days * 86400)
)
if request.coupon_code:
data["coupon_ids"] = [request.coupon_code]
for key, value in request.metadata.items():
data[f"cf_{key}"] = str(value)
result = await self._request(
"POST",
f"/customers/{customer_provider_ref.external_id}/subscriptionkores",
data=data,
)
return self._map_subscriptionkore(
result.get("subscriptionkore", {}),
result.get("customer", {}),
request.customer_id,
request.plan_id,
)
[docs]
async def update_subscriptionkore(
self,
request: UpdateSubscriptionRequest,
subscriptionkore_provider_ref: ProviderReference,
) -> Subscription:
data: dict[str, Any] = {}
if request.quantity is not None:
data["plan_quantity"] = request.quantity
if request.cancel_at_period_end is not None:
if request.cancel_at_period_end:
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/cancel_for_items",
data={"end_of_term": True},
)
else:
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/remove_scheduled_cancellation",
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
if request.metadata:
for key, value in request.metadata.items():
data[f"cf_{key}"] = str(value)
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}",
data=data,
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
[docs]
async def cancel_subscriptionkore(
self,
subscriptionkore_provider_ref: ProviderReference,
immediate: bool = False,
) -> Subscription:
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/cancel_for_items",
data={"end_of_term": not immediate},
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
[docs]
async def pause_subscriptionkore(
self,
subscriptionkore_provider_ref: ProviderReference,
resumes_at: datetime | None = None,
) -> Subscription:
data: dict[str, Any] = {}
if resumes_at:
data["resume_date"] = int(resumes_at.timestamp())
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/pause",
data=data,
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
[docs]
async def resume_subscriptionkore(
self,
subscriptionkore_provider_ref: ProviderReference,
) -> Subscription:
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/resume",
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
[docs]
async def get_subscriptionkore(
self,
provider_ref: ProviderReference,
) -> Subscription:
result = await self._request("GET", f"/subscriptionkores/{provider_ref.external_id}")
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
# Plan Change Operations
[docs]
async def change_plan(
self,
request: ChangePlanRequest,
subscriptionkore_provider_ref: ProviderReference,
new_plan_provider_ref: ProviderReference,
) -> Subscription:
proration_map = {
ProrationBehavior.CREATE_PRORATIONS: True,
ProrationBehavior.NONE: False,
ProrationBehavior.ALWAYS_INVOICE: True,
}
data = {
"subscriptionkore_items": {
"item_price_id": {0: new_plan_provider_ref.external_id},
"quantity": {0: 1},
},
"prorate": proration_map.get(request.proration_behavior, True),
}
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/update_for_items",
data=data,
)
return self._map_subscriptionkore(
result.get("subscriptionkore", {}),
{},
"",
request.new_plan_id,
)
[docs]
async def preview_plan_change(
self,
request: ChangePlanRequest,
subscriptionkore_provider_ref: ProviderReference,
new_plan_provider_ref: ProviderReference,
) -> ChangePreview:
data = {
"subscriptionkore": {"id": subscriptionkore_provider_ref.external_id},
"subscriptionkore_items": {
"item_price_id": {0: new_plan_provider_ref.external_id},
"quantity": {0: 1},
},
"replace_items_list": True,
}
result = await self._request(
"POST", "/estimates/update_subscriptionkore_for_items", data=data
)
estimate = result.get("estimate", {})
invoice_estimate = estimate.get("invoice_estimate", {})
currency = Currency(invoice_estimate.get("currency_code", "USD"))
return ChangePreview(
immediate_charge=Money(
amount=Decimal(str(invoice_estimate.get("amount_due", 0))) / 100,
currency=currency,
),
next_invoice_amount=Money(
amount=Decimal(str(invoice_estimate.get("total", 0))) / 100,
currency=currency,
),
proration_amount=Money(
amount=Decimal(str(invoice_estimate.get("credits_applied", 0))) / 100,
currency=currency,
),
credit_amount=Money.zero(currency),
next_billing_date=datetime.fromtimestamp(
estimate.get("next_invoice_estimate", {}).get("date", 0)
),
)
# Discount Operations
[docs]
async def apply_discount(
self,
subscriptionkore_provider_ref: ProviderReference,
discount: DiscountRequest,
) -> Subscription:
data: dict[str, Any] = {}
if discount.coupon_code:
data["coupon_ids"] = [discount.coupon_code]
result = await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}",
data=data,
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
[docs]
async def remove_discount(
self,
subscriptionkore_provider_ref: ProviderReference,
) -> Subscription:
sub_result = await self._request(
"GET",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}",
)
sub = sub_result.get("subscriptionkore", {})
coupons = sub.get("coupons", [])
for coupon in coupons:
await self._request(
"POST",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}/remove_coupons",
data={"coupon_ids": [coupon.get("coupon_id")]},
)
result = await self._request(
"GET",
f"/subscriptionkores/{subscriptionkore_provider_ref.external_id}",
)
return self._map_subscriptionkore(result.get("subscriptionkore", {}), {}, "", "")
# Billing Operations
[docs]
async def get_invoice(self, provider_ref: ProviderReference) -> Invoice:
result = await self._request("GET", f"/invoices/{provider_ref.external_id}")
return self._map_invoice(result.get("invoice", {}))
[docs]
async def list_invoices(
self,
customer_provider_ref: ProviderReference,
limit: int = 10,
starting_after: str | None = None,
) -> list[Invoice]:
params: dict[str, Any] = {
"customer_id[is]": customer_provider_ref.external_id,
"limit": limit,
"sort_by[desc]": "date",
}
if starting_after:
params["offset"] = starting_after
result = await self._request("GET", "/invoices", params=params)
return [self._map_invoice(item.get("invoice", {})) for item in result.get("list", [])]
[docs]
async def get_upcoming_invoice(
self,
subscriptionkore_provider_ref: ProviderReference,
) -> Invoice | None:
try:
result = await self._request(
"POST",
"/estimates/upcoming_invoices_estimate",
data={"subscriptionkore_id": subscriptionkore_provider_ref.external_id},
)
estimates = result.get("estimate", {}).get("invoice_estimates", [])
if estimates:
return self._map_estimate_to_invoice(estimates[0])
return None
except ProviderAPIError as e:
if e.status_code == 404:
return None
raise
# Checkout / Portal
[docs]
async def create_checkout_session(
self,
request: CheckoutRequest,
plan_provider_ref: ProviderReference,
customer_provider_ref: ProviderReference | None = None,
) -> CheckoutSession:
data: dict[str, Any] = {
"subscriptionkore_items": {
"item_price_id": {0: plan_provider_ref.external_id},
"quantity": {0: request.quantity},
},
"redirect_url": request.success_url,
"cancel_url": request.cancel_url,
}
if customer_provider_ref:
data["customer"] = {"id": customer_provider_ref.external_id}
elif request.customer_email:
data["customer"] = {"email": request.customer_email}
if request.trial_period_days:
data["subscriptionkore"] = {
"trial_end": int(
datetime.utcnow().timestamp() + (request.trial_period_days * 86400)
)
}
for key, value in request.metadata.items():
data[f"cf_{key}"] = str(value)
result = await self._request("POST", "/hosted_pages/checkout_new_for_items", data=data)
hosted_page = result.get("hosted_page", {})
return CheckoutSession(
id=hosted_page["id"],
url=hosted_page["url"],
expires_at=datetime.fromtimestamp(hosted_page.get("expires_at", 0)),
)
[docs]
async def create_portal_session(
self,
customer_provider_ref: ProviderReference,
return_url: str,
) -> PortalSession:
result = await self._request(
"POST",
"/portal_sessions",
data={
"customer": {"id": customer_provider_ref.external_id},
"redirect_url": return_url,
},
)
portal = result.get("portal_session", {})
return PortalSession(
id=portal["id"],
url=portal["access_url"],
expires_at=datetime.fromtimestamp(portal.get("expires_at", 0)),
)
# Webhook Handling
[docs]
async def verify_webhook(
self,
payload: bytes,
headers: dict[str, str],
) -> bool:
"""Verify Chargebee webhook using basic auth."""
if self._config.webhook_username and self._config.webhook_password:
auth_header = headers.get("authorization", "")
if auth_header.startswith("Basic "):
try:
encoded = auth_header[6:]
decoded = base64.b64decode(encoded).decode("utf-8")
username, password = decoded.split(":", 1)
return (
username == self._config.webhook_username
and password == self._config.webhook_password
)
except Exception:
return False
return False
logger.warning("Chargebee webhook received without authentication configured")
return True
[docs]
async def parse_webhook(
self,
payload: bytes,
headers: dict[str, str],
) -> ProviderWebhookEvent:
"""Parse Chargebee webhook payload."""
data = json.loads(payload)
return ProviderWebhookEvent(
provider=ProviderType.CHARGEBEE,
event_id=data.get("id", ""),
event_type=data.get("event_type", ""),
occurred_at=datetime.fromtimestamp(data.get("occurred_at", 0)),
data=data.get("content", {}),
raw_payload=payload,
)
# Sync Operations
[docs]
async def sync_products(self) -> list[Product]:
result = await self._request("GET", "/item_families")
products = []
for item in result.get("list", []):
family = item.get("item_family", {})
products.append(
Product(
id="",
name=family.get("name", ""),
description=family.get("description"),
provider_refs=[
ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id=family["id"],
)
],
active=family.get("status") == "active",
metadata={},
created_at=datetime.utcnow(),
)
)
return products
[docs]
async def sync_plans(self, product_provider_ref: ProviderReference) -> list[Plan]:
result = await self._request(
"GET",
"/item_prices",
params={"item_family_id[is]": product_provider_ref.external_id},
)
plans = []
for item in result.get("list", []):
price = item.get("item_price", {})
period_unit = price.get("period_unit", "month")
period_map = {
"day": Interval.DAY,
"week": Interval.WEEK,
"month": Interval.MONTH,
"year": Interval.YEAR,
}
plans.append(
Plan(
id="",
product_id="",
name=price.get("name", price.get("id", "")),
description=price.get("description"),
provider_refs=[
ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id=price["id"],
)
],
price=Money(
amount=Decimal(str(price.get("price", 0))) / 100,
currency=Currency(price.get("currency_code", "USD")),
),
billing_period=BillingPeriod(
interval=period_map.get(period_unit, Interval.MONTH),
interval_count=price.get("period", 1),
),
trial_period_days=price.get("trial_period"),
active=price.get("status") == "active",
metadata={},
created_at=datetime.utcnow(),
)
)
return plans
# Mapping Helpers
def _map_subscriptionkore(
self,
data: dict[str, Any],
customer_data: dict[str, Any],
customer_id: str,
plan_id: str,
) -> Subscription:
"""Map Chargebee subscriptionkore to domain model."""
status = self.STATUS_MAP.get(data.get("status", "active"), SubscriptionStatus.ACTIVE)
# Handle pause
pause_config = None
if data.get("pause_date"):
pause_config = PauseConfig(
resumes_at=datetime.fromtimestamp(data["resume_date"])
if data.get("resume_date")
else None,
behavior=PauseBehavior.VOID,
)
# Handle discount
discount = None
coupons = data.get("coupons", [])
if coupons:
coupon = coupons[0]
discount = AppliedDiscount(
discount_id=coupon.get("coupon_id", ""),
coupon_code=coupon.get("coupon_code"),
)
# Get trial end
trial_end = None
if data.get("trial_end"):
trial_end = datetime.fromtimestamp(data["trial_end"])
# Get current period
current_term_start = data.get("current_term_start", 0)
current_term_end = data.get("current_term_end")
# Use custom fields for internal IDs
resolved_customer_id = customer_id or data.get("cf_subscriptionkore_customer_id", "")
resolved_plan_id = plan_id or data.get("cf_subscriptionkore_plan_id", "")
return Subscription(
id="",
customer_id=resolved_customer_id,
plan_id=resolved_plan_id,
provider_ref=ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id=data["id"],
metadata={"customer_id": data.get("customer_id")},
),
status=status,
current_period=DateRange(
start=datetime.fromtimestamp(current_term_start)
if current_term_start
else datetime.utcnow(),
end=datetime.fromtimestamp(current_term_end) if current_term_end else None,
),
trial_end=trial_end,
cancel_at_period_end=data.get("cancel_schedule_created_at") is not None,
canceled_at=datetime.fromtimestamp(data["cancelled_at"])
if data.get("cancelled_at")
else None,
pause_collection=pause_config,
discount=discount,
quantity=data.get("plan_quantity", 1),
metadata={},
created_at=datetime.fromtimestamp(data.get("created_at", 0)),
)
def _map_invoice(self, data: dict[str, Any]) -> Invoice:
"""Map Chargebee invoice to domain model."""
currency = Currency(data.get("currency_code", "USD"))
status = self.INVOICE_STATUS_MAP.get(data.get("status", "posted"), InvoiceStatus.OPEN)
# Map line items
line_items = []
for item in data.get("line_items", []):
line_items.append(
InvoiceLineItem(
id=item.get("id", ""),
description=item.get("description", ""),
quantity=item.get("quantity", 1),
unit_amount=Money(
amount=Decimal(str(item.get("unit_amount", 0))) / 100,
currency=currency,
),
amount=Money(
amount=Decimal(str(item.get("amount", 0))) / 100,
currency=currency,
),
)
)
# Build period
period = None
if data.get("line_items"):
first_item = data["line_items"][0]
if first_item.get("date_from") and first_item.get("date_to"):
period = DateRange(
start=datetime.fromtimestamp(first_item["date_from"]),
end=datetime.fromtimestamp(first_item["date_to"]),
)
return Invoice(
id="",
customer_id=data.get("customer_id", ""),
subscriptionkore_id=data.get("subscriptionkore_id"),
provider_ref=ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id=data.get("id", ""),
),
status=status,
subtotal=Money(
amount=Decimal(str(data.get("sub_total", 0))) / 100,
currency=currency,
),
tax=Money(
amount=Decimal(str(data.get("tax", 0))) / 100,
currency=currency,
),
discount_amount=Money(
amount=Decimal(str(data.get("discounts", [{}])[0].get("amount", 0))) / 100
if data.get("discounts")
else Decimal("0"),
currency=currency,
),
total=Money(
amount=Decimal(str(data.get("total", 0))) / 100,
currency=currency,
),
amount_paid=Money(
amount=Decimal(str(data.get("amount_paid", 0))) / 100,
currency=currency,
),
amount_due=Money(
amount=Decimal(str(data.get("amount_due", 0))) / 100,
currency=currency,
),
currency=currency,
line_items=line_items,
period=period,
due_date=datetime.fromtimestamp(data["due_date"]) if data.get("due_date") else None,
paid_at=datetime.fromtimestamp(data["paid_at"]) if data.get("paid_at") else None,
invoice_pdf_url=data.get("download_url"),
metadata={},
created_at=datetime.fromtimestamp(data.get("date", 0)),
)
def _map_estimate_to_invoice(self, data: dict[str, Any]) -> Invoice:
"""Map Chargebee invoice estimate to Invoice domain model."""
currency = Currency(data.get("currency_code", "USD"))
return Invoice(
id="",
customer_id="",
subscriptionkore_id="",
provider_ref=ProviderReference(
provider=ProviderType.CHARGEBEE,
external_id="estimate",
),
status=InvoiceStatus.DRAFT,
subtotal=Money(
amount=Decimal(str(data.get("sub_total", 0))) / 100,
currency=currency,
),
tax=Money(
amount=Decimal(str(data.get("tax", 0))) / 100,
currency=currency,
),
discount_amount=Money.zero(currency),
total=Money(
amount=Decimal(str(data.get("total", 0))) / 100,
currency=currency,
),
amount_paid=Money.zero(currency),
amount_due=Money(
amount=Decimal(str(data.get("total", 0))) / 100,
currency=currency,
),
currency=currency,
line_items=[],
due_date=datetime.fromtimestamp(data["date"]) if data.get("date") else None,
metadata={},
created_at=datetime.utcnow(),
)