Metadata-Version: 2.4
Name: klime
Version: 1.1.0
Summary: Klime SDK for Python
Author: Klime
License: MIT
Project-URL: Homepage, https://github.com/klimeapp/klime-python
Project-URL: Documentation, https://github.com/klimeapp/klime-python
Project-URL: Repository, https://github.com/klimeapp/klime-python
Keywords: analytics,klime,tracking,events
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE.md
Dynamic: license-file

# klime

Klime SDK for Python.

## Installation

```bash
pip install klime
```

## Quick Start

```python
from klime import KlimeClient

client = KlimeClient(
    write_key='your-write-key'
)

# Identify a user
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

# Track an event
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Associate user with a group and set group traits
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Or just link the user to a group (if traits are already set)
client.group('org_456', user_id='user_123')

# Shutdown gracefully
client.shutdown()
```

## Installation Prompt

Copy and paste this prompt into Cursor, Copilot, or your favorite AI editor to integrate Klime:

```
Integrate Klime for customer analytics. Klime tracks user activity to identify which customers are healthy vs at risk of churning.

ANALYTICS MODES (determine which applies):
- Companies & Teams: Your customers are companies with multiple team members (SaaS, enterprise tools)
  → Use identify() + group() + track()
- Individual Customers: Your customers are individuals with private accounts (consumer apps, creator tools)
  → Use identify() + track() only (no group() needed)

KEY CONCEPTS:
- Every track() call requires either user_id OR group_id (no anonymous events)
- Use group_id alone for org-level events (webhooks, cron jobs, system metrics)
- group() links a user to a company AND sets company traits (only for Companies & Teams mode)
- Order doesn't matter - events before identify/group still get attributed correctly

BEST PRACTICES:
- Initialize client ONCE at app startup (singleton pattern)
- Store write key in KLIME_WRITE_KEY environment variable
- Call shutdown() on process exit to flush remaining events (auto-registered via atexit)

Install: pip install klime

import os
from klime import KlimeClient

client = KlimeClient(write_key=os.environ['KLIME_WRITE_KEY'])

# Identify users at signup/login:
client.identify('usr_abc123', {'email': 'jane@acme.com', 'name': 'Jane Smith'})

# Track key activities:
client.track('Report Generated', {'report_type': 'revenue'}, user_id='usr_abc123')
client.track('Feature Used', {'feature': 'export', 'format': 'csv'}, user_id='usr_abc123')
client.track('Teammate Invited', {'role': 'member'}, user_id='usr_abc123')

# If Companies & Teams mode: link user to their company and set company traits
client.group('org_456', {'name': 'Acme Inc', 'plan': 'enterprise'}, user_id='usr_abc123')

INTEGRATION WORKFLOW:

Phase 1: Discover
Explore the codebase to understand:
1. What framework is used? (Django, Flask, FastAPI, Starlette, etc.)
2. Where is user identity available? (e.g., request.user.id, current_user.id, g.user, Depends() injection)
3. Is this Companies & Teams or Individual Customers?
   - Look for: organization, workspace, tenant, team, account models → Companies & Teams (use group())
   - No company/org concept, just individual users → Individual Customers (skip group())
4. Where do core user actions happen? (views, routes, API endpoints, services)
5. Is there existing analytics? (search: segment, posthog, mixpanel, amplitude, track)
Match your integration style to the framework's conventions.

Phase 2: Instrument
Add these calls using idiomatic patterns for the framework:
- Initialize client once (Django: apps.py/settings, Flask: app factory, FastAPI: lifespan/startup)
- identify() in auth/login success handler
- group() when user-org association is established (Companies & Teams mode only)
- track() for key user actions (see below)

WHAT TO TRACK:
Active engagement (primary): feature usage, resource creation, collaboration, completing flows
Session signals (secondary): login/session start, dashboard access - distinguishes "low usage" from "churned"
Do NOT track: every request, health checks, middleware passthrough, background tasks

Phase 3: Verify
Confirm: client initialized, shutdown handled, identify/group/track calls added

Phase 4: Summarize
Report what you added:
- Files modified and what was added to each
- Events being tracked (list event names and what triggers them)
- How user_id is obtained (and group_id if Companies & Teams mode)
- Any assumptions made or questions
```

## API Reference

### Constructor

```python
KlimeClient(
    write_key: str,                             # Required: Your Klime write key
    endpoint: Optional[str] = None,             # Optional: API endpoint (default: https://i.klime.com)
    flush_interval: Optional[int] = None,       # Optional: Milliseconds between flushes (default: 2000)
    max_batch_size: Optional[int] = None,       # Optional: Max events per batch (default: 20, max: 100)
    max_queue_size: Optional[int] = None,       # Optional: Max queued events (default: 1000)
    retry_max_attempts: Optional[int] = None,   # Optional: Max retry attempts (default: 5)
    retry_initial_delay: Optional[int] = None,  # Optional: Initial retry delay in ms (default: 1000)
    flush_on_shutdown: Optional[bool] = None,   # Optional: Auto-flush on exit (default: True)
    logger: Optional[logging.Logger] = None,    # Optional: Custom logger (default: logging.getLogger("klime"))
    on_error: Optional[Callable] = None,        # Optional: Callback for batch failures
    on_success: Optional[Callable] = None       # Optional: Callback for successful sends
)
```

### Methods

#### `track(event: str, properties: Optional[Dict] = None, user_id: Optional[str] = None, group_id: Optional[str] = None) -> None`

Track an event. Events can be attributed in two ways:
- **User events**: Provide `user_id` to track user activity (most common)
- **Group events**: Provide `group_id` without `user_id` for organization-level events

```python
# User event (most common)
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Group event (for webhooks, cron jobs, system events)
client.track('Events Received', {
    'count': 100,
    'source': 'webhook'
}, group_id='org_456')
```

> **Note**: The `group_id` parameter can also be combined with `user_id` for multi-tenant scenarios where you need to specify which organization context a user event occurred in.
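
The attribution rules above can be summarized in a small helper. This is a sketch only: `attribution_kind` is a hypothetical function, not part of the Klime SDK, and the labels it returns are illustrative. The error case assumes the rule stated in the installation prompt, that every `track()` call needs a `user_id` or a `group_id`.

```python
from typing import Optional


def attribution_kind(user_id: Optional[str] = None,
                     group_id: Optional[str] = None) -> str:
    """Classify how a track() call would be attributed (illustrative only)."""
    if user_id and group_id:
        # User activity recorded within a specific organization context
        return "user event in group context"
    if user_id:
        return "user event"
    if group_id:
        # Organization-level event with no individual user (webhook, cron job)
        return "group event"
    # Assumption: anonymous events are not allowed
    raise ValueError("track() needs user_id or group_id")
```

Running a check like this before calling `track()` makes a missing identifier fail loudly in your own code instead of surfacing later as a rejected or dropped event.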

#### `identify(user_id: str, traits: Optional[Dict] = None) -> None`

Identify a user with traits.

```python
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})
```

#### `group(group_id: str, traits: Optional[Dict] = None, user_id: Optional[str] = None) -> None`

Associate a user with a group and/or set group traits.

```python
# Associate user with a group and set group traits (most common)
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Just link a user to a group (traits already set or not needed)
client.group('org_456', user_id='user_123')

# Just update group traits (e.g., from a webhook or background job)
client.group('org_456', {
    'plan': 'enterprise',
    'employee_count': 50
})
```

#### `flush() -> None`

Manually flush queued events immediately.

```python
client.flush()
```

#### `shutdown() -> None`

Gracefully shutdown the client, flushing remaining events.

```python
client.shutdown()
```

#### `queue_size() -> int`

Return the number of events currently in the queue.

```python
pending = client.queue_size()
print(f"{pending} events waiting to be sent")
```

### Synchronous Methods

For cases where you need confirmation that events were sent (e.g., before process exit, in tests), use the synchronous variants:

#### `track_sync(event, properties, user_id, group_id) -> BatchResponse`

Track an event synchronously. Raises `SendError` on failure.

```python
from klime import SendError

try:
    response = client.track_sync('Critical Action', {'key': 'value'}, user_id='user_123')
    print(f"Sent! Accepted: {response.accepted}")
except SendError as e:
    print(f"Failed to send: {e.message}")
```

#### `identify_sync(user_id, traits) -> BatchResponse`

Identify a user synchronously. Raises `SendError` on failure.

#### `group_sync(group_id, traits, user_id) -> BatchResponse`

Associate a user with a group synchronously. Raises `SendError` on failure.

## Features

- **Automatic Batching**: Events are automatically batched and sent every 2 seconds or when the batch size reaches 20 events
- **Automatic Retries**: Failed requests are automatically retried with exponential backoff
- **Thread-Safe**: Safe to use from multiple threads
- **Process Exit Handling**: Automatically flushes events on process exit (via `atexit`)
- **Zero Dependencies**: Uses only Python standard library

## Performance

When you call `track()`, `identify()`, or `group()`, the SDK:

1. Adds the event to a thread-safe queue (microseconds)
2. Returns immediately without waiting for network I/O

Events are sent to Klime's servers in background threads. This means:

- **No network blocking**: HTTP requests happen asynchronously in background threads
- **No latency impact**: Tracking calls add < 1ms to your request handling time
- **Automatic batching**: Events are queued and sent in batches (default: every 2 seconds or 20 events)

```python
# This returns immediately - no HTTP request is made here
client.track('Button Clicked', {'button': 'signup'}, user_id='user_123')

# Your code continues without waiting
return {'success': True}
```

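`flush()` blocks until all queued events have been sent, and `shutdown()` flushes remaining events before returning, so both are typically called only during graceful shutdown. The `*_sync()` methods also block, because they wait for the server's response.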
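
To make the queue-and-background-thread model concrete, here is a toy batcher built only on the standard library. It is a sketch under stated assumptions, not the SDK's implementation: `MiniBatcher` and its `send` callable are hypothetical names, the interval is in seconds rather than milliseconds, and it leaves out retries, the queue size cap, and the flush that the SDK triggers when a batch fills up.

```python
import queue
import threading
from typing import Callable, List


class MiniBatcher:
    """Toy batcher: enqueue() returns immediately; a worker thread sends batches."""

    def __init__(self, send: Callable[[List[dict]], None],
                 flush_interval: float = 2.0, max_batch_size: int = 20) -> None:
        self._send = send
        self._interval = flush_interval
        self._max = max_batch_size
        self._queue: "queue.Queue[dict]" = queue.Queue()
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, event: dict) -> None:
        # Only a thread-safe queue insert happens on the caller's thread
        self._queue.put(event)

    def _drain(self) -> List[dict]:
        # Pull at most max_batch_size events without blocking
        batch: List[dict] = []
        while len(batch) < self._max:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def _run(self) -> None:
        while True:
            # Sleep for one interval, or wake early when shutdown() is called
            stopping = self._stop.wait(self._interval)
            batch = self._drain()
            while batch:
                self._send(batch)
                batch = self._drain()
            if stopping:
                return

    def shutdown(self) -> None:
        # Blocks until the worker has sent everything that was queued
        self._stop.set()
        self._worker.join()
```

In this sketch `enqueue()` never touches the network, which is why the calling thread is not delayed. Only `shutdown()` waits for the worker to finish.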

## Configuration

### Default Values

- `flush_interval`: 2000ms
- `max_batch_size`: 20 events
- `max_queue_size`: 1000 events
- `retry_max_attempts`: 5 attempts
- `retry_initial_delay`: 1000ms
- `flush_on_shutdown`: True
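
One way to override these defaults without hard-coding them is to read them from the environment. The helper below is a minimal sketch: `klime_kwargs_from_env` is a hypothetical function, and apart from `KLIME_WRITE_KEY` the environment variable names are invented for this example.

```python
import os
from typing import Any, Dict, Mapping


def klime_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build KlimeClient keyword arguments from environment variables.

    Only KLIME_WRITE_KEY comes from this README; the other variable
    names are hypothetical and chosen for this sketch.
    """
    kwargs: Dict[str, Any] = {"write_key": env["KLIME_WRITE_KEY"]}
    int_options = {
        "KLIME_FLUSH_INTERVAL_MS": "flush_interval",
        "KLIME_MAX_BATCH_SIZE": "max_batch_size",
        "KLIME_MAX_QUEUE_SIZE": "max_queue_size",
    }
    for var, option in int_options.items():
        if var in env:
            # Options that are not set are left out, so the SDK defaults apply
            kwargs[option] = int(env[var])
    return kwargs
```

The result can then be passed as `KlimeClient(**klime_kwargs_from_env())`.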

### Logging

The SDK uses Python's built-in `logging` module. By default, it logs to `logging.getLogger("klime")`.

```python
import logging

# Enable debug logging
logging.getLogger("klime").setLevel(logging.DEBUG)

# Or provide a custom logger
client = KlimeClient(
    write_key='your-write-key',
    logger=logging.getLogger("myapp.klime")
)
```

Because the SDK logs through the standard `logging` module, the `klime` logger follows whatever handlers and levels your Django or Flask logging configuration defines.
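
As an illustration, the `klime` logger can be configured with a standard `dictConfig` dictionary. The handler name and level below are example choices, not SDK requirements. In Django the same dictionary would normally live in the `LOGGING` setting, which Django applies for you.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        # "klime" is the SDK's default logger name
        "klime": {"handlers": ["console"], "level": "DEBUG", "propagate": False},
    },
}

logging.config.dictConfig(LOGGING)
```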

### Callbacks

```python
import sentry_sdk  # third-party; swap in your own error reporter
from klime import KlimeClient

def handle_error(error, events):
    # Report to your error tracking service
    sentry_sdk.capture_exception(error)
    print(f"Failed to send {len(events)} events: {error}")

def handle_success(response):
    print(f"Sent {response.accepted} events")

client = KlimeClient(
    write_key='your-write-key',
    on_error=handle_error,
    on_success=handle_success
)
```

## Error Handling

The SDK automatically handles:

- **Transient errors** (429, 503, network failures): Retries with exponential backoff
- **Permanent errors** (400, 401): Logs error and drops event
- **Rate limiting**: Respects `Retry-After` header
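
The retry schedule can be sketched as follows. The README does not state the exact backoff formula, so this example assumes the delay doubles on each retry, that no jitter is applied, and that `retry_max_attempts` counts the first attempt. Both function names are hypothetical.

```python
from typing import List, Optional


def backoff_delays_ms(initial_delay_ms: int = 1000, max_attempts: int = 5) -> List[int]:
    """Delays before each retry, assuming the delay doubles every time."""
    # Assumption: max_attempts includes the first try, leaving max_attempts - 1 retries
    return [initial_delay_ms * 2 ** i for i in range(max_attempts - 1)]


def next_delay_ms(computed_ms: int, retry_after_s: Optional[float] = None) -> int:
    """Prefer the server's Retry-After value (in seconds) when one is present."""
    if retry_after_s is not None:
        return int(retry_after_s * 1000)
    return computed_ms
```

Under these assumptions the default settings give waits of 1, 2, 4, and 8 seconds before the batch is given up on.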

For synchronous operations, use `*_sync()` methods which raise `SendError` on failure:

```python
from klime import KlimeClient, SendError

client = KlimeClient(write_key='your-write-key')

try:
    response = client.track_sync('Event', user_id='user_123')
except SendError as e:
    print(f"Failed: {e.message}, events: {len(e.events)}")
```

## Size Limits

- Maximum event size: 200KB
- Maximum batch size: 10MB
- Maximum events per batch: 100

Events exceeding these limits are rejected and logged.
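
If you attach large payloads to events, you may want to check their size before tracking them. The sketch below assumes that "200KB" means 200 * 1024 bytes and that size is measured on the UTF-8 JSON encoding of the event; the SDK's own measurement may differ. The helper names are hypothetical.

```python
import json

MAX_EVENT_BYTES = 200 * 1024  # assumption: "200KB" means 200 * 1024 bytes


def event_size_bytes(event: dict) -> int:
    """Size of the event when serialized as UTF-8 JSON."""
    return len(json.dumps(event).encode("utf-8"))


def fits_event_limit(event: dict) -> bool:
    """True if the serialized event is within the assumed per-event limit."""
    return event_size_bytes(event) <= MAX_EVENT_BYTES
```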

## Flask Example

```python
import atexit

from flask import Flask, request
from klime import KlimeClient

app = Flask(__name__)
client = KlimeClient(write_key='your-write-key')

@app.route('/api/button-clicked', methods=['POST'])
def button_clicked():
    client.track('Button Clicked', {
        'button_name': request.json.get('buttonName')
    }, user_id=request.json.get('userId'))

    return {'success': True}

# Explicit graceful shutdown; the SDK also flushes on exit via its own atexit hook
atexit.register(client.shutdown)
```

## Django Example

```python
from django.http import JsonResponse
from django.views import View
from klime import KlimeClient

client = KlimeClient(write_key='your-write-key')

class ButtonClickView(View):
    def post(self, request):
        client.track('Button Clicked', {
            'button_name': request.POST.get('buttonName')
        }, user_id=str(request.user.id))

        return JsonResponse({'success': True})
```

## Requirements

- Python 3.10 or higher
- No external dependencies (uses only standard library)

## License

MIT
