Metadata-Version: 2.4
Name: taphealth-kafka
Version: 0.2.1
Summary: Kafka utilities for TapHealth Python services
Author-email: Afif <afif@tap.health>
License: MIT
Project-URL: Homepage, https://github.com/taphealth/kafka-py
Project-URL: Repository, https://github.com/taphealth/kafka-py
Keywords: kafka,taphealth
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: confluent-kafka>=2.0.0
Provides-Extra: dev
Requires-Dist: pre-commit>=3.0.0; extra == "dev"
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: ruff>=0.4.0; extra == "dev"

# TapHealth Kafka

A Python library for working with Kafka in TapHealth services. This library provides abstractions over [confluent-kafka](https://github.com/confluentinc/confluent-kafka-python) to simplify producing and consuming Kafka messages.

## Requirements

- Python 3.8+
- confluent-kafka >= 2.0.0

## Installation

```bash
pip install taphealth-kafka
```

## Usage

This package is designed to be installed and used across TapHealth services. The pattern is:

1. Install the package in your project
2. Extend the abstract classes (`KafkaProducer`/`KafkaConsumer`) and implement the required abstract methods
3. Use the provided event dataclasses to structure your messages

### Connecting to Kafka

```python
from taphealth_kafka import KafkaClient

# Using context manager (recommended — auto-disconnects)
with KafkaClient() as client:
    client.connect(["localhost:9092"])
    # ... use client ...

# Or manually manage the lifecycle
client = KafkaClient()
client.connect(["localhost:9092"])
# ... use client ...
client.disconnect()

# Disable automatic topic creation on connect
client.connect(["localhost:9092"], auto_create_topics=False)
```

### Producing Messages

Extend `KafkaProducer` and implement the `topic` property:

```python
from taphealth_kafka import KafkaProducer, Topics

class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

producer = GlucoseLoggedProducer(kafka_client)
```

Use event dataclasses to structure your messages:

```python
from datetime import datetime
from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date=datetime.now().strftime("%Y-%m-%d"),
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)

# Serialize to a camelCase dict and send it
producer.send(glucose_data.to_dict())
```

### Consuming Messages

Extend `KafkaConsumer` and implement the required abstract methods:

```python
from taphealth_kafka import KafkaConsumer, Topics

class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-processor-group"

    def on_message(self, data: dict, message) -> None:
        # Deserialize back into a dataclass
        from taphealth_kafka.events import GlucoseLoggedData
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"User: {glucose.user_id}, Reading: {glucose.glucose_reading} mg/dL")

consumer = GlucoseLoggedConsumer(kafka_client)
consumer.consume()  # Blocking call
```

Override `consumer_config` to customize consumer settings:

```python
class LatestOffsetConsumer(KafkaConsumer):
    @property
    def consumer_config(self) -> dict:
        return {
            "auto_offset_reset": "latest",
            "enable_auto_commit": False,
        }

    # ... implement topic, group_id, on_message ...
```

### Serialization

All event dataclasses support bidirectional serialization between Python snake_case attributes and Kafka camelCase keys. For the shared TapHealth contracts, enum fields are encoded as numeric values on the wire.

```python
from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

# Serialize to camelCase dict
glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date="2026-01-15",
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)
data_dict = glucose_data.to_dict()
# {"userId": "user-12345", "date": "2026-01-15", "type": 0, "glucoseRange": 1, ...}

# Deserialize from camelCase dict
glucose = GlucoseLoggedData.from_dict(data_dict)
assert glucose.user_id == "user-12345"
```

For advanced use cases, the serialization utilities are available directly:

```python
from taphealth_kafka import dataclass_to_dict, dataclass_from_dict

# Convert any dataclass to camelCase dict
data_dict = dataclass_to_dict(my_dataclass)

# Reconstruct from camelCase dict with full type awareness
obj = dataclass_from_dict(MyDataclass, data_dict)
```
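If you want to apply the same convention outside the library, the snake_case-to-camelCase mapping is easy to sketch in plain Python. This is an illustrative re-implementation of the documented behavior (enums become their values, `None` fields are skipped), not the library's actual code; the enum and dataclass below are hypothetical, and `dataclass_to_dict` remains the supported API.

```python
from dataclasses import dataclass, fields
from enum import Enum
from typing import Any, Optional


def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def to_camel_dict(obj: Any) -> dict:
    """Minimal sketch of dataclass -> camelCase dict conversion.

    Mirrors the documented behavior: enum fields become their values
    and None fields are skipped. (Nested dataclasses and lists are
    omitted for brevity.)
    """
    result = {}
    for f in fields(obj):
        value = getattr(obj, f.name)
        if value is None:
            continue  # skip None fields, as the library does
        if isinstance(value, Enum):
            value = value.value
        result[to_camel(f.name)] = value
    return result


class Range(Enum):  # illustrative enum; values are hypothetical
    LOW = 0
    NORMAL = 1


@dataclass
class Reading:  # illustrative dataclass, not a library type
    user_id: str
    glucose_range: Range
    note: Optional[str] = None


print(to_camel_dict(Reading(user_id="u1", glucose_range=Range.NORMAL)))
# {'userId': 'u1', 'glucoseRange': 1}
```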

### Disconnecting

```python
# Preferred: use the context manager (see "Connecting to Kafka")
# Manual: call disconnect() to flush pending messages and close consumers
kafka_client.disconnect()
```

## Complete Example

Here's a complete example showing a producer and consumer for glucose logging:

```python
"""
Glucose Logging Example

Usage:
    # Terminal 1 - Start the consumer
    python example.py consumer

    # Terminal 2 - Send a message with the producer
    python example.py producer
"""

import sys
from datetime import datetime

from taphealth_kafka import (
    KafkaClient,
    KafkaConsumer,
    KafkaProducer,
    Topics,
)
from taphealth_kafka.events import (
    GlucoseLoggedData,
    GlucoseRange,
    GlucoseType,
)


class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED


class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-example-group"

    def on_message(self, data: dict, message) -> None:
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"Received: user={glucose.user_id}, reading={glucose.glucose_reading}")


def run_producer(client: KafkaClient) -> None:
    producer = GlucoseLoggedProducer(client)

    glucose_data = GlucoseLoggedData(
        user_id="user-12345",
        date=datetime.now().strftime("%Y-%m-%d"),
        glucose_type=GlucoseType.FASTING,
        glucose_reading=105.0,
        glucose_range=GlucoseRange.NORMAL,
        is_critical=False,
        timezone="UTC",
    )

    producer.send(glucose_data.to_dict())
    print("Event sent!")


def run_consumer(client: KafkaClient) -> None:
    consumer = GlucoseLoggedConsumer(client)
    print("Listening for events... (Ctrl+C to stop)")
    consumer.consume()


def main() -> None:
    with KafkaClient() as client:
        client.connect(["localhost:9092"])

        mode = sys.argv[1] if len(sys.argv) > 1 else "consumer"

        if mode == "producer":
            run_producer(client)
        else:
            run_consumer(client)


if __name__ == "__main__":
    main()
```

## API Reference

### KafkaClient

The main client for managing Kafka connections. Supports the context manager protocol.

| Method/Property | Description |
|-----------------|-------------|
| `connect(bootstrap_servers, *, auto_create_topics=True)` | Connect to Kafka brokers. Auto-creates all topics by default. |
| `disconnect()` | Disconnect and clean up resources |
| `create_consumer(group_id, **kwargs)` | Create a consumer with the given group ID |
| `create_topics(topics, *, num_partitions=1, replication_factor=1)` | Create topics if they don't exist |
| `producer` | Property to access the producer instance |
| `admin` | Property to access the admin client |
| `bootstrap_servers` | Property to access the list of bootstrap servers |

### KafkaProducer (Abstract)

Base class for producers. Subclasses implement the abstract `topic` property; `send` is provided by the base class:

| Property/Method | Description |
|-----------------|-------------|
| `topic` | Abstract property returning the `Topics` enum value |
| `send(data)` | Send a message (data is JSON-serialized automatically) |

### KafkaConsumer (Abstract)

Base class for consumers. Subclasses implement the abstract `topic`, `group_id`, and `on_message` members; the remaining methods are provided by the base class:

| Property/Method | Description |
|-----------------|-------------|
| `topic` | Abstract property returning the `Topics` enum value |
| `group_id` | Abstract property returning the consumer group ID |
| `on_message(data, message)` | Called for each received message |
| `consumer_config` | Override to customize consumer settings (default: `auto_offset_reset="earliest"`, `enable_auto_commit=True`) |
| `consume()` | Start consuming messages (blocking) |
| `ensure_topics_exist()` | Ensures the topic exists before consuming |

### Topics

Enum containing available Kafka topics:

| Topic | Value |
|-------|-------|
| `Topics.WEEKLY_PLAN_CREATED` | `weekly-plan-created` |
| `Topics.UPDATE_WEEKLY_NUTRITION_PLAN` | `update-weekly-nutrition-plan` |
| `Topics.UPDATE_WEEKLY_PLAN` | `update-weekly-plan` |
| `Topics.CONVERSATION_SUMMARY` | `conversation-summary` |
| `Topics.PROFILE_UPDATED` | `profile-updated` |
| `Topics.DAILY_PLAN_CREATED` | `daily-plan-created` |
| `Topics.DIET_CREATED` | `diet-created` |
| `Topics.EXERCISE_LOGGED` | `exercise-logged` |
| `Topics.GLUCOSE_LOGGED` | `glucose-logged` |
| `Topics.MEAL_LOGGED` | `meal-logged` |
| `Topics.METRIC_LOGGED` | `metric-logged` |
| `Topics.PLAN_UPDATE` | `plan-update` |
| `Topics.PLAN_UPDATED` | `plan-updated` |
| `Topics.VOICE_ANALYTICS` | `voice-analytics` |

> **Deprecated:** `Topics.UPDATE_WEEEKLY_NUTRTION_PLAN` still works but emits a `DeprecationWarning`. Use `Topics.UPDATE_WEEKLY_NUTRITION_PLAN` instead.

Use `Topics.get_all_topics()` to get all topic names as a list of strings.
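The enum-plus-helper pattern is easy to replicate for service-local topics. A minimal sketch (the class, member names, and values below are illustrative, not the library's `Topics` definition):

```python
from enum import Enum
from typing import List


class LocalTopics(Enum):
    """Illustrative topic enum mirroring the Topics pattern."""

    GLUCOSE_LOGGED = "glucose-logged"
    MEAL_LOGGED = "meal-logged"

    @classmethod
    def get_all_topics(cls) -> List[str]:
        # Return every topic name as a plain string
        return [member.value for member in cls]


print(LocalTopics.get_all_topics())
# ['glucose-logged', 'meal-logged']
```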

### Serialization Utilities

| Function | Description |
|----------|-------------|
| `dataclass_to_dict(obj, key_overrides=None)` | Convert a dataclass to a camelCase dict. Recursively handles nested dataclasses, enums, and lists. Skips `None` fields. |
| `dataclass_from_dict(cls, data, key_overrides=None)` | Construct a dataclass from a camelCase dict. Recursively deserializes nested types. |

### Event Dataclasses

The package provides typed dataclasses for structuring event messages. All dataclasses have `to_dict()` and `from_dict()` methods for bidirectional camelCase serialization:

| Event | Description |
|-------|-------------|
| `ConversationSummaryData` | AI conversation summary |
| `VoiceCallAnalyticsData` | Voice call analytics |
| `GlucoseLoggedData` | Glucose reading log |
| `MetricLoggedData` | Health metric log |
| `ExerciseLoggedData` | Exercise log |
| `MealLoggedData` | Logged meal with feedback |
| `DietCreatedData` | Generated diet plan |
| `ProfileUpdatedData` | User profile update |
| `PlanUpdateData` | Plan update command (request to update a plan) |
| `PlanUpdatedData` | Plan updated event (confirms plan was updated) |
| `DailyPlanCreatedData` | Daily plan with nutrition, glucose, exercise, and education plans |

Supporting types and enums:

| Type | Values / Description |
|------|----------------------|
| `GlucoseType` | `FASTING`, `PRE_LUNCH`, `PRE_DINNER`, `POST_BREAKFAST`, `POST_LUNCH`, `POST_DINNER`, `RANDOM` |
| `GlucoseRange` | `LOW`, `NORMAL`, `HIGH`, `VERY_HIGH`, `VERY_LOW` |
| `GlucoseSchedule` | Dataclass with `fasting`, `post_lunch`, `bedtime`, `post_exercise` fields |
| `MealType` | `BREAKFAST`, `LUNCH`, `DINNER`, `SNACK` |
| `MealVariation` | `EASY`, `MODERATE`, `CHALLENGING` |
| `MetricType` | Various health metrics (weight, blood pressure, HbA1c, etc.) |
| `WorkoutStatus` | `COMPLETE`, `INCOMPLETE`, `INCOMPLETE_TOO_EASY`, `INCOMPLETE_TOO_HARD` |
| `WorkoutDifficulty` | Workout difficulty levels |
| `DailyTaskType` | Types for daily plan tasks |
| `PlanUpdatedTrigger` | `ROUTINE`, `LAB_REPORT`, `SYMPTOM` — trigger type for plan update events |
| `HabitKey` | Habit tracking keys |

## Configuration

When creating consumers, you can pass additional configuration options:

```python
consumer = kafka_client.create_consumer(
    group_id="my-group",
    auto_offset_reset="earliest",      # or "latest"
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000
)
```

Or override `consumer_config` in your `KafkaConsumer` subclass (see [Consuming Messages](#consuming-messages)).
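One plausible way a `consumer_config` override combines with the documented defaults is a plain dict merge in which override keys win. The library's actual merge behavior may differ, so treat this as a sketch of the idea:

```python
# Defaults documented for KafkaConsumer; the merge semantics are assumed.
DEFAULT_CONSUMER_CONFIG = {
    "auto_offset_reset": "earliest",
    "enable_auto_commit": True,
}


def merged_config(overrides: dict) -> dict:
    # Override keys take precedence over the documented defaults
    return {**DEFAULT_CONSUMER_CONFIG, **overrides}


print(merged_config({"auto_offset_reset": "latest", "session_timeout_ms": 6000}))
# {'auto_offset_reset': 'latest', 'enable_auto_commit': True, 'session_timeout_ms': 6000}
```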

## License

MIT
