Metadata-Version: 2.4
Name: ct-mlflow-lib
Version: 0.4.0
Summary: Small MLflow client helpers with graceful no-op logging
Requires-Python: >=3.11
Requires-Dist: databricks-sdk<1,>=0.38.0
Requires-Dist: gitpython<4,>=3.1.40
Requires-Dist: graphene<4,>=3.2.0
Requires-Dist: mlflow==3.11.1
Requires-Dist: opentelemetry-api<3,>=1.25.0
Requires-Dist: opentelemetry-sdk<3,>=1.25.0
Provides-Extra: dev
Requires-Dist: pytest-cov>=4.0; extra == 'dev'
Requires-Dist: pytest-xdist>=3.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: tensorflow>=2.10; extra == 'dev'
Provides-Extra: keras
Requires-Dist: tensorflow>=2.10; extra == 'keras'
Description-Content-Type: text/markdown

# ct-mlflow-lib

Small MLflow client helper library for applications that treat experiment tracking as optional.

Application code can use this package to log metrics, params, tags, and artifacts.
The logging helpers are safe to call even when MLflow is not configured or no run is active.

## Install

Install from PyPI:

```bash
uv add ct-mlflow-lib
```

Or with the `keras` extra:

```bash
uv add "ct-mlflow-lib[keras]"
```

### Optional: Keras support

The core library works without TensorFlow. `MLflowKerasCallback` raises an `ImportError` if TensorFlow is not installed; the `keras` extra installs it (`tensorflow>=2.10`).

## Quick Start

The primary API is `try_init_product()`. Pass the **task**, **product**, and **env** names;
the experiment name is built as `{product}.{env}.{task}`.

```python
import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)

success, reason = ct_mlflow_lib.try_init_product(
    "train",
    product="example_product",
    env="prod",
    tags={
        "job_id": "123",
        "model_type": "classifier",
    },
)

if not success:
    logger.warning(f"MLflow disabled: {reason}")
else:
    ct_mlflow_lib.log_metric("auc", 0.87)
    ct_mlflow_lib.log_param("lr", 0.01)
    ct_mlflow_lib.end_mlflow(exit_code=0)
```

Logging helpers (`log_metric`, `log_metrics`, `log_param`, `log_params`, `set_tag`, `log_artifact`)
are safe to call even when init failed or no run is active: they no-op.

## API Reference

### `try_init_product(task: str, product: str, env: str, tags: dict | None = None) -> tuple[bool, str | None]`

Initialize MLflow for a product run. Returns a `(success, reason)` status tuple instead of raising exceptions.

**Args:**

- `task`: Task segment only (e.g. `"train"`, `"eval"`). Full experiment name is
  `{product}.{env}.{task}` (e.g. `example_product.prod.train`).
- `product`: Product or application name (e.g. `"example_product"`, `"batch_model"`).
- `env`: Deployment environment (e.g. `"prod"`, `"staging"`, `"dev"`).
- `tags`: Optional dict of run-specific tags (merged with defaults: `product`, `env`).

**Returns:** Tuple of `(success: bool, reason: str | None)`

- If `success=True`: MLflow is initialized and a run is active. `reason=None`.
- If `success=False`: MLflow init failed. `reason` explains why (e.g. `MLFLOW_TRACKING_URI is not set`,
  or a connection error).

**Never raises.** Gracefully handles missing config, network errors, etc.

### `log_metric`, `log_metrics`, `log_param`, `log_params`, `set_tag`, `log_artifact`

Delegate to MLflow when a run is active; otherwise no-op. Never raise.

### `end_mlflow(exit_code: int | None = None, error_message: str | None = None) -> None`

End the current MLflow run and log exit status. Call this at the end of a job.

**Never raises.** Safe to call even if no run is active.

```python
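import ct_mlflow_lib

try:
    # Job work that may raise goes inside the try block.
    ct_mlflow_lib.log_metric("auc", 0.87)
    ct_mlflow_lib.end_mlflow(exit_code=0)
except Exception as e:
    # Record the failure on the run before the job exits.
    ct_mlflow_lib.end_mlflow(exit_code=1, error_message=str(e))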
```
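
If several jobs repeat this try/except, one option is to wrap it in a context manager. The sketch below is an assumption about how an application might do that, not part of the library: `tracked_job` is a hypothetical helper, and `fake_end_mlflow` stands in for `ct_mlflow_lib.end_mlflow` so the snippet runs without the package installed.

```python
from contextlib import contextmanager


@contextmanager
def tracked_job(end_run):
    """Call end_run with exit_code 0 on success, or 1 plus the message on failure."""
    try:
        yield
    except Exception as exc:
        end_run(exit_code=1, error_message=str(exc))
        raise  # unlike the example above, let the caller see the failure
    else:
        end_run(exit_code=0)


# Stand-in for ct_mlflow_lib.end_mlflow; it records how it was called.
calls = []


def fake_end_mlflow(exit_code=None, error_message=None):
    calls.append((exit_code, error_message))


with tracked_job(fake_end_mlflow):
    pass  # job work goes here

try:
    with tracked_job(fake_end_mlflow):
        raise ValueError("bad input")
except ValueError:
    pass  # the original exception still propagates to the caller
```

In application code the call would look like `with tracked_job(ct_mlflow_lib.end_mlflow): ...`.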

### `is_mlflow_active() -> bool`

Check if a run is currently active.

### `get_mlflow_run_id() -> str | None`

Get the current run ID, or `None` if no run is active.

### `MLflowKerasCallback(prefix: str = "")`

Keras callback for logging epoch-level metrics to MLflow. Use with `model.fit()`.

```python
import tensorflow as tf
from ct_mlflow_lib import MLflowKerasCallback

model = tf.keras.Sequential([...])
model.fit(
    x_train, y_train,
    epochs=10,
    callbacks=[MLflowKerasCallback(prefix="model")],
)
```

## Experiment naming convention

The experiment name is always `{product}.{env}.{task}`.

| product | env | task | Experiment name |
|---------|-----|------|-----------------|
| example_product | prod | train | `example_product.prod.train` |
| batch_model | staging | train | `batch_model.staging.train` |
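
The convention amounts to a single format string. The helper below is a hypothetical illustration (the library builds the name internally); note that the name puts `product` first even though `try_init_product()` takes `task` as its first positional argument.

```python
def experiment_name(product: str, env: str, task: str) -> str:
    """Build the experiment name in the documented {product}.{env}.{task} order."""
    return f"{product}.{env}.{task}"


print(experiment_name("example_product", "prod", "train"))
print(experiment_name("batch_model", "staging", "train"))
```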

## Required environment variables

Set these in your runtime environment or local `.env`.

| Variable | Required for init | Description | Example |
|----------|-------------------|-------------|---------|
| `MLFLOW_TRACKING_URI` | Yes | MLflow server URL (no `#` fragment) | `https://mlflow.example.com` |

If `MLFLOW_TRACKING_URI` is missing, `try_init_product()` returns `(False, reason)` instead of raising.
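
A job can run a similar check itself before starting, for example to fail fast in CI. The following is a minimal sketch that returns the same `(success, reason)` shape; `check_tracking_uri` is a hypothetical helper, and the fragment check is an assumption based on the "no `#` fragment" note in the table, not a description of the library's own validation.

```python
import os
from urllib.parse import urlsplit


def check_tracking_uri(environ=None):
    """Return (True, None) if MLFLOW_TRACKING_URI looks usable, else (False, reason)."""
    environ = os.environ if environ is None else environ
    uri = environ.get("MLFLOW_TRACKING_URI", "").strip()
    if not uri:
        return False, "MLFLOW_TRACKING_URI is not set"
    if urlsplit(uri).fragment:
        # e.g. a URL copied from the browser: https://mlflow.example.com/#/experiments
        return False, "MLFLOW_TRACKING_URI must not contain a '#' fragment"
    return True, None


ok, reason = check_tracking_uri({"MLFLOW_TRACKING_URI": "https://mlflow.example.com"})
print(ok, reason)
```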

## Cloudflare Access (production)

When the tracking server is behind Cloudflare Access, jobs must send a **service token** on
every HTTP request. This library registers an MLflow `RequestHeaderProvider` entry point that
adds the headers when credentials are present.

| Variable | When | Description |
|----------|------|-------------|
| `CF_ACCESS_CLIENT_ID` | Production behind Access | Service token client ID |
| `CF_ACCESS_CLIENT_SECRET` | Production behind Access | Service token client secret |

If either variable is unset, the provider does not inject headers (safe for local dev or servers not behind Access).
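
For orientation, the behaviour described above could look roughly like the sketch below. It is not the library's code: `CloudflareAccessHeaderSketch` is a hypothetical class that mirrors the two methods of MLflow's `RequestHeaderProvider` interface (`in_context` and `request_headers`) without importing MLflow, and treating an empty value as unset is an assumption. The header names are the ones Cloudflare Access expects for service tokens.

```python
import os


class CloudflareAccessHeaderSketch:
    """Sketch only; a real provider would subclass MLflow's RequestHeaderProvider."""

    def in_context(self) -> bool:
        # Active only when both credentials are present (empty counts as unset here).
        return bool(os.environ.get("CF_ACCESS_CLIENT_ID")) and bool(
            os.environ.get("CF_ACCESS_CLIENT_SECRET")
        )

    def request_headers(self) -> dict[str, str]:
        if not self.in_context():
            return {}  # local dev or a server not behind Access: add nothing
        return {
            "CF-Access-Client-Id": os.environ["CF_ACCESS_CLIENT_ID"],
            "CF-Access-Client-Secret": os.environ["CF_ACCESS_CLIENT_SECRET"],
        }


provider = CloudflareAccessHeaderSketch()
print(sorted(provider.request_headers()))  # header names only, never the secret
```

MLflow discovers real providers through the `mlflow.request_header_provider` entry point group, which is why no application code is needed to enable the headers.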

## Error handling examples

### Example 1: MLflow is optional (typical case)

```python
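import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)
job_id = "123"  # placeholder; use your scheduler's job ID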

success, reason = ct_mlflow_lib.try_init_product(
    "train",
    product="example_product",
    env="prod",
    tags={"job_id": job_id},
)

if not success:
    logger.warning(f"MLflow not available ({reason}), training without tracking")
else:
    logger.info(f"MLflow tracking enabled (run: {ct_mlflow_lib.get_mlflow_run_id()})")

# Safe without guards: helpers no-op when inactive
ct_mlflow_lib.log_metric("auc", 0.87)
ct_mlflow_lib.end_mlflow(exit_code=0)
```

### Example 2: Wrap in application-specific helper

```python
import logging

import ct_mlflow_lib

logger = logging.getLogger(__name__)

def init_mlflow_for_job(job_id, model_name, env):
    success, reason = ct_mlflow_lib.try_init_product(
        "train",
        product="example_product",
        env=env,
        tags={
            "job_id": str(job_id),
            "model_name": str(model_name),
        },
    )
    if not success:
        logger.warning(f"MLflow disabled: {reason}")
    return success
```

## Development

Run `pre-commit install` before coding.

Install dev dependencies, then run tests:

```bash
cd ct_mlflow_lib
uv sync --extra dev
uv run pytest tests/
```
