Metadata-Version: 2.3
Name: defistream
Version: 1.7.1
Summary: Python client for the DeFiStream API
Project-URL: Homepage, https://defistream.dev
Project-URL: Documentation, https://docs.defistream.dev
Project-URL: Repository, https://github.com/Eren-Nevin/DeFiStream_PythonClient
Author-email: DeFiStream <support@defistream.dev>
License: MIT
Keywords: api,blockchain,crypto,defi,ethereum,web3
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.9
Requires-Dist: httpx>=0.25.0
Requires-Dist: pandas>=2.0.0
Requires-Dist: pyarrow>=14.0.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: python-dotenv>=1.0.0; extra == 'dev'
Provides-Extra: polars
Requires-Dist: polars>=0.20.0; extra == 'polars'
Description-Content-Type: text/markdown

# DeFiStream Python Client

Official Python client for the [DeFiStream API](https://defistream.dev).

## Getting an API Key

Sign up for an account at [defistream.dev](https://defistream.dev) to obtain the API key required by the DeFiStream API.

## Installation

```bash
pip install defistream
```

This includes pandas and pyarrow by default for DataFrame support.

With polars support (in addition to pandas):
```bash
pip install "defistream[polars]"  # quotes keep shells such as zsh from expanding the brackets
```

## Quick Start

```python
from defistream import DeFiStream

# Initialize client (reads DEFISTREAM_API_KEY from environment if not provided)
client = DeFiStream()

# Or with explicit API key
client = DeFiStream(api_key="dsk_your_api_key")

# Query ERC20 transfers using builder pattern
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

print(df.head())
```

## Features

- **Builder pattern**: Fluent query API with chainable methods
- **Aggregate queries**: Bucket events into time or block intervals with summary statistics
- **Type-safe**: Full type hints and Pydantic models
- **Multiple formats**: DataFrame (pandas/polars), CSV, Parquet, JSON
- **Async support**: Native async/await with `AsyncDeFiStream`
- **All protocols**: ERC20, AAVE, Uniswap, Lido, native tokens, plus Binance trade data

## Supported Protocols

| Protocol | Events |
|----------|--------|
| ERC20 | `transfers` |
| Native Token | `transfers` |
| AAVE V3 | `deposits`, `withdrawals`, `borrows`, `repays`, `flashloans`, `liquidations` |
| Uniswap V3 | `swaps`, `deposits`, `withdrawals`, `collects` |
| Lido | `deposits`, `withdrawal_requests`, `withdrawals_claimed`, `l2_deposits`, `l2_withdrawal_requests` |
| Binance | `raw_trades` (tick data), `ohlcv` (candle data) |

## Usage Examples

### Builder Pattern

The client uses a fluent builder pattern. The query is only executed when you call a terminal method like `as_df()`, `as_file()`, or `as_dict()`.

```python
from defistream import DeFiStream

client = DeFiStream()

# Build query step by step
query = client.erc20.transfers("USDT")
query = query.network("ETH")
query = query.block_range(21000000, 21010000)
query = query.min_amount(1000)

# Execute and get DataFrame
df = query.as_df()

# Or chain everything
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(1000)
    .as_df()
)
```

### ERC20 Transfers

```python
# Get USDT transfers over 10,000 USDT
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(10000)
    .as_df()
)

# Query multiple tokens at once (known symbols only, not contract addresses)
df = (
    client.erc20.transfers("USDT", "USDC", "DAI")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Or set multiple tokens via chain method
df = (
    client.erc20.transfers()
    .token("USDT", "USDC")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Filter by sender
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender("0x28c6c06298d514db089934071355e5743bf21d60")
    .as_df()
)
```

### AAVE Events

```python
# Get deposits
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Use a specific market type on ETH (Core, Prime, or EtherFi)
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .eth_market_type("Prime")
    .as_df()
)
```

### Uniswap Swaps

```python
# Get swaps for WETH/USDC pool with 0.05% fee tier
df = (
    client.uniswap_v3.swaps("WETH", "USDC", 500)
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Or build with chain methods
df = (
    client.uniswap_v3.swaps()
    .symbol0("WETH")
    .symbol1("USDC")
    .fee(500)
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)
```

### Native Token Transfers

```python
# Get ETH transfers >= 1 ETH
df = (
    client.native_token.transfers()
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(1.0)
    .as_df()
)
```

### Label & Category Filters

```python
# Get USDT transfers involving Binance wallets
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .involving_label("Binance")
    .as_df()
)

# Get USDT transfers FROM exchanges TO DeFi protocols
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender_category("exchange")
    .receiver_category("defi")
    .as_df()
)

# Get AAVE deposits involving exchange addresses
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .involving_category("exchange")
    .as_df()
)

# Get native ETH transfers FROM Binance or Coinbase (multi-value)
df = (
    client.native_token.transfers()
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender_label("Binance,Coinbase")
    .as_df()
)
```

### Trade Data (Binance)

Query exchange-sourced tick and OHLCV data from Binance. Unlike on-chain events, trade data uses time ranges rather than block ranges.

```python
# Raw tick trades — CSV and Parquet only (no JSON/as_dict)
df = (
    client.binance.raw_trades()
    .token("BTC")
    .start_time("2024-01-01")
    .end_time("2024-02-01")
    .as_df()
)

# OHLCV candles — all formats supported
df = (
    client.binance.ohlcv()
    .token("BTC")
    .window("4h")
    .start_time("2024-01-01")
    .end_time("2024-02-01")
    .as_df()
)

# as_dict() also supported for OHLCV
candles = (
    client.binance.ohlcv()
    .token("ETH")
    .window("1h")
    .start_time("2024-01-01")
    .end_time("2024-01-02")
    .as_dict()
)

# Get a download link
link_info = (
    client.binance.raw_trades()
    .token("BTC")
    .start_time("2024-01-01")
    .end_time("2024-02-01")
    .as_link("parquet")
)
# Read the Parquet file straight from the link (polars must be installed)
import polars as pl
df = pl.read_parquet(link_info.link)
```

> **Note:** `raw_trades()` only supports `as_df()`, `as_file()` (CSV/Parquet), and `as_link()`. Calling `as_dict()` or using a `.json` file extension raises a `ValidationError`.
>
> **Time range limit:** Raw trades queries are limited to a maximum range of **31 days**. Exceeding this raises a `ValidationError`. OHLCV queries have no such limit.

Valid OHLCV window sizes: `1m`, `5m`, `15m`, `30m`, `1h`, `4h`, `1d`.
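
Because raw trades queries are capped at 31 days, a longer history has to be fetched in pieces. The sketch below shows one way to split a time range into consecutive windows of at most 31 days, using only the standard library. `split_time_range` is a hypothetical helper, not part of the client. It assumes adjacent windows may share a boundary timestamp; whether `end_time` is inclusive is not stated here, so deduplicate the combined result if needed.

```python
from datetime import datetime, timedelta

MAX_RANGE = timedelta(days=31)  # raw trades limit quoted in the note above


def split_time_range(start, end, max_span=MAX_RANGE):
    """Yield (chunk_start, chunk_end) pairs covering start..end.

    Hypothetical helper: each pair spans at most max_span, and the end of
    one pair is the start of the next.
    """
    if end <= start:
        raise ValueError("end must be after start")
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + max_span, end)
        yield cursor, chunk_end
        cursor = chunk_end


chunks = list(split_time_range(datetime(2024, 1, 1), datetime(2024, 4, 1)))
for chunk_start, chunk_end in chunks:
    print(chunk_start.date().isoformat(), "->", chunk_end.date().isoformat())
```

Each pair could then be formatted as ISO strings and passed to `.start_time()` and `.end_time()` on a fresh `raw_trades()` builder.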

### Aggregate Queries

Use `.aggregate()` to bucket raw events into time or block intervals with summary statistics. All existing filters work before `.aggregate()` is called.

```python
# Aggregate USDT transfers into 2-hour buckets
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="2h")
    .as_df()
)

# Aggregate by block intervals
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="block", period="100b")
    .as_df()
)

# Combine with filters — large transfers from exchanges, bucketed hourly
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .sender_category("exchange")
    .min_amount(10000)
    .aggregate(group_by="time", period="1h")
    .as_df()
)

# Aggregate Uniswap swaps
df = (
    client.uniswap_v3.swaps("WETH", "USDC", 500)
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="1h")
    .as_df()
)
```

You can also discover what aggregate fields are available for a protocol:

```python
schema = client.aggregate_schema("erc20")
print(schema)
```
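
To make the bucketing idea concrete, here is a minimal local sketch of grouping events into fixed block intervals, similar in spirit to `period="100b"`. It runs offline and is not the API's implementation: the `block_number` and `amount` keys, the `count` and `sum_amount` output fields, and the alignment of buckets to multiples of the interval are all assumptions. The fields the API actually returns are the ones reported by `client.aggregate_schema()`.

```python
from collections import defaultdict


def bucket_by_block(events, size):
    """Group events into buckets of `size` blocks (hypothetical helper).

    Buckets are keyed by their first block, aligned to multiples of `size`
    (an assumption about alignment, not documented API behavior).
    """
    buckets = defaultdict(lambda: {"count": 0, "sum_amount": 0.0})
    for event in events:
        bucket_start = event["block_number"] // size * size
        buckets[bucket_start]["count"] += 1
        buckets[bucket_start]["sum_amount"] += event["amount"]
    return dict(sorted(buckets.items()))


# Illustrative events with assumed field names
events = [
    {"block_number": 21000005, "amount": 10.0},
    {"block_number": 21000099, "amount": 5.0},
    {"block_number": 21000100, "amount": 1.5},
]

buckets = bucket_by_block(events, size=100)
for bucket_start, stats in buckets.items():
    print(bucket_start, stats)
```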

### Verbose Mode

By default, responses omit metadata fields to reduce payload size. Use `.verbose()` to include all fields:

```python
# Default: compact response (no tx_hash, tx_id, log_index, network, name)
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Verbose: includes all metadata fields
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .verbose()
    .as_df()
)
```

### Value Enrichment

Use `.with_value()` to enrich events with USD value data. This adds a `value_usd` (amount × price) column to individual events. On aggregate endpoints, it produces an `agg_value_usd` (sum) column.

Supported protocols: AAVE, Uniswap, Lido, ERC20, Native Token.

```python
# Individual events with value data
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .with_value()
    .as_df()
)
# df now includes 'value_usd' column

# Aggregate with value — adds agg_value_usd column
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21100000)
    .with_value()
    .aggregate(group_by="time", period="2h")
    .as_df()
)
# df now includes 'agg_value_usd' column
```
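
As a worked example of the arithmetic described above, the following sketch computes `value_usd` as amount × price for each event and then sums the results the way `agg_value_usd` is described for a bucket. It uses plain dictionaries and runs offline. The `price` key and both helper functions are hypothetical; only the two formulas come from the description above.

```python
def add_value_usd(events):
    """Return copies of the events with value_usd = amount * price."""
    return [{**event, "value_usd": event["amount"] * event["price"]} for event in events]


def agg_value_usd(events):
    """Sum value_usd over the events of one bucket."""
    return sum(event["value_usd"] for event in events)


# Illustrative events; the 'price' key is an assumed name
raw_events = [
    {"amount": 2.0, "price": 3000.0},
    {"amount": 0.5, "price": 3100.0},
]

enriched = add_value_usd(raw_events)
total = agg_value_usd(enriched)
print([event["value_usd"] for event in enriched], total)
```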

### Return as DataFrame

```python
# As pandas DataFrame (default)
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# As polars DataFrame
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df("polars")
)
```

### Save to File

Format is automatically determined by file extension:

```python
# Save as Parquet (recommended for large datasets)
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_file("transfers.parquet")
)

# Save as CSV
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_file("transfers.csv")
)

# Save as JSON
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_file("transfers.json")
)
```
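
The extension rule can be pictured as a small lookup. The sketch below is an illustration of that rule, not the client's code: `infer_format` is a hypothetical helper, and the case-insensitive match and the `ValueError` for unknown extensions are assumptions.

```python
from pathlib import Path

# Formats named in this section, keyed by file extension
EXTENSION_TO_FORMAT = {".csv": "csv", ".parquet": "parquet", ".json": "json"}


def infer_format(path, explicit_format=None):
    """Pick an output format: an explicit format wins, else use the extension."""
    if explicit_format is not None:
        return explicit_format
    suffix = Path(path).suffix.lower()
    if suffix not in EXTENSION_TO_FORMAT:
        raise ValueError(f"cannot infer format from extension {suffix!r}")
    return EXTENSION_TO_FORMAT[suffix]


print(infer_format("transfers.parquet"))
print(infer_format("export.dat", explicit_format="csv"))
```

Checking the extension locally before building a query can catch a mistyped file name before any quota is spent.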

### Get Download Link

Get a shareable download link instead of the data directly. Useful for passing to other tools or libraries:

```python
from defistream import DeFiStream

client = DeFiStream()

# Get a download link (CSV format by default)
link_info = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_link()
)

print(link_info.filename)  # erc20_transfer_ETH_21000000_21100000.csv
print(link_info.link)      # https://dl.defistream.dev/dh/abc123/...
print(link_info.expiry)    # 2026-02-03 15:30:00
print(link_info.size)      # 1.29 MB

# Get as Parquet link
link_info = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_link(format="parquet")
)

# Use with polars (reads directly from URL)
import polars as pl
df = pl.read_parquet(link_info.link)

# Use with pandas
import pandas as pd
df = pd.read_parquet(link_info.link)
```

> **Note:** Links expire after 1 hour. The `as_link()` method only supports `csv` and `parquet` formats.

### Calculate Query Cost

Preview how many blocks a query will cost **before** executing it. No quota is deducted.

```python
# Build a query as usual, then call calculate_cost() instead of as_df()
cost = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .calculate_cost()
)

print(cost.cost)                  # 10000
print(cost.quota_remaining)       # 500000
print(cost.quota_remaining_after) # 490000

# Also works on aggregate queries
cost = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="1h")
    .calculate_cost()
)
```
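
One way to use the estimate is as a guard before running an expensive query. The sketch below checks an estimate against a caller-chosen budget. `CostEstimate` is a stand-in dataclass that mirrors the three attributes printed above so the example runs offline, and `within_budget` is a hypothetical helper, not part of the client.

```python
from dataclasses import dataclass


@dataclass
class CostEstimate:
    """Stand-in for the object returned by calculate_cost() (assumed shape)."""
    cost: int
    quota_remaining: int
    quota_remaining_after: int


def within_budget(estimate, max_cost, min_quota_left=0):
    """True if the query costs at most max_cost and leaves enough quota."""
    return (
        estimate.cost <= max_cost
        and estimate.quota_remaining_after >= min_quota_left
    )


# Values taken from the printed example above
estimate = CostEstimate(cost=10_000, quota_remaining=500_000, quota_remaining_after=490_000)
print(within_budget(estimate, max_cost=20_000))
print(within_budget(estimate, max_cost=5_000))
```

In real use, the object returned by `.calculate_cost()` would be passed in, and a terminal method such as `.as_df()` called only when the check passes.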

### Return as Dictionary (JSON)

For small queries, you can get results as a list of dictionaries:

```python
transfers = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_dict()
)

for transfer in transfers:
    print(f"{transfer['sender']} -> {transfer['receiver']}: {transfer['amount']}")
```

> **Note:** `as_dict()` and `as_file("*.json")` use JSON format which has a **10,000 block limit**. For larger block ranges, use `as_df()` or `as_file()` with `.parquet` or `.csv` extensions, which have no block limit.
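
If JSON output is needed for a range larger than 10,000 blocks, one option is to split the range and issue one request per chunk. The sketch below shows the splitting step only. `split_block_range` is a hypothetical helper, and it assumes the limit is measured as `end - start` (the 21000000 to 21010000 example above fits this reading). Adjacent chunks share a boundary block, so deduplicate if the end block turns out to be inclusive.

```python
def split_block_range(start_block, end_block, max_blocks=10_000):
    """Return (start, end) pairs that each span at most max_blocks."""
    if end_block <= start_block:
        raise ValueError("end_block must be greater than start_block")
    chunks = []
    cursor = start_block
    while cursor < end_block:
        chunk_end = min(cursor + max_blocks, end_block)
        chunks.append((cursor, chunk_end))
        cursor = chunk_end
    return chunks


block_chunks = split_block_range(21_000_000, 21_025_000)
for chunk in block_chunks:
    print(chunk)
```

Each pair could be passed to `.block_range(start, end)` followed by `.as_dict()`, with the resulting lists concatenated.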

### Context Manager

Both sync and async clients support context managers, which close connections automatically. The sync form is shown here; the async form uses `async with` (see Async Usage).

```python
# Sync
with DeFiStream() as client:
    df = (
        client.erc20.transfers("USDT")
        .network("ETH")
        .block_range(21000000, 21010000)
        .as_df()
    )
```

### Async Usage

```python
import asyncio
from defistream import AsyncDeFiStream

async def main():
    async with AsyncDeFiStream() as client:
        df = await (
            client.erc20.transfers("USDT")
            .network("ETH")
            .block_range(21000000, 21010000)
            .as_df()
        )
        print(f"Found {len(df)} transfers")

asyncio.run(main())
```

### List Available Protocols

```python
client = DeFiStream()
protocols = client.protocols()
print(protocols)  # ['native_token', 'erc20', 'aave_v3', 'uniswap_v3', 'lido']
```

## Configuration

### Environment Variables

```bash
export DEFISTREAM_API_KEY=dsk_your_api_key
export DEFISTREAM_BASE_URL=https://api.defistream.dev/v1  # optional
```

```python
from defistream import DeFiStream

# API key from environment
client = DeFiStream()

# Or explicit
client = DeFiStream(api_key="dsk_...", base_url="https://api.defistream.dev/v1")
```
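
The precedence between explicit arguments and environment variables can be sketched as follows. This is an illustration under assumptions, not the client's code: `resolve_settings` is a hypothetical helper, the `ValueError` for a missing key is assumed behavior, and the default base URL is assumed to be the one shown in the example above.

```python
import os

# Assumed default, copied from the optional export shown above
DEFAULT_BASE_URL = "https://api.defistream.dev/v1"


def resolve_settings(api_key=None, base_url=None, env=None):
    """Return (api_key, base_url); explicit arguments win over the environment."""
    env = os.environ if env is None else env
    key = api_key or env.get("DEFISTREAM_API_KEY")
    if not key:
        raise ValueError("no API key: pass api_key or set DEFISTREAM_API_KEY")
    url = base_url or env.get("DEFISTREAM_BASE_URL") or DEFAULT_BASE_URL
    return key, url


# A plain dict stands in for the process environment so the demo is repeatable
print(resolve_settings(env={"DEFISTREAM_API_KEY": "dsk_example"}))
```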

### Timeout and Retries

```python
client = DeFiStream(
    api_key="dsk_...",
    timeout=60.0,  # seconds
    max_retries=3
)
```

## Error Handling

```python
from defistream import DeFiStream
from defistream.exceptions import (
    DeFiStreamError,
    AuthenticationError,
    QuotaExceededError,
    RateLimitError,
    ValidationError
)

client = DeFiStream()

try:
    df = (
        client.erc20.transfers("USDT")
        .network("ETH")
        .block_range(21000000, 21010000)
        .as_df()
    )
except AuthenticationError:
    print("Invalid API key")
except QuotaExceededError as e:
    print(f"Quota exceeded. Remaining: {e.remaining}")
except RateLimitError as e:
    print(f"Rate limited. Retry after: {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except DeFiStreamError as e:
    print(f"API error: {e}")
```
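
Since `RateLimitError` exposes `retry_after`, a caller can wait and try again. The sketch below shows that pattern with a stand-in exception class so it runs offline; real code would import `RateLimitError` from `defistream.exceptions` instead. `call_with_retry` is a hypothetical helper, and it assumes `retry_after` is a number of seconds. Whether the client's own `max_retries` setting already covers rate limits is not stated here.

```python
import time


class RateLimitError(Exception):
    """Stand-in for defistream.exceptions.RateLimitError (offline demo only)."""

    def __init__(self, retry_after):
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


def call_with_retry(run_query, max_attempts=3, sleep=time.sleep):
    """Call run_query(), waiting retry_after seconds after each rate limit."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_query()
        except RateLimitError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            sleep(exc.retry_after)


# Demo: a fake query that is rate limited twice before succeeding
attempts = {"count": 0}


def flaky_query():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitError(retry_after=2)
    return "ok"


waits = []  # record the waits instead of sleeping
result = call_with_retry(flaky_query, sleep=waits.append)
print(result, waits)
```

With the real client, `run_query` could be a lambda that wraps the terminal call, for example `lambda: query.as_df()`.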

## Response Headers

Access rate limit and quota information:

```python
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Access response metadata
print(f"Rate limit: {client.last_response.rate_limit}")
print(f"Remaining quota: {client.last_response.quota_remaining}")
print(f"Request cost: {client.last_response.request_cost}")
```

## Builder Methods Reference

### Common Methods (all protocols)

| Method | Description |
|--------|-------------|
| `.network(net)` | Set network (ETH, ARB, BASE, OP, POLYGON, etc.) |
| `.start_block(n)` | Set starting block number |
| `.end_block(n)` | Set ending block number |
| `.block_range(start, end)` | Set both start and end blocks |
| `.start_time(ts)` | Set starting time (ISO format or Unix timestamp) |
| `.end_time(ts)` | Set ending time (ISO format or Unix timestamp) |
| `.time_range(start, end)` | Set both start and end times |
| `.verbose()` | Include all metadata fields |
| `.with_value()` | Enrich events with USD value data (`value_usd` column) |

### Protocol-Specific Parameters

| Method | Protocols | Description |
|--------|-----------|-------------|
| `.token(*symbols)` | ERC20, Binance | Token symbol(s) (e.g. USDT, USDC) or a contract address. Multi-token queries accept known symbols only, not contract addresses (multi-value). |
| `.sender(*addrs)` | ERC20, Native | Filter by sender address (multi-value) |
| `.receiver(*addrs)` | ERC20, Native | Filter by receiver address (multi-value) |
| `.involving(*addrs)` | All | Filter by any involved address (multi-value) |
| `.from_address(*addrs)` | ERC20, Native | Alias for `.sender()` |
| `.to_address(*addrs)` | ERC20, Native | Alias for `.receiver()` |
| `.min_amount(amt)` | ERC20, Native | Minimum transfer amount |
| `.max_amount(amt)` | ERC20, Native | Maximum transfer amount |
| `.eth_market_type(type)` | AAVE | Market type for ETH: 'Core', 'Prime', 'EtherFi' |
| `.symbol0(sym)` | Uniswap | First token symbol (required) |
| `.symbol1(sym)` | Uniswap | Second token symbol (required) |
| `.fee(tier)` | Uniswap | Fee tier: 100, 500, 3000, 10000 (required) |
| `.window(size)` | Binance OHLCV | Candle window: `1m`, `5m`, `15m`, `30m`, `1h`, `4h`, `1d` |
| `.skip_id(id)` | Binance raw trades | Pagination: skip trades with ID <= id |

### Address Label & Category Filters

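Filter events by entity names or categories using the labels database. The `involving_*` filters are available on all protocols; the `sender_*` and `receiver_*` filters apply to ERC20 and native token transfers only.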

| Method | Protocols | Description |
|--------|-----------|-------------|
| `.involving_label(label)` | All | Filter where any involved address matches a label substring (e.g., "Binance") |
| `.involving_category(cat)` | All | Filter where any involved address matches a category (e.g., "exchange") |
| `.sender_label(label)` | ERC20, Native | Filter sender by label substring |
| `.sender_category(cat)` | ERC20, Native | Filter sender by category |
| `.receiver_label(label)` | ERC20, Native | Filter receiver by label substring |
| `.receiver_category(cat)` | ERC20, Native | Filter receiver by category |

**Multi-value support:** Pass multiple values as separate arguments (e.g., `.sender_label("Binance", "Coinbase")`) or as a comma-separated string (e.g., `.sender_label("Binance,Coinbase")`). Both forms are equivalent.

**Mutual exclusivity:** Within each slot (involving/sender/receiver), only one of address/label/category can be set. `involving*` filters cannot be combined with `sender*`/`receiver*` filters.
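
The two rules above can be expressed as a small offline check. This is a sketch of the rules as stated, not the client's validation code: `normalize_values`, `check_filters`, and the `(slot, kind)` dictionary layout are hypothetical, as is the choice of `ValueError`.

```python
SLOTS = ("involving", "sender", "receiver")
KINDS = ("address", "label", "category")


def normalize_values(*values):
    """Flatten separate arguments and comma-separated strings into one list."""
    result = []
    for value in values:
        for part in value.split(","):
            part = part.strip()
            if part:
                result.append(part)
    return result


def check_filters(filters):
    """Validate a {(slot, kind): values} mapping against the rules above."""
    used_slots = set()
    for slot in SLOTS:
        kinds = [kind for kind in KINDS if filters.get((slot, kind))]
        if len(kinds) > 1:
            raise ValueError(f"{slot}: only one of address/label/category allowed")
        if kinds:
            used_slots.add(slot)
    if "involving" in used_slots and len(used_slots) > 1:
        raise ValueError("involving filters cannot be combined with sender/receiver")


# Both multi-value forms produce the same list
print(normalize_values("Binance,Coinbase") == normalize_values("Binance", "Coinbase"))

# Allowed: sender and receiver slots, one kind each
check_filters({("sender", "category"): ["exchange"], ("receiver", "category"): ["defi"]})
print("filters ok")
```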

### Aggregate Methods

| Method | Description |
|--------|-------------|
| `.aggregate(group_by, period)` | Transition to aggregate query. `group_by`: `"time"` or `"block"`. `period`: bucket size (e.g. `"1h"`, `"100b"`). Returns an `AggregateQueryBuilder` that supports all the same terminal and filter methods. |
| `client.aggregate_schema(protocol)` | Get available aggregate fields for a protocol (e.g. `"erc20"`, `"aave_v3"`). |

### Terminal Methods

| Method | Description |
|--------|-------------|
| `.as_df()` | Execute and return pandas DataFrame |
| `.as_df("polars")` | Execute and return polars DataFrame |
| `.as_file(path)` | Execute and save to file (format from extension) |
| `.as_file(path, format="csv")` | Execute and save with explicit format |
| `.as_dict()` | Execute and return list of dicts (JSON, 10K block limit) |
| `.as_link()` | Execute and return download link (CSV, 1hr expiry) |
| `.as_link(format="parquet")` | Execute and return download link (Parquet) |
| `.calculate_cost()` | Estimate query cost without executing (no quota deducted) |

## License

MIT License
