Metadata-Version: 2.4
Name: taphealth-kafka
Version: 0.1.6
Summary: Kafka utilities for TapHealth Python services
Author-email: Afif <afif@tap.health>
License: MIT
Project-URL: Homepage, https://github.com/taphealth/kafka-py
Project-URL: Repository, https://github.com/taphealth/kafka-py
Keywords: kafka,taphealth
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: confluent-kafka>=2.0.0
Provides-Extra: dev
Requires-Dist: pre-commit>=3.0.0; extra == "dev"
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: ruff>=0.4.0; extra == "dev"

# TapHealth Kafka

A Python library for working with Kafka in TapHealth services. It wraps [confluent-kafka](https://github.com/confluentinc/confluent-kafka-python) in a small set of abstractions that simplify producing and consuming Kafka messages.

## Requirements

- Python 3.8+
- confluent-kafka >= 2.0.0

## Installation

```bash
pip install taphealth-kafka
```

## Usage

### Connecting to Kafka

```python
from taphealth_kafka import KafkaClient

# Initialize the client
kafka_client = KafkaClient()

# Connect to Kafka brokers
kafka_client.connect(["localhost:9092"])
```

### Producing Messages

```python
from taphealth_kafka import KafkaProducer, Topics

# Create a producer class
class WeeklyPlanProducer(KafkaProducer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

# Initialize the producer
producer = WeeklyPlanProducer(kafka_client)

# Send a message
producer.send({
    "planId": "plan-123",
    "userId": "user-456",
    "createdAt": "2025-01-15T10:30:00Z"
})
```

### Consuming Messages

```python
from taphealth_kafka import KafkaConsumer, Topics

# Create a consumer class
class WeeklyPlanConsumer(KafkaConsumer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

    @property
    def group_id(self):
        return "plan-processor-group"

    def on_message(self, data, message):
        # Process the message
        print(f"Received plan: {data}")
        # Implement your business logic here

# Initialize and start the consumer
consumer = WeeklyPlanConsumer(kafka_client)
consumer.consume()  # Blocks the calling thread while listening for messages
```

### Disconnecting

```python
# Clean up when done
kafka_client.disconnect()
```
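
To make sure `disconnect()` runs even when an error occurs, the connect/disconnect pair can be wrapped in a context manager. The following is only a sketch: `connected` is a hypothetical helper, not part of the library, and `_StubClient` merely stands in for `KafkaClient` so the snippet runs without a broker.

```python
from contextlib import contextmanager


@contextmanager
def connected(client, bootstrap_servers):
    """Hypothetical helper: connect on entry, always disconnect on exit."""
    client.connect(bootstrap_servers)
    try:
        yield client
    finally:
        client.disconnect()


class _StubClient:
    """Stand-in for KafkaClient so the sketch runs without a broker."""

    def __init__(self):
        self.calls = []

    def connect(self, bootstrap_servers):
        self.calls.append(("connect", list(bootstrap_servers)))

    def disconnect(self):
        self.calls.append(("disconnect",))


stub = _StubClient()
with connected(stub, ["localhost:9092"]) as client:
    pass  # produce or consume here
```

In a real service, a `KafkaClient()` instance would be passed in place of the stub.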

## API Reference

### KafkaClient

The main client for managing Kafka connections.

| Method/Property | Description |
|-----------------|-------------|
| `connect(bootstrap_servers: List[str])` | Connect to Kafka brokers |
| `disconnect()` | Disconnect and clean up resources |
| `create_consumer(group_id, **kwargs)` | Create a consumer with the given group ID |
| `create_topics(topics: List[str])` | Create topics if they don't exist |
| `producer` | Property to access the producer instance |
| `admin` | Property to access the admin client |
| `bootstrap_servers` | Property to access the list of bootstrap servers |

### KafkaProducer (Abstract)

Base class for producers. Subclasses must implement `topic`; `send` is provided by the base class:

| Property/Method | Description |
|-----------------|-------------|
| `topic` | Abstract property returning the `Topics` enum value |
| `send(data)` | Send a message (data is JSON-serialized automatically) |
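
Because `send` JSON-serializes its argument, the payload must contain only JSON-compatible values. The sketch below illustrates what that step presumably amounts to, assuming UTF-8 encoded JSON; `serialize_payload` is a hypothetical helper and not the library's actual implementation.

```python
import json


def serialize_payload(data):
    """Hypothetical helper: encode a dict as UTF-8 JSON bytes."""
    return json.dumps(data).encode("utf-8")


payload = serialize_payload({"planId": "plan-123", "userId": "user-456"})

# A consumer would reverse the step to recover the original dict
decoded = json.loads(payload.decode("utf-8"))
```

Values that `json.dumps` cannot encode, such as `datetime` objects, need to be converted first (for example to ISO 8601 strings, as in the `createdAt` field of the producer example).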

### KafkaConsumer (Abstract)

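Base class for consumers. Subclasses must implement `topic`, `group_id`, and `on_message`; `consume` and `ensure_topics_exist` are provided by the base class: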

| Property/Method | Description |
|-----------------|-------------|
| `topic` | Abstract property returning the `Topics` enum value |
| `group_id` | Abstract property returning the consumer group ID |
| `on_message(data, message)` | Called for each received message |
| `consume()` | Start consuming messages (blocking) |
| `ensure_topics_exist()` | Ensures the topic exists before consuming |
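
One way to keep `on_message` easy to test is to delegate to a plain function that takes the decoded `data`. This is a sketch under assumptions: `handle_weekly_plan` and `REQUIRED_FIELDS` are hypothetical names, and the field names are taken from the producer example payload rather than from any documented schema.

```python
REQUIRED_FIELDS = ("planId", "userId")  # assumed from the producer example payload


def handle_weekly_plan(data):
    """Hypothetical business-logic function called from on_message."""
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    return {"plan_id": data["planId"], "user_id": data["userId"]}


result = handle_weekly_plan({"planId": "plan-123", "userId": "user-456"})
```

Inside a consumer subclass, `on_message(self, data, message)` would then simply call `handle_weekly_plan(data)`, so the logic can be unit-tested without a running broker.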

### Topics

Enum containing available Kafka topics:

| Topic | Value |
|-------|-------|
| `Topics.WEEKLY_PLAN_CREATED` | `weekly-plan-created` |
| `Topics.UPDATE_WEEEKLY_NUTRTION_PLAN` | `update-weekly-nutrition-plan` |
| `Topics.UPDATE_WEEKLY_PLAN` | `update-weekly-plan` |
| `Topics.CONVERSATION_SUMMARY` | `conversation-summary` |
| `Topics.PROFILE_UPDATED` | `profile-updated` |
| `Topics.DAILY_PLAN_CREATED` | `daily-plan-created` |
| `Topics.DIET_CREATED` | `diet-created` |
| `Topics.EXERCISE_LOGGED` | `exercise-logged` |
| `Topics.GLUCOSE_LOGGED` | `glucose-logged` |
| `Topics.MEAL_LOGGED` | `meal-logged` |
| `Topics.METRIC_LOGGED` | `metric-logged` |
| `Topics.VOICE_ANALYTICS` | `voice-analytics` |

## Example Service Integration

```python
import threading
import logging
from taphealth_kafka import KafkaClient, KafkaProducer, KafkaConsumer, Topics

logger = logging.getLogger(__name__)

# Initialize Kafka
kafka_client = KafkaClient()
kafka_client.connect(["kafka-broker:9092"])

# Producer
class WeeklyPlanProducer(KafkaProducer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

# Consumer
class WeeklyPlanConsumer(KafkaConsumer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

    @property
    def group_id(self):
        return "plan-service"

    def on_message(self, data, message):
        logger.info("Processing plan: %s", data)

# Start consumer in a separate thread
def start_consumer():
    consumer = WeeklyPlanConsumer(kafka_client)
    consumer.consume()

# Daemon thread: it does not keep the process alive once the main thread exits
consumer_thread = threading.Thread(target=start_consumer, daemon=True)
consumer_thread.start()

# Use producer in your API endpoints
producer = WeeklyPlanProducer(kafka_client)

def create_plan(plan_data):
    producer.send(plan_data)
    return {"status": "plan created"}
```

## Configuration

The library uses confluent-kafka's configuration format internally. When creating a consumer, you can pass additional configuration options as keyword arguments to `create_consumer`:

```python
consumer = kafka_client.create_consumer(
    group_id="my-group",
    auto_offset_reset="earliest",      # or "latest"
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000
)
```
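
The keyword names above correspond to confluent-kafka's dotted configuration keys (`auto.offset.reset`, `session.timeout.ms`, and so on) with underscores in place of dots. How the library performs the translation is not documented here; the sketch below shows one plausible mapping, and `to_confluent_config` is a hypothetical helper rather than the library's API.

```python
def to_confluent_config(**kwargs):
    """Hypothetical translation: snake_case keyword -> dotted config key."""
    return {key.replace("_", "."): value for key, value in kwargs.items()}


config = to_confluent_config(
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000,
)
```

Under this assumption, `config` holds the same keys that would be passed directly to a confluent-kafka `Consumer`.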

## License

MIT
