Metadata-Version: 2.4
Name: bigdata-smart-batching
Version: 1.4.0
Summary: High-performance semantic search with intelligent company grouping and parallel execution
Project-URL: homepage, https://bigdata.com/api
Author-email: "Bigdata.com" <support@ravenpack.com>
License: MIT
License-File: LICENSE
Requires-Python: <4.0,>=3.11
Requires-Dist: matplotlib>=3.7
Requires-Dist: numpy>=1.24
Requires-Dist: pandas>=2.0
Requires-Dist: python-dotenv>=1.2.2
Requires-Dist: requests>=2.31.0
Description-Content-Type: text/markdown

# Smart Batching Search

A high-performance semantic search system that reduces API queries by **67-99%** (varies by topic specificity) through intelligent company grouping and parallel execution.

This module provides a two-step pipeline for efficient semantic search:

1. **Planning** -- organize the search via smart batching and return a chunk upper bound estimate.
2. **Execution** -- run the plan with proportional sampling to preserve the result distribution.

## Key Benefits

- **67-99% query reduction** -- search 4,732 companies with only 17-3,699 queries (varies by topic).
- **Parallel execution** -- rate-limited concurrent requests with semaphore-controlled concurrency.
- **Proportional sampling** -- retrieve a percentage of results while preserving the distribution across baskets.
- **Category filtering** -- restrict searches to specific document categories (news, transcripts, filings, ...).
- **Source filtering** -- include or exclude documents by source identifiers (`INCLUDE` / `EXCLUDE` plus a list of source IDs).
- **Sentiment filtering** -- restrict searches to one or more inclusive document-level sentiment ranges.
- **Throughput controls** -- shared sliding-window rate limiter and concurrency semaphore across all planning HTTP work, with shared exponential backoff on `429` / `403` / timeout retries.
- **Deterministic plans** -- every request stays within service limits; plans can be saved and replayed.
- **Scalable** -- handles universes with 10,000+ companies efficiently.

## Installation

Install the package from PyPI (Python 3.11+):

```bash
pip install bigdata-smart-batching
```

With [uv](https://docs.astral.sh/uv/):

```bash
uv add bigdata-smart-batching
```

### Development

To work on this repository locally, from the project root:

```bash
uv sync
```

### Environment Setup

Set up environment variables:

```bash
export BIGDATA_API_KEY="your_api_key_here"
export BIGDATA_API_BASE_URL="https://api.bigdata.com"  # Optional, defaults to this
```

Or create a `.env` file:

```
BIGDATA_API_KEY=your_api_key_here
BIGDATA_API_BASE_URL=https://api.bigdata.com
```

## Universe (CSV path or entity list)

`plan_search()` takes a **`universe`** argument: either a path to a UTF-8 CSV file, or a `list[str]` of entity IDs. IDs must match the identifiers used by the Bigdata API for your dataset.

When using a CSV, two layouts are supported:

**1. Header row with an `id` column** (optional extra columns such as `name` are ignored):

```csv
id,name
B8EF97,Example Corp A
BB07E4,Example Corp B
3461CF,Example Corp C
```
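
As a rough illustration of the header layout above, the following sketch reads the `id` column with the standard library. `read_ids_from_csv` is a hypothetical helper written for this example; it is not the package's own loader (`load_universe_from_csv`), whose exact behavior may differ.

```python
import csv
import io


def read_ids_from_csv(handle) -> list[str]:
    """Return the non-empty values of the `id` column, ignoring other columns."""
    reader = csv.DictReader(handle)
    if reader.fieldnames is None or "id" not in reader.fieldnames:
        raise ValueError("expected a header row containing an 'id' column")
    return [row["id"].strip() for row in reader if row["id"] and row["id"].strip()]


# In-memory stand-in for a UTF-8 CSV file on disk.
sample = io.StringIO("id,name\nB8EF97,Example Corp A\nBB07E4,Example Corp B\n")
ids = read_ids_from_csv(sample)
```

The resulting list has the same shape as the `list[str]` form of `universe`.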

You can also pass a plain list instead of a CSV path:

```python
from bigdata_smart_batching import plan_search

plan = plan_search(
    universe=["B8EF97", "BB07E4", "3461CF"],
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
)
```

## Quick Start

```python
from bigdata_smart_batching import (
    plan_search,
    execute_search,
    deduplicate_documents,
    convert_to_dataframe,
)

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    api_key="your_api_key",  # or set BIGDATA_API_KEY env var
)

print(f"Chunk upper bound estimate: {plan['chunk_upper_bound_estimate']:,}")

results_raw = execute_search(
    search_plan=plan,
    chunk_percentage=0.1,
    requests_per_minute=100,
)

results = deduplicate_documents(results_raw)
print(f"Retrieved {len(results)} documents (deduplicated)")

df = convert_to_dataframe(results)  # one row per chunk
```

### Category Filtering

Restrict searches to a specific set of document categories using the `category`
argument. It accepts either a `CategoryFilter` dataclass or a plain dict.

```python
from bigdata_smart_batching import CategoryFilter, CategoryMode, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category=CategoryFilter(
        mode=CategoryMode.INCLUDE,
        values=("news_premium", "transcripts"),
    ),
)

# Equivalent dict form:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    category={"mode": "INCLUDE", "values": ["news_premium", "transcripts"]},
)
```

Valid category values are exposed via `VALID_CATEGORY_VALUES` and include:
`expert_interviews`, `filings`, `my_files`, `news`, `news_premium`,
`news_public`, `podcasts`, `research`, `research_academic_journals`,
`research_investment_research`, `transcripts`.

### Source Filtering

Restrict searches by **source** (document source IDs) using the `source` argument.
It accepts either a `SourceFilter` dataclass or a plain dict with `mode`
(`INCLUDE` or `EXCLUDE`) and `values` (a non-empty list of source ID strings).
The filter is serialized to `query.filters.source` on co-mention, volume, and
search payloads during planning.

```python
from bigdata_smart_batching import SourceFilter, SourceMode, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    source=SourceFilter(
        mode=SourceMode.INCLUDE,
        values=("source-id-1", "source-id-2"),
    ),
)

# Dict form, here excluding a source instead of including:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
    source={"mode": "EXCLUDE", "values": ["source-id-to-drop"]},
)
```

`SourceMode` is the same `INCLUDE` / `EXCLUDE` enum as `CategoryMode`. String
values are trimmed; empty strings are rejected. Use
`validate_source_filter(source)` to validate and normalize input into the API
dict shape.
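
The trimming and empty-string rules described above can be sketched roughly as follows. `normalize_source_filter` is a hypothetical stand-in written for illustration; it is not the package's `validate_source_filter`, and the error messages are assumptions.

```python
def normalize_source_filter(mode: str, values: list[str]) -> dict:
    """Trim source IDs and reject empty input; returns {"mode", "values"}."""
    if mode not in ("INCLUDE", "EXCLUDE"):
        raise ValueError(f"mode must be INCLUDE or EXCLUDE, got {mode!r}")
    trimmed = [value.strip() for value in values]
    # An empty list, or any ID that is blank after trimming, is rejected.
    if not trimmed or any(not value for value in trimmed):
        raise ValueError("values must be a non-empty list of non-empty strings")
    return {"mode": mode, "values": trimmed}


normalized = normalize_source_filter("EXCLUDE", ["  source-id-to-drop "])
```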

### Sentiment Filtering

Restrict searches to documents whose sentiment falls inside one or more
inclusive `min` / `max` ranges using the `sentiment` argument. It accepts a
single `SentimentRange` / dict, or a list of them (multiple disjoint ranges).
The filter is emitted as `query.filters.sentiment.ranges` on co-mention,
volume, and search payloads.

```python
from bigdata_smart_batching import SentimentRange, plan_search

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    sentiment=SentimentRange(min=0.3, max=1.0),
)

# Multiple disjoint ranges, passed as a list of dicts:
plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="earnings revenue profit",
    sentiment=[
        {"min": -1.0, "max": -0.5},
        {"min": 0.5, "max": 1.0},
    ],
)
```

Each range must satisfy `min <= max`. Use
`validate_sentiment_range(sentiment)` to validate and normalize input into the
list-of-dicts shape the API expects.
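
A minimal sketch of that normalization, assuming plain dict input only: `normalize_sentiment_ranges` is a hypothetical helper written for this example and does not reproduce the package's `validate_sentiment_range` (it does not handle `SentimentRange` instances, for instance).

```python
def normalize_sentiment_ranges(sentiment) -> list[dict]:
    """Accept one {min, max} dict or a list of them; return a list of dicts."""
    ranges = sentiment if isinstance(sentiment, list) else [sentiment]
    if not ranges:
        raise ValueError("at least one sentiment range is required")
    normalized = []
    for item in ranges:
        low, high = float(item["min"]), float(item["max"])
        if low > high:
            raise ValueError(f"min ({low}) must not exceed max ({high})")
        normalized.append({"min": low, "max": high})
    return normalized


single = normalize_sentiment_ranges({"min": 0.3, "max": 1.0})
pair = normalize_sentiment_ranges(
    [{"min": -1.0, "max": -0.5}, {"min": 0.5, "max": 1.0}]
)
```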

### Save and Load Plans

```python
from bigdata_smart_batching import plan_search, execute_search, save_plan, load_plan

plan = plan_search(
    universe="id_name_mapping_us_top_3000.csv",
    start_date="2023-01-01",
    end_date="2023-12-31",
    text="merger acquisition",
)
save_plan(plan, "my_search_plan.json")

# Reload the saved plan and reuse it at different sampling ratios.
plan = load_plan("my_search_plan.json")
raw_10 = execute_search(plan, chunk_percentage=0.1)
raw_50 = execute_search(plan, chunk_percentage=0.5)
```

## How It Works

### Architecture Overview

```
Step 1: PLANNING
  Universe  -->  Co-mention volumes  -->  Entity groups (by volume)  -->  Sub-period split  -->  Search plan

Step 2: EXECUTION
  Proportional sampling  -->  Parallel search (rate limited)  -->  Collect & aggregate
```

### Planning (`plan_search()`)

1. Loads the universe of companies from a CSV path or inline list.
2. Queries the co-mention endpoint for chunk volumes per company; the date range is split into sub-ranges of up to 365 days and the volumes are summed per company.
3. Splits date ranges by volume when a company exceeds the chunk limit, honoring `min_period_days`.
4. Sorts companies by chunk volume and greedy-packs them into entity groups under `MAX_CHUNKS_PER_BASKET` (1000) and `MAX_ENTITIES_IN_ANY_OF` (500).
5. Classifies each group by `volume_bucket` (`high`, `medium`, `low`, `very_low`).
6. Returns a plan with the chunk upper bound estimate and per-basket query configurations.
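
Two of the steps above, the 365-day sub-range split and the greedy packing, can be sketched in plain Python. This is an illustration under stated assumptions, not the planner's actual code: `split_range` and `pack_groups` are hypothetical names, day counts are treated as inclusive, companies are sorted in descending volume order, and a group is closed as soon as the next company would break either limit.

```python
from datetime import date, timedelta

MAX_CHUNKS_PER_BASKET = 1000   # limits quoted in the planning steps above
MAX_ENTITIES_IN_ANY_OF = 500


def split_range(start: date, end: date, max_days: int = 365) -> list[tuple[date, date]]:
    """Split [start, end] into consecutive inclusive sub-ranges of at most max_days days."""
    pieces = []
    cursor = start
    while cursor <= end:
        piece_end = min(cursor + timedelta(days=max_days - 1), end)
        pieces.append((cursor, piece_end))
        cursor = piece_end + timedelta(days=1)
    return pieces


def pack_groups(volumes: dict[str, int]) -> list[list[str]]:
    """Greedily pack entities, largest volume first, into groups within both limits."""
    groups: list[list[str]] = []
    current: list[str] = []
    current_volume = 0
    for entity, volume in sorted(volumes.items(), key=lambda kv: kv[1], reverse=True):
        too_many_chunks = current_volume + volume > MAX_CHUNKS_PER_BASKET
        too_many_entities = len(current) + 1 > MAX_ENTITIES_IN_ANY_OF
        # Close the current group before it would exceed a limit.
        if current and (too_many_chunks or too_many_entities):
            groups.append(current)
            current, current_volume = [], 0
        current.append(entity)
        current_volume += volume
    if current:
        groups.append(current)
    return groups


pieces = split_range(date(2022, 1, 1), date(2023, 12, 31))
groups = pack_groups({"A": 900, "B": 600, "C": 300, "D": 50})
```

In this sketch a company whose volume alone exceeds the chunk limit still ends up in its own group; the planner handles that case by splitting the date range instead (step 3).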

### Execution (`execute_search()`)

1. Calculates proportional chunks per basket, with a minimum of 1 chunk per basket when `expected_chunks > 0`.
2. Executes searches in parallel with sliding-window rate limiting and a concurrency semaphore.
3. Optionally performs a **second pass**, removing already-retrieved entities from each basket and re-querying for the remainder.
4. Enriches each chunk with `entity_ids` and `primary_entity_id`, and returns the raw documents.
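
The first step, proportional allocation with a floor of one chunk, might look like the sketch below. `allocate_chunks` is a hypothetical name, and rounding down with `math.floor` is an assumption; the package may round differently.

```python
import math


def allocate_chunks(expected_chunks: list[int], chunk_percentage: float) -> list[int]:
    """Per-basket chunk targets: proportional share, at least 1 for non-empty baskets."""
    if not 0.0 <= chunk_percentage <= 1.0:
        raise ValueError("chunk_percentage must be between 0.0 and 1.0")
    targets = []
    for expected in expected_chunks:
        if expected <= 0:
            targets.append(0)  # nothing expected, nothing requested
        else:
            targets.append(max(1, math.floor(expected * chunk_percentage)))
    return targets


targets = allocate_chunks([1000, 400, 2, 0], chunk_percentage=0.25)
```

The minimum of one chunk keeps small baskets represented in the sample, which is what preserves the distribution across baskets at low percentages.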

## API Reference

### `plan_search()`

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `universe` | `str \| list[str]` | required | CSV path or list of entity IDs |
| `start_date` | `str` | required | Start date (YYYY-MM-DD) |
| `end_date` | `str` | required | End date (YYYY-MM-DD) |
| `text` | `str \| None` | `None` | Search query text; omitted from all payloads when `None` |
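| `api_key` | `str \| None` | env var | API key; falls back to the `BIGDATA_API_KEY` environment variable |
| `api_base_url` | `str \| None` | env var | API base URL; falls back to the `BIGDATA_API_BASE_URL` environment variable |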
| `volume_query_mode` | `str` | `"three_pass"` | `"three_pass"` or `"iterative"` |
| `max_iterations_per_batch` | `int` | `10` | Max iterations per batch in `"iterative"` mode |
| `apply_volume_splits` | `bool` | `True` | Use volume time series for period splitting |
| `min_period_days` | `int` | `30` | Minimum days per sub-period |
| `min_entities_per_basket` | `int` | `1` | Lower bound on entities per basket |
| `category` | `CategoryFilter \| dict \| None` | `None` | Optional category filter (`mode` + `values`) |
| `source` | `SourceFilter \| dict \| None` | `None` | Optional source filter by source IDs (`mode` + `values`) |
| `sentiment` | `SentimentRange \| dict \| list \| None` | `None` | Optional sentiment filter: one inclusive `{min, max}` range or a list of disjoint ranges |
| `content_diversification` | `bool` | `True` | Enable/disable content diversification in ranking |
| `requests_per_minute` | `int` | `100` | Global rate limit shared across all planning HTTP work (co-mention + volume) |
| `max_workers` | `int` | `40` | Maximum concurrent in-flight planning requests (shared semaphore) |

### `execute_search()`

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `search_plan` | `dict` | required | Plan from `plan_search()` |
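| `chunk_percentage` | `float` | required | Sampling ratio between 0.0 and 1.0 (e.g. `0.1` requests about 10% of each basket's chunks) |
| `requests_per_minute` | `int` | `100` | Rate limit, in requests per minute |
| `api_key` | `str \| None` | env var | API key; falls back to the `BIGDATA_API_KEY` environment variable |
| `api_base_url` | `str \| None` | env var | API base URL; falls back to the `BIGDATA_API_BASE_URL` environment variable |
| `max_workers` | `int` | `40` | Maximum number of parallel workers |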
| `overwrite_chunks_per_basket` | `int \| None` | `None` | Override the proportional chunk allocation |
| `second_pass` | `bool` | `False` | Re-query each basket for entities not yet retrieved |
| `basket_filtered_entities` | `bool` | `False` | When `True`, each chunk's `entity_ids` is filtered to only include IDs present in the basket's `query.filters.entity.any_of` list; entity IDs are always deduplicated |
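
To make the `basket_filtered_entities` behavior concrete, here is a small sketch of the deduplication and optional filtering it describes. `filter_entity_ids` is a hypothetical helper, `ZZZZZZ` is a made-up ID standing in for an entity outside the basket, and keeping first-seen order is an assumption.

```python
def filter_entity_ids(
    entity_ids: list[str], basket_any_of: list[str], basket_filtered: bool
) -> list[str]:
    """Deduplicate entity IDs (first-seen order); optionally keep only basket members."""
    unique = list(dict.fromkeys(entity_ids))
    if not basket_filtered:
        return unique
    allowed = set(basket_any_of)
    return [entity_id for entity_id in unique if entity_id in allowed]


chunk_entities = ["B8EF97", "ZZZZZZ", "B8EF97", "BB07E4"]
basket = ["B8EF97", "BB07E4", "3461CF"]
unfiltered = filter_entity_ids(chunk_entities, basket, basket_filtered=False)
filtered = filter_entity_ids(chunk_entities, basket, basket_filtered=True)
```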

### Helper Functions

- `deduplicate_documents(documents)` -- merge duplicate documents by `id`, combining their chunks.
- `convert_to_dataframe(raw_results)` -- convert documents to a DataFrame with one row per chunk.
- `resolve_universe(universe)` -- load entity IDs from a CSV path or validate a `list[str]`.
- `load_universe_from_csv(csv_path)` -- load entity IDs from a CSV only.
- `save_plan(plan, path)` / `load_plan(path)` -- persist plans as JSON.
- `validate_category_filter(category)` -- validate and normalize a category filter to the API dict shape.
- `validate_source_filter(source)` -- validate and normalize a source filter to the API dict shape (`filters.source`).
- `validate_sentiment_range(sentiment)` -- validate and normalize sentiment range(s) into the list-of-`{min, max}` dicts emitted as `filters.sentiment.ranges`.
- `build_comention_payload(...)`, `build_volume_payload(...)`, `build_search_query(...)` -- low-level payload builders used by the planner; each accepts optional `category_filter`, `source_filter`, and `sentiment_filter` alongside entities and dates. Useful if you need to construct custom requests.
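
As an illustration of what merging by `id` involves, the sketch below combines the chunks of documents that share an ID. It assumes each document is a dict with an `id` key and a `chunks` list, which is a guess at the result shape; `merge_documents_by_id` is a hypothetical name and not the package's `deduplicate_documents`.

```python
def merge_documents_by_id(documents: list[dict]) -> list[dict]:
    """Merge documents sharing an `id`, concatenating their `chunks` lists."""
    merged: dict[str, dict] = {}
    for document in documents:
        doc_id = document["id"]
        if doc_id not in merged:
            # Copy so the caller's input is not mutated.
            merged[doc_id] = {**document, "chunks": list(document.get("chunks", []))}
        else:
            merged[doc_id]["chunks"].extend(document.get("chunks", []))
    return list(merged.values())


raw = [
    {"id": "doc-1", "chunks": [{"text": "first"}]},
    {"id": "doc-2", "chunks": [{"text": "other"}]},
    {"id": "doc-1", "chunks": [{"text": "second"}]},
]
merged_docs = merge_documents_by_id(raw)
```

This sketch does not remove duplicate chunks within a merged document; whether the real helper does is not stated here.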

### Public Types

- `CategoryFilter` -- frozen dataclass with `mode: CategoryMode` and `values: tuple[str, ...]`.
- `CategoryMode` -- `StrEnum` with `INCLUDE` and `EXCLUDE` values.
- `CategoryFilterInput` -- type alias: `CategoryFilter | dict[str, str | list[str]]`.
- `SourceFilter` -- frozen dataclass with `mode: SourceMode` and `values: tuple[str, ...]` (source IDs).
- `SourceMode` -- type alias of `CategoryMode` (`INCLUDE`, `EXCLUDE`).
- `SourceFilterInput` -- type alias: `SourceFilter | dict[str, str | list[str]]`.
- `SentimentRange` -- frozen dataclass with inclusive `min: float` and `max: float`.
- `SentimentRangeInput` -- type alias for one `SentimentRange` / `{min, max}` dict, or a non-empty list of either.
- `UniverseInput` -- type alias: `str | list[str]`.
- `VALID_CATEGORY_VALUES` -- `frozenset[str]` of allowed category strings.

## Testing

```bash
# Run all tests
uv run pytest

# With coverage
uv run pytest --cov=bigdata_smart_batching --cov-report=term-missing

# Specific test file
uv run pytest tests/test_validation.py -v
```

## Project Structure

```
bigdata-smart-batching/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── LICENSE
├── Makefile
├── .python-version
├── src/
│   └── bigdata_smart_batching/
│       ├── __init__.py
│       ├── smart_batching.py
│       ├── smart_batching_config.py
│       ├── search_function.py
│       └── output_converter.py
├── examples/
│   └── example.py
└── tests/
    ├── __init__.py
    ├── test_config.py
    ├── test_output_converter.py
    ├── test_rate_limiter.py
    └── test_validation.py
```

## Configuration

### Environment Variables

- `BIGDATA_API_KEY` -- required; your Bigdata API key.
- `BIGDATA_API_BASE_URL` -- optional; API base URL (default: `https://api.bigdata.com`).

### Default Settings

- `requests_per_minute`: 100
- `max_workers`: 40
- `MAX_CHUNKS_PER_BASKET`: 1000
- `MAX_ENTITIES_IN_ANY_OF`: 500
- `volume_query_mode`: `"three_pass"`
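
To show how the `requests_per_minute` and `max_workers` defaults interact, here is a minimal sketch of a sliding-window limiter paired with a semaphore. `SlidingWindowLimiter` and `guarded_call` are hypothetical names written for this example; the package's own limiter may be structured differently, and the 0.05-second polling interval is an arbitrary choice.

```python
import threading
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `limit` acquisitions within any `window`-second span."""

    def __init__(self, limit: int, window: float = 60.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._stamps = deque()
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        """Record a request and return True if under the limit, else return False."""
        with self._lock:
            now = self.clock()
            # Drop timestamps that have slid out of the window.
            while self._stamps and now - self._stamps[0] >= self.window:
                self._stamps.popleft()
            if len(self._stamps) >= self.limit:
                return False
            self._stamps.append(now)
            return True


limiter = SlidingWindowLimiter(limit=100)   # requests_per_minute default
in_flight = threading.Semaphore(40)         # max_workers default


def guarded_call(send):
    """Run `send()` once both the rate limiter and the semaphore allow it."""
    while not limiter.try_acquire():
        time.sleep(0.05)  # wait briefly until the window frees a slot
    with in_flight:
        return send()
```

The limiter caps how many requests start per minute, while the semaphore caps how many are in flight at once, so slow responses cannot pile up beyond 40 concurrent calls.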

### Volume Buckets

Companies (and basket groups) are classified by total chunk volume. Ranges use
`[lower, upper)` semantics, so exactly 100 chunks is `medium` and exactly 500 is `high`:

| Bucket | Range |
|--------|-------|
| `high` | 500+ chunks |
| `medium` | 100-499 chunks |
| `low` | 1-99 chunks |
| `very_low` | 0 chunks (headlines only) |

Ties are resolved using `VOLUME_BUCKET_PRIORITY`: `high > medium > low > very_low`.
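
The thresholds in the table translate directly into a small classifier. `classify_volume` is a hypothetical name used for illustration and covers only the threshold mapping, not the tie-breaking by `VOLUME_BUCKET_PRIORITY`.

```python
def classify_volume(total_chunks: int) -> str:
    """Map a total chunk count to a bucket using [lower, upper) thresholds."""
    if total_chunks >= 500:
        return "high"
    if total_chunks >= 100:
        return "medium"
    if total_chunks >= 1:
        return "low"
    return "very_low"


labels = [classify_volume(n) for n in (0, 1, 99, 100, 499, 500)]
```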

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for release notes. The package is currently at
version **1.4.0**.

## License

This project is part of Bigdata.com and is distributed under the MIT License (see the `LICENSE` file).

**Disclaimer**: This software is provided "as is" without warranty of any kind, express or implied. The authors and contributors assume no responsibility for the accuracy, completeness, or usefulness of any information, results, or processes provided. This software is for educational and research purposes only and is not intended to be used as financial advice.
