Metadata-Version: 2.4
Name: p-moderation
Version: 0.0.3
Summary: Moderation and guardrails library for Pruna AI
License: Apache-2.0
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: aiohttp>=3.9
Requires-Dist: httpx[http2]>=0.27
Requires-Dist: openai>=1.0
Requires-Dist: pillow>=10.0
Requires-Dist: pydantic>=2.0
Requires-Dist: replicate>=0.26
Provides-Extra: dev
Requires-Dist: pre-commit>=4.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Provides-Extra: transformers
Requires-Dist: huggingface-hub>=0.25; extra == 'transformers'
Requires-Dist: torch>=2.4; extra == 'transformers'
Requires-Dist: transformers>=4.46; extra == 'transformers'
Description-Content-Type: text/markdown

# p-moderation

Async moderation library for Pruna AI with a unified, non-blocking provider-based architecture.

All providers support the same async interface:
- `submit()` returns immediately with a job ID (non-blocking)
- `retrieve()` fetches the result (when ready)
- `check_status()` polls for completion without blocking

**Providers:**
- `OpenAIProvider`: text + image moderation via OpenAI API
- `ReplicateProvider`: text + image moderation via Replicate deployments (async, background threading)
- `TransformersImageClassifierProvider`: local image-only classification (allowlisted models)
- `TransformersTextClassifierProvider`: local text-only classification (allowlisted models, default: `ezb/NSFW-Prompt-Detector`)

## Install

Base package:

```bash
uv add p-moderation
# or
pip install p-moderation
```

With local `transformers` support:

```bash
pip install 'p-moderation[transformers]'
```

## Quick Start

### OpenAI

Async moderation with the convenience constructor:

```python
import asyncio
from p_moderation import ModerationClient

async def main() -> None:
    client = ModerationClient.from_openai(api_key="sk-...", fail_open=True)
    result = await client.moderate(text="Hello, this is benign.")
    print(result.action)  # ModerationAction.PASS
    print(result.flagged)  # False
    await client.aclose()

asyncio.run(main())
```

Or explicit provider injection:

```python
from p_moderation import OpenAIProvider, ModerationClient

provider = OpenAIProvider(api_key="sk-...", model="omni-moderation-latest")
client = ModerationClient(provider=provider)
```

### Replicate

High-throughput moderation with background threading (best for sync callers or fire-and-forget scenarios):

```python
from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Run moderation in background thread
q, event = provider.create_and_wait_deployment_thread(
    deployment_name="prunaai/p-moderation",
    input_data={"text": "Text to moderate", "timeout": 0.5},
)

# Do other work...
event.wait(timeout=30)
status, result = q.get_nowait()

if status == "success":
    print(f"Action: {result.get('action')}")
```

**Performance**: ~0.5s latency, 33+ req/sec sustained, early detection <0.35s with connection pooling.

## Provider Usage

### OpenAI

Text and/or image moderation (single request, all input types):

```python
client = ModerationClient.from_openai(api_key="sk-...")

# Text only
result = await client.moderate(text="This is safe text")
print(result.item_results[0].input_type)  # "text"

# Image only
result = await client.moderate(image="https://example.com/image.jpg")
print(result.item_results[0].input_type)  # "image"

# Text + Image combined
result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"
```

Categories: `violence`, `self-harm`, `sexual`, `harassment`, `hate`, and `illicit`. The response includes `category_applied_input_types`, showing which input types were evaluated for each category.
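
For example, given a serialized response (a plain dict shaped like the full-detail example later in this README, with illustrative values), the flagged categories and the input types they were evaluated against can be read off directly:

```python
# Plain-dict sketch: picking flagged categories out of a serialized response.
response = {
    "action": "block",
    "flagged": True,
    "categories": {"harassment": "pass", "sexual": "block", "violence": "pass"},
    "category_applied_input_types": {
        "harassment": ["text"],
        "sexual": ["image"],
        "violence": ["text", "image"],
    },
}

# Categories whose action is anything other than "pass"
flagged = [cat for cat, action in response["categories"].items() if action != "pass"]
for cat in flagged:
    # Which input types (text/image) this category was evaluated against
    print(cat, response["category_applied_input_types"].get(cat, []))  # sexual ['image']
```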

### Replicate

High-throughput moderation via Replicate deployments with optimized async polling and connection pooling:

```python
from p_moderation.providers.replicate import get_replicate_provider

provider = get_replicate_provider()

# Direct async usage (returns when complete)
result = await provider.create_and_wait(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
    timeout=30.0,
)
print(result.get("action"))  # "pass" or "block"
```

**For sync callers (background threading):**

```python
q, event = provider.create_and_wait_deployment_thread(
    "prunaai/p-moderation",
    {"text": "Text to moderate", "timeout": 0.5},
)
event.wait(timeout=30)
status, result = q.get_nowait()
```

**Performance**: ~0.5s latency, 33+ req/sec, early detection <0.35s. Auto-tuned polling: interval = min(0.05, timeout/10), exponential backoff up to 0.5s.
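
The auto-tuned schedule described above can be sketched as a generator (illustrative only; the library's internal polling loop may differ):

```python
def polling_intervals(timeout: float, max_interval: float = 0.5):
    """Yield sleep intervals: start at min(0.05, timeout/10), double up to max_interval."""
    interval = min(0.05, timeout / 10)
    elapsed = 0.0
    while elapsed < timeout:
        yield interval
        elapsed += interval
        interval = min(interval * 2, max_interval)  # exponential backoff, capped

print(list(polling_intervals(30.0))[:5])  # [0.05, 0.1, 0.2, 0.4, 0.5]
```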

### Local Image Classification

Fast local NSFW detection (no API calls):

```python
client = ModerationClient.from_image_classifier(
    model_id="Falconsai/nsfw_image_detection_26",
)

result = await client.moderate(image="/path/to/image.jpg")
print(result.action)  # ModerationAction.PASS or ModerationAction.MONITOR
```

Only allowlisted Hugging Face models are accepted. Categories: `nsfw` / `normal`. Image-only (text inputs are rejected).

### Local Text Classification

Local NSFW detection for text (default: `ezb/NSFW-Prompt-Detector`):

```python
client = ModerationClient.from_text_classifier()

result = await client.moderate(text="Have a great day!")
print(result.action)  # ModerationAction.PASS
print(result.categories)  # {'sfw': ModerationAction.PASS, 'nsfw': ModerationAction.PASS}
```

Only allowlisted Hugging Face models are accepted. Categories: `sfw` / `nsfw`. Text-only (image inputs are rejected).

## Response Format & Optimization

### Configurable Detail Levels

All providers return the same `ModerationResult` with policy already applied. Serialize to JSON with appropriate detail:

```python
result = await client.moderate(text="Hello, this is safe text")

# Minimal (~50B): action + flagged only
minimal = result.to_dict(detail="minimal")

# Balanced (~200B, recommended): action + categories
balanced = result.to_dict(detail="balanced")

# Full (~1KB): complete response with all metadata
full = result.to_dict(detail="full")
```

### Response Examples

**Minimal** (~50 bytes, for polling):
```json
{
  "success": true,
  "action": "pass",
  "flagged": false
}
```

**Balanced** (~200 bytes, **recommended** for production):
```json
{
  "success": true,
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z"
}
```

**Full** (~1KB, for debugging):
```json
{
  "success": true,
  "provider_name": "openai",
  "model": "omni-moderation-latest",
  "action": "pass",
  "flagged": false,
  "categories": {
    "harassment": "pass",
    "hate": "pass",
    "self-harm": "pass",
    "sexual": "pass",
    "violence": "pass"
  },
  "category_scores": {
    "harassment": 0.000023,
    "hate": 0.000010,
    "self-harm": 0.000005,
    "sexual": 0.000048,
    "violence": 0.000487
  },
  "category_applied_input_types": {
    "harassment": ["text"],
    "sexual": ["text", "image"],
    "violence": ["text", "image"]
  },
  "item_results": [
    {
      "item_index": 0,
      "input_type": "text",
      "action": "pass",
      "flagged": false,
      "categories": {
        "harassment": "pass",
        "sexual": "pass",
        "violence": "pass"
      },
      "category_scores": {
        "harassment": 0.000023,
        "sexual": 0.000048,
        "violence": 0.000487
      },
      "category_applied_input_types": {
        "harassment": ["text"],
        "sexual": ["text", "image"],
        "violence": ["text", "image"]
      }
    }
  ],
  "provider_response_id": "modr-8MZo6m9F5OMVdXcg3SBF6I0z",
  "request_id": null,
  "client_id": null,
  "error": null
}
```

**Error Response** (fail-open, action=pass):
```json
{
  "success": false,
  "action": "pass",
  "flagged": false,
  "provider_response_id": null,
  "error": {
    "type": "TimeoutError",
    "message": "OpenAI API request timed out"
  }
}
```

**Balanced is recommended** for production: roughly 58% smaller than full while retaining the per-category decisions.

### Image Optimization: Pre-Encoded Bytes

If you already have JPEG/PNG bytes of bounded size, use `EncodedImage` to skip normalization:

```python
from p_moderation import EncodedImage

encoded = EncodedImage(content=image_bytes, mime_type="image/jpeg")
result = await client.moderate(image=encoded)
```

For most cases, pass `bytes`, `Path`, or URL strings directly—the library handles encoding. Use `EncodedImage` only if profiling shows image preparation as a bottleneck.

## Async Jobs: Submit and Retrieve

All providers support the same unified, non-blocking API. Submit a request (returns immediately), do other work, and retrieve results when ready.

**How it works:**
- `submit()` extracts the first text/image from lists and launches a background task
- Returns immediately with a `job_id` and `status="pending"`
- Background task runs moderation and stores the result
- `retrieve()` returns the stored `ProviderModerationResponse` when ready
- Jobs are stored per-instance (memory-based for local providers, server-based for Replicate)
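
The in-memory flavor of this flow can be sketched in a few lines (a simplified illustration with hypothetical `InMemoryJobStore` and `fake_moderate` names, not the library's implementation):

```python
import asyncio
import uuid

class InMemoryJobStore:
    """Minimal sketch: submit launches a background task, retrieve awaits it."""

    def __init__(self) -> None:
        self._jobs: dict[str, asyncio.Task] = {}

    def submit(self, coro) -> str:
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = asyncio.create_task(coro)  # runs in the background
        return job_id

    def status(self, job_id: str) -> str:
        return "completed" if self._jobs[job_id].done() else "pending"

    async def retrieve(self, job_id: str):
        return await self._jobs[job_id]  # waits if the job is not yet finished

async def fake_moderate(text: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for a real provider call
    return {"action": "pass", "flagged": False}

async def main() -> None:
    store = InMemoryJobStore()
    job_id = store.submit(fake_moderate("hello"))
    print(store.status(job_id))  # typically "pending" right after submit
    result = await store.retrieve(job_id)
    print(result["action"])

asyncio.run(main())
```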

### Quick Example

Submit a request and retrieve later:

```python
provider = OpenAIProvider(api_key="sk-...")

# Submit request
prediction = await provider.submit(text="Some text to moderate")
job_id = prediction.id

# Do other work...
await run_expensive_computation()

# Retrieve result
result = await provider.retrieve(job_id)
print(result.item_results[0].flagged)  # False
```

### Polling Status

You can check the status of a job without blocking:

```python
import asyncio

prediction = await provider.submit(text="Text to moderate")

# Poll for completion
while True:
    status = await prediction.check_status()
    if status == "completed":
        break
    await asyncio.sleep(1)

result = prediction.output
print(result.item_results[0].input_type)  # "text"
```

### Wait for Completion

Or use the prediction's `.wait()` method to block until done:

```python
prediction = await provider.submit(text="Text to moderate")

# Wait for result (blocks until completion)
result = await prediction.wait(timeout=30.0)
print(result.provider_response_id)  # "modr-..."
```

### Important: Job Storage and Persistence

- **OpenAI/Transformers**: Results stored in memory (per provider instance)
  - Jobs are lost on restart or instance deletion
  - Best for: short-lived requests, single-session processing

- **Replicate**: Results persisted on Replicate's servers
  - Jobs survive client disconnect and process restart
  - Best for: long-running jobs, reliable persistence needed

All providers support the same `submit()` / `retrieve()` interface, but Replicate's server-side storage means jobs remain accessible indefinitely.

## Batch Moderation: `moderate_many`

Process multiple items efficiently using provider-specific optimizations.

### Input Format

Each item is a dictionary with optional `text` and/or `image`:

```python
items = [
    {"text": "Check this text"},
    {"image": image_bytes},
    {"text": "With image", "image": img_bytes},
]

results = await client.moderate_many(items)
```

### Provider-Specific Strategies

**OpenAI/Replicate**: Submit-wait parallelization

- Submits up to `max_concurrency` items in parallel
- Respects API rate limits via concurrency tuning
- Best for: API-based moderation with many items

```python
client = ModerationClient.from_openai(api_key=api_key)

items = [
    {"text": f"Item {i}"} for i in range(100)
]

results = await client.moderate_many(
    items,
    max_concurrency=5,  # Tune based on rate limits
)
```
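
The submit-wait parallelization amounts to bounding in-flight calls with a semaphore. A generic asyncio sketch (with a stand-in `moderate_one`, not the library's code):

```python
import asyncio

async def moderate_one(item: dict) -> dict:
    await asyncio.sleep(0.01)  # stand-in for one provider round trip
    return {"item": item, "action": "pass"}

async def moderate_many(items: list[dict], max_concurrency: int = 5) -> list[dict]:
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(item: dict) -> dict:
        async with sem:  # at most max_concurrency requests in flight
            return await moderate_one(item)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(bounded(i) for i in items))

results = asyncio.run(moderate_many([{"text": f"Item {i}"} for i in range(10)]))
print(len(results))  # 10
```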

**Transformers Classifiers**: Sequential + batched pipeline

- Items processed sequentially
- Internal pipeline handles GPU batching via `batch_size`
- Best for: Local inference with many texts/images

```python
client = ModerationClient.from_text_classifier(batch_size=64)

items = [
    {"text": f"Text {i}"} for i in range(1000)
]

results = await client.moderate_many(items)
```
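
Under the hood, `batch_size` just partitions the inputs into consecutive batches; a generic sketch of that chunking (hypothetical helper, not the library's API):

```python
def chunks(items: list, batch_size: int) -> list[list]:
    """Split items into consecutive batches of at most batch_size each."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

batches = chunks([f"Text {i}" for i in range(10)], batch_size=4)
print([len(b) for b in batches])  # [4, 4, 2]
```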

### Response Structure

`moderate_many` returns a list of `ModerationResult` objects in input order:

```python
results = await client.moderate_many(items)

for i, result in enumerate(results):
    print(f"Item {i}:")
    print(f"  action: {result.action}")  # ModerationAction enum
    print(f"  flagged: {result.flagged}")  # Boolean
    print(f"  input_type: {result.item_results[0].input_type}")  # "text", "image", "combined"
    print(f"  categories: {result.categories}")  # {category: action}
    print(f"  category_scores: {result.category_scores}")  # {category: float}
    if result.error:
        print(f"  error: {result.error}")
```

### Response Examples

**Text-only moderation:**

```python
result = await client.moderate(text="Have a great day!")
print(result)
# ModerationResult(
#   provider_name='openai',
#   model='omni-moderation-latest',
#   action=ModerationAction.PASS,
#   flagged=False,
#   categories={
#     'harassment': ModerationAction.PASS,
#     'hate': ModerationAction.PASS,
#     'self-harm': ModerationAction.PASS,
#     'sexual': ModerationAction.PASS,
#     'violence': ModerationAction.PASS,
#     ...
#   },
#   category_scores={
#     'harassment': 1.2e-05,
#     'hate': 9.8e-06,
#     'self-harm': 1.5e-05,
#     'sexual': 4.8e-05,
#     'violence': 4.8e-04,
#     ...
#   },
#   category_applied_input_types={
#     'harassment': ['text'],
#     'sexual': ['text'],
#     'violence': ['text'],
#     ...
#   },
#   item_results=[
#     ProviderItemResult(
#       item_index=0,
#       input_type='text',
#       flagged=False,
#       categories={...}
#     )
#   ]
# )
```

**Combined text + image:**

```python
result = await client.moderate(text="User prompt", image=image_bytes)
print(result.item_results[0].input_type)  # "combined"
print(result.category_applied_input_types['violence'])  # ['text', 'image']
# Violence category was evaluated against both text and image
```

**Batch with mixed inputs:**

```python
items = [
    {"text": "Text 1"},
    {"image": img1},
    {"text": "Text 2", "image": img2},
]

results = await client.moderate_many(items, max_concurrency=3)

# results[0].item_results[0].input_type → "text"
# results[1].item_results[0].input_type → "image"
# results[2].item_results[0].input_type → "combined"
```

**Error handling:**

```python
results = await client.moderate_many(
    items,
    fail_open=True,  # Return PASS on errors
)

for i, result in enumerate(results):
    if result.error:
        print(f"Item {i} failed: {result.error.message}")
        print(f"Error type: {result.error.type}")
    else:
        print(f"Item {i}: {result.action}")
```

### Performance Tips

- **OpenAI**: Start with `max_concurrency=5`, increase gradually while monitoring for rate limits (429 errors)
- **Transformers**: Tune `batch_size` in provider initialization (larger = faster but more VRAM needed)
- **Mixed inputs**: texts and images can be combined safely in the same batch (each item is evaluated according to its input type)

## Policies

Policies are provider-specific. The policy's `provider_name` must match the active provider.

```python
from p_moderation import ModerationAction, ProviderPolicy

policy = ProviderPolicy(
    provider_name="openai",
    category_actions={
        "violence": ModerationAction.BLOCK,
        "violence/graphic": ModerationAction.BLOCK,
    },
    input_type_overrides={
        "image": {"sexual": ModerationAction.BLOCK},
    },
)
```
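
To make the override semantics concrete, here is a plain-dict sketch of one plausible resolution order, where a per-input-type override takes precedence over the category-level action (the actual `ProviderPolicy` is a typed model and its resolution logic lives in the library):

```python
def resolve_action(policy: dict, category: str, input_type: str, default: str = "pass") -> str:
    """Illustrative resolver: input_type_overrides win over category_actions."""
    override = policy.get("input_type_overrides", {}).get(input_type, {})
    if category in override:
        return override[category]
    return policy.get("category_actions", {}).get(category, default)

policy = {
    "category_actions": {"violence": "block"},
    "input_type_overrides": {"image": {"sexual": "block"}},
}

print(resolve_action(policy, "violence", "text"))  # block (category-level action)
print(resolve_action(policy, "sexual", "image"))   # block (image-specific override)
print(resolve_action(policy, "sexual", "text"))    # pass (no rule matches)
```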

## Failure Behavior

- `fail_open=True`: provider/runtime errors return `action=pass` with `result.error`
- `fail_open=False`: provider/runtime errors raise
- provider configuration or capability errors raise regardless of `fail_open`
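
The fail-open behavior can be sketched as a wrapper (an illustration of the semantics above, with a hypothetical `safe_moderate` helper; the library applies this internally):

```python
import asyncio

async def safe_moderate(call, *, fail_open: bool) -> dict:
    """Run a moderation coroutine; on runtime errors either pass through or raise."""
    try:
        return await call()
    except Exception as exc:
        if fail_open:
            # Mirror the error-response shape: action=pass with error details attached
            return {
                "success": False,
                "action": "pass",
                "flagged": False,
                "error": {"type": type(exc).__name__, "message": str(exc)},
            }
        raise

async def failing_call() -> dict:
    raise TimeoutError("API request timed out")

result = asyncio.run(safe_moderate(failing_call, fail_open=True))
print(result["error"]["type"])  # TimeoutError
```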

## Benchmarks

General moderation latency:

```bash
uv run python benchmarks/bench_moderation.py --provider openai --iterations 10
uv run python benchmarks/bench_moderation.py --provider image-classifier --iterations 10
uv run --extra transformers python benchmarks/bench_moderation.py --provider text-classifier --iterations 10
uv run python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5
uv run --extra aiohttp python benchmarks/bench_moderation.py --provider openai --iterations 10 --concurrency 5 --http-backend aiohttp
```

The moderation benchmark reports per-request latency plus aggregate throughput for the selected concurrency level.
For OpenAI, you can also compare the default SDK transport versus `aiohttp`.
See [benchmarks/README.md](benchmarks/README.md) for recorded commands and the latest captured numbers.

## Development

Install the git hooks once:

```bash
uv run --extra dev pre-commit install
```

Useful local commands:

```bash
make check
make bench-openai BENCH_ARGS='--concurrency 5'
make bench-image-classifier BENCH_ARGS='--concurrency 5'
make bench-text-classifier BENCH_ARGS='--concurrency 5'
```

## Notes

- Categories are provider-specific by design. OpenAI, image-classifier, and text-classifier labels are not normalized into one shared taxonomy.
- Migration: old `FalconsAITransformersProvider` naming was removed; use `TransformersImageClassifierProvider` or `ModerationClient.from_image_classifier(...)`.
