Metadata-Version: 2.4
Name: ct-mlflow-lib
Version: 0.3.0
Summary: CleverTap internal MLflow client library
Project-URL: Repository, https://github.com/CleverTap-DS/MLflow
Requires-Python: >=3.11
Requires-Dist: databricks-sdk<1,>=0.38.0
Requires-Dist: gitpython<4,>=3.1.40
Requires-Dist: graphene<4,>=3.2.0
Requires-Dist: mlflow==3.11.1
Requires-Dist: opentelemetry-api<3,>=1.25.0
Requires-Dist: opentelemetry-sdk<3,>=1.25.0
Provides-Extra: dev
Requires-Dist: pytest-cov>=4.0; extra == 'dev'
Requires-Dist: pytest-xdist>=3.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: tensorflow>=2.10; extra == 'dev'
Provides-Extra: keras
Requires-Dist: tensorflow>=2.10; extra == 'keras'
Description-Content-Type: text/markdown

# ct-mlflow-lib

Thin MLflow client library for CleverTap data science products.

Application code should use **only** this package for tracking (metrics, params, artifacts).
Do not `import mlflow` in product code; use `ct_mlflow_lib` helpers so logging is a no-op when
no run is active.

## Install

Pin a git tag in your `requirements.txt` or `pyproject.toml`:

```bash
pip install "git+https://github.com/CleverTap-DS/MLflow@v0.1.0#subdirectory=ct_mlflow_lib"
```

Or with `uv`:

```bash
uv add "ct-mlflow-lib @ git+https://github.com/CleverTap-DS/MLflow@v0.1.0#subdirectory=ct_mlflow_lib"
```

### Optional: Keras support

To use `MLflowKerasCallback`, install with the `keras` extra:

```bash
pip install "ct-mlflow-lib[keras] @ git+https://github.com/CleverTap-DS/MLflow@v0.1.0#subdirectory=ct_mlflow_lib"
```

The core library works without TensorFlow. `MLflowKerasCallback` will raise an `ImportError` if TensorFlow is not installed.

## Quick Start

The primary API is `try_init_product()`. Pass the **task**, **product**, and **env** names;
the experiment name is built as `{product}.{env}.{task}`.

```python
import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)

success, reason = ct_mlflow_lib.try_init_product(
    "train",
    product="recommendation",
    env="prod",
    tags={
        "account_id": "123",
        "catalog_id": "456",
    },
)

if not success:
    logger.warning(f"MLflow disabled: {reason}")
else:
    ct_mlflow_lib.log_metric("auc", 0.87)
    ct_mlflow_lib.log_param("lr", 0.01)
    ct_mlflow_lib.end_mlflow(exit_code=0)
```

Logging helpers (`log_metric`, `log_metrics`, `log_param`, `log_params`, `set_tag`, `log_artifact`)
are safe to call even when init failed or no run is active: they no-op.

## API Reference

### `try_init_product(task: str, product: str, env: str, tags: dict | None = None) -> tuple[bool, str | None]`

Initialize MLflow for a product run. Reports the outcome as a `(success, reason)` tuple instead of raising exceptions.

**Args:**

- `task`: Task segment only (e.g. `"train"`, `"eval"`). Full experiment name is
  `{product}.{env}.{task}` (e.g. `recommendation.prod.train`).
- `product`: Product name (e.g. `"recommendation"`, `"prediction"`).
- `env`: Deployment environment (e.g. `"prod"`, `"staging"`, `"dev"`).
- `tags`: Optional dict of product-specific tags (merged with defaults: `product`, `env`).

**Returns:** Tuple of `(success: bool, reason: str | None)`

- If `success=True`: MLflow is initialized and a run is active. `reason=None`.
- If `success=False`: MLflow init failed. `reason` explains why (e.g. `MLFLOW_TRACKING_URI is not set`,
  or a connection error).

**Never raises.** Gracefully handles missing config, network errors, etc.

### `log_metric`, `log_metrics`, `log_param`, `log_params`, `set_tag`, `log_artifact`

Delegate to MLflow when a run is active; otherwise no-op. Never raise.

### `end_mlflow(exit_code: int | None = None, error_message: str | None = None) -> None`

End the current MLflow run and log exit status. Call this at the end of a job.

**Never raises.** Safe to call even if no run is active.

```python
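import ct_mlflow_lib

try:
    # Job code that may raise goes inside the try block.
    ct_mlflow_lib.log_metric("auc", 0.87)
    ct_mlflow_lib.end_mlflow(exit_code=0)  # success path
except Exception as e:
    # Failure path: record a non-zero exit code and the error message.
    ct_mlflow_lib.end_mlflow(exit_code=1, error_message=str(e))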
```

### `is_mlflow_active() -> bool`

Check if a run is currently active.

### `get_mlflow_run_id() -> str | None`

Get the current run ID, or `None` if no run is active.

### `MLflowKerasCallback(prefix: str = "")`

Keras callback for logging epoch-level metrics to MLflow. Use with `model.fit()`.

```python
import tensorflow as tf
from ct_mlflow_lib import MLflowKerasCallback

model = tf.keras.Sequential([...])
model.fit(
    x_train, y_train,
    epochs=10,
    callbacks=[MLflowKerasCallback(prefix="model")],
)
```

## Experiment naming convention

The experiment name is always `{product}.{env}.{task}`.

| product | env | task | Experiment name |
|---------|-----|------|-----------------|
| recommendation | prod | train | `recommendation.prod.train` |
| prediction | staging | train | `prediction.staging.train` |
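
The rows above follow from simple string joining. As a minimal sketch (`experiment_name` is a hypothetical helper, not part of the documented API):

```python
def experiment_name(product: str, env: str, task: str) -> str:
    """Join the three segments as {product}.{env}.{task}."""
    return f"{product}.{env}.{task}"


print(experiment_name("recommendation", "prod", "train"))
print(experiment_name("prediction", "staging", "train"))
```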

## Required environment variables

Set these in ECS task definitions, Batch job definitions, EC2 launch templates, or local `.env`.

| Variable | Required for init | Description | Example |
|----------|-------------------|-------------|---------|
| `MLFLOW_TRACKING_URI` | Yes | MLflow server URL (no `#` fragment) | `https://mlflow.example.com` |

If `MLFLOW_TRACKING_URI` is missing, `try_init_product()` returns `(False, reason)` instead of raising.

## Cloudflare Access (production)

When the tracking server is behind Cloudflare Access, batch jobs must send a **service token** on
every HTTP request. This library registers an MLflow `RequestHeaderProvider` entry point that
adds the headers when credentials are present.

| Variable | When | Description |
|----------|------|-------------|
| `CF_ACCESS_CLIENT_ID` | Production behind Access | Service token client ID |
| `CF_ACCESS_CLIENT_SECRET` | Production behind Access | Service token client secret |

If either variable is unset, the provider does not inject headers (safe for local dev or servers not behind Access).
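
The behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the library's provider: `CloudflareAccessHeaders` is a hypothetical class that mirrors the two methods of MLflow's `RequestHeaderProvider` interface (`in_context` and `request_headers`) without importing MLflow, and it assumes the standard Cloudflare Access service-token header names.

```python
import os


class CloudflareAccessHeaders:
    """Hypothetical provider; a real one would subclass MLflow's RequestHeaderProvider."""

    def in_context(self) -> bool:
        # Only active when both credentials are present and non-empty.
        return bool(os.environ.get("CF_ACCESS_CLIENT_ID")) and bool(
            os.environ.get("CF_ACCESS_CLIENT_SECRET")
        )

    def request_headers(self) -> dict[str, str]:
        if not self.in_context():
            return {}  # inject nothing for local dev or servers not behind Access
        return {
            "CF-Access-Client-Id": os.environ["CF_ACCESS_CLIENT_ID"],
            "CF-Access-Client-Secret": os.environ["CF_ACCESS_CLIENT_SECRET"],
        }
```

Reading the environment on each call, rather than once at import time, means credentials set after the module is loaded are still picked up.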

## Error handling examples

### Example 1: MLflow is optional (typical case)

```python
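import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)
account_id = "123"  # placeholder; use the real account ID in product code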

success, reason = ct_mlflow_lib.try_init_product(
    "train",
    product="recommendation",
    env="prod",
    tags={"account_id": account_id},
)

if not success:
    logger.warning(f"MLflow not available ({reason}), training without tracking")
else:
    logger.info(f"MLflow tracking enabled (run: {ct_mlflow_lib.get_mlflow_run_id()})")

# Safe without guards: helpers no-op when inactive
ct_mlflow_lib.log_metric("auc", 0.87)
ct_mlflow_lib.end_mlflow(exit_code=0)
```

### Example 2: Wrap in product-specific helper

```python
import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)

def init_mlflow_for_recommendation(account_id, catalog_id, recommendation_id, env):
    success, reason = ct_mlflow_lib.try_init_product(
        "train",
        product="recommendation",
        env=env,
        tags={
            "account_id": str(account_id),
            "catalog_id": str(catalog_id),
            "recommendation_id": str(recommendation_id),
        },
    )
    if not success:
        logger.warning(f"MLflow disabled: {reason}")
    return success
```

## Development

Run `pre-commit install` before coding.

Install dev dependencies, then run tests:

```bash
cd ct_mlflow_lib
uv sync --group dev
uv run pytest tests/
```

With pip only (no uv), use the optional `dev` extra:

```bash
pip install -e ".[dev]"
pytest tests/
```
