Metadata-Version: 2.4
Name: mlfastflow
Version: 0.2.9.0
Summary: A package for fast dataflow and workflow processing
Home-page: https://pypi.org/project/mlfastflow/
Author: Xileven
Author-email: Xileven <hi@bringyouhome.com>
License-Expression: MIT
Project-URL: Documentation, https://github.com/xileven/mlfastflow
Project-URL: Source, https://github.com/xileven/mlfastflow
Project-URL: Tracker, https://github.com/xileven/mlfastflow/issues
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: faiss-cpu
Requires-Dist: google-auth>=2.0.0
Requires-Dist: google-cloud-bigquery>=3.0.0
Requires-Dist: google-cloud-storage>=2.0.0
Requires-Dist: graphviz>=0.19.0
Requires-Dist: ipywidgets>=7.6.0
Requires-Dist: numpy>=1.20.0
Requires-Dist: pandas>=1.3.0
Requires-Dist: pandas-gbq>=0.17.0
Requires-Dist: polars>=1.29.0
Requires-Dist: pyarrow>=12.0.0
Requires-Dist: python-dotenv>=0.19.0
Requires-Dist: tqdm>=4.64.0
Requires-Dist: ydata-profiling>=4.5.0
Dynamic: author
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-python

# MLFastFlow

A Python package for fast dataflow and workflow processing.

## Installation

```bash
pip install mlfastflow
```

## Features

- **FastKNN** — FAISS-backed vector similarity search designed for ML data sourcing
  - Four index types: exact (`flat`) and approximate (`ivf_flat`, `ivf_pq`, `hnsw`)
  - L2 and cosine (inner product) distance metrics
  - Built-in label-recall validation
- **BigQueryClient** — unified Pandas + Polars BigQuery client with GCS integration
- **Utility functions** — CSV→Parquet conversion, file concatenation, data profiling, timer decorator

## Quick Start

```python
from mlfastflow import FastKNN

# query_df: labelled data — rows where label == 1 are the positive anchors
# search_df: the pool to search through
# knn_keys: feature columns used for similarity vectors
# label: binary column marking your positive examples

knn = FastKNN(
    query_df=labelled_df,
    search_df=pool_df,
    knn_keys=["feature_1", "feature_2", "feature_3"],
    label="is_positive",
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()
```

## FastKNN — Vector Similarity Search

`FastKNN` is designed for ML data-sourcing workflows. It uses the `label` column
as an anchor: rows where `label == 1` in `query_df` are the positive examples,
and the index finds the most similar candidates from `search_df`.

### Index Types

| `index_type` | Exact? | Speed | Memory | Best for |
|---|---|---|---|---|
| `'flat'` (default) | ✅ Yes | Moderate | High | Small datasets, baseline |
| `'ivf_flat'` | ≈ tunable | Fast | Moderate | Large datasets (>100K) |
| `'ivf_pq'` | ≈ tunable | Very fast | Low | Very large / memory-constrained |
| `'hnsw'` | ≈ tunable | Very fast | Moderate | High recall + speed |
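
For reference, these index types map onto standard FAISS constructors. The sketch below is illustrative only (`FastKNN` builds and manages the index internally); `d` is an assumed dimensionality, and the numeric values mirror the `FastKNN` arguments used in the examples that follow:

```python
import faiss
import numpy as np

d = 128                                           # assumed vector dimensionality
xb = np.random.rand(50_000, d).astype("float32")  # stand-in search pool

# 'flat' — exact L2 search over all vectors
flat = faiss.IndexFlatL2(d)

# 'ivf_flat' — vectors bucketed into nlist clusters; nprobe clusters searched per query
ivf_flat = faiss.IndexIVFFlat(faiss.IndexFlatL2(d), d, 200)
ivf_flat.train(xb)       # IVF indexes must be trained before adding vectors
ivf_flat.nprobe = 16

# 'ivf_pq' — IVF plus product quantization: pq_m sub-vectors at 8 bits each
ivf_pq = faiss.IndexIVFPQ(faiss.IndexFlatL2(d), d, 500, 8, 8)
ivf_pq.train(xb)
ivf_pq.nprobe = 32

# 'hnsw' — navigable small-world graph with hnsw_m links per node
hnsw = faiss.IndexHNSWFlat(d, 32)
hnsw.hnsw.efConstruction = 64
hnsw.hnsw.efSearch = 32

# Cosine similarity = inner product over L2-normalized vectors
# (what metric="ip", normalize=True selects)
faiss.normalize_L2(xb)
ip = faiss.IndexFlatIP(d)
```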

### Examples

```python
from mlfastflow import FastKNN

# Exact search (default) — L2 distance
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
)

# Cosine similarity
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    metric="ip", normalize=True,
)

# Approximate search — IVF Flat
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="ivf_flat", nlist=200, nprobe=16,
)

# Memory-efficient — IVF-PQ
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=[f"emb_{i}" for i in range(64)], label="is_positive",
    index_type="ivf_pq", nlist=500, nprobe=32, pq_m=8,
)

# Graph-based — HNSW
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="hnsw", hnsw_m=32, ef_construction=64, ef_search=32,
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()  # logs rows, label counts, and recall %
```

## BigQuery Integration

MLFastFlow provides a `BigQueryClient` class that integrates Google BigQuery and Google Cloud Storage (GCS), supporting both **Pandas** and **Polars** DataFrames through a single unified client.

### Initialization

```python
from mlfastflow import BigQueryClient

# Initialize with a service account key file
bq_client = BigQueryClient(
    project_id="your-gcp-project-id",
    dataset_id="your_dataset",
    key_file="/path/to/your/service-account-key.json"
)

# Or use Application Default Credentials (no key file needed on GCP)
bq_client = BigQueryClient(
    project_id="your-gcp-project-id",
    dataset_id="your_dataset"
)

# Display client configuration
bq_client.show()
```

### Context Manager

The client supports Python's `with` statement for automatic resource cleanup:

```python
with BigQueryClient(project_id="...", dataset_id="...") as client:
    df = client.sql2df("SELECT * FROM your_table LIMIT 10")
# Client is automatically closed here
```

### Running SQL Queries

```python
import polars as pl

# Execute a SQL query and get results as a Pandas DataFrame
df = bq_client.sql2df("SELECT * FROM your_dataset.your_table LIMIT 10")

# Execute a SQL query and get results as a Polars DataFrame
df = bq_client.sql2polars("SELECT * FROM your_dataset.your_table LIMIT 10")

# Get a Polars LazyFrame for deferred computation
lf = bq_client.sql2polars("SELECT * FROM your_table", lazy=True)
result = lf.filter(pl.col("status") == "active").collect()

# Save query results directly to a local file (.parquet, .csv, .json)
bq_client.sql2file(
    sql="SELECT * FROM your_dataset.your_table",
    file_path="output.parquet"
)

# Run a DDL/DML query and get the job ID for tracking
job_id = bq_client.run_sql("CREATE TABLE your_dataset.new_table AS SELECT * FROM your_dataset.source_table")

# Check the status of an asynchronous query job
job_status = bq_client.check_job_status(job_id)
```

### Cost Estimation (Dry Run)

Check how much data a query will scan before executing it:

```python
estimate = bq_client.sql2df("SELECT * FROM huge_table", dry_run=True)
print(f"This query will scan {estimate['estimated_gb']} GB")
```
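
A BigQuery dry run is an ordinary query job submitted with `dry_run=True`: it is validated and priced but never executed. `sql2df(..., dry_run=True)` presumably wraps something like this raw-client call:

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query("SELECT * FROM huge_table", job_config=job_config)

# Nothing is scanned or billed; the job only carries the estimate
print(f"Estimated scan: {job.total_bytes_processed / 1e9:.2f} GB")
```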

### Table Operations

```python
# Truncate a table (requires interactive confirmation by typing the table name)
bq_client.truncate_table("your_table_name")
```

### DataFrame to BigQuery

```python
import pandas as pd
import polars as pl

# Upload a Pandas DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

bq_client.df2table(
    df=df,
    table_id="your_table_name",
    if_exists="fail"  # Options: 'fail', 'replace', 'append'
)

# Upload a Polars DataFrame (uses efficient Arrow → Parquet path, no Pandas conversion)
pl_df = pl.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})

bq_client.polars2table(
    df=pl_df,
    table_id="your_table_name",
    if_exists="replace"
)
```
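
The "no Pandas conversion" note means a Polars frame can be serialized straight to Parquet bytes and loaded via a file-upload job. A minimal sketch of that kind of path against the raw client (client setup and table names assumed):

```python
import io

import polars as pl
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")
pl_df = pl.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})

# Serialize the Polars frame to an in-memory Parquet buffer (Arrow-backed, no Pandas)
buf = io.BytesIO()
pl_df.write_parquet(buf)
buf.seek(0)

# Load the buffer directly into BigQuery
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.load_table_from_file(
    buf, "your-gcp-project-id.your_dataset.your_table_name", job_config=job_config
).result()
```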

### BigQuery to Google Cloud Storage

```python
# Export query results to GCS (server-side, no data passes through your machine)
bq_client.sql2gcs(
    sql="SELECT * FROM your_dataset.your_table",
    destination_uri="gs://your-bucket/path/to/export.parquet",
    format="PARQUET",       # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO'
    compression="SNAPPY",   # Options: 'NONE', 'GZIP', 'SNAPPY', 'DEFLATE'
    timeout=300,
    use_sharding=True       # Automatically adds wildcard for parallel export
)

# Export a DataFrame (Pandas or Polars) to GCS via a temporary BigQuery table
bq_client.df2gcs(
    df=pl_df,
    destination_uri="gs://your-bucket/path/to/export.parquet"
)
```
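
Server-side export is a BigQuery extract job: the query result is typically first materialized to a table, and the table is then extracted to GCS. With sharding, a `*` wildcard in the destination URI lets BigQuery write many files in parallel. A rough sketch of the underlying call (table name assumed; `sql2gcs` handles the query-to-table step for you):

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")

job_config = bigquery.ExtractJobConfig(
    destination_format=bigquery.DestinationFormat.PARQUET,
    compression=bigquery.Compression.SNAPPY,
)

# BigQuery replaces '*' with shard numbers (000000000000, 000000000001, ...)
client.extract_table(
    "your-gcp-project-id.your_dataset.your_table",
    "gs://your-bucket/path/to/export-*.parquet",
    job_config=job_config,
).result()
```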

### Google Cloud Storage to BigQuery

```python
# Load data from GCS to BigQuery
bq_client.gcs2table(
    gcs_uri="gs://your-bucket/path/to/data/*.parquet",
    table_id="your_destination_table",
    write_disposition="WRITE_TRUNCATE",  # Options: 'WRITE_TRUNCATE', 'WRITE_APPEND', 'WRITE_EMPTY'
    source_format="PARQUET"              # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO', 'ORC'
)
```
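
This corresponds to a BigQuery load job from a GCS URI; a minimal raw-client equivalent (project and table names assumed):

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.load_table_from_uri(
    "gs://your-bucket/path/to/data/*.parquet",
    "your-gcp-project-id.your_dataset.your_destination_table",
    job_config=job_config,
).result()
```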

### GCS Folder Management

```python
# Create a folder in GCS
bq_client.create_gcs_folder("gs://your-bucket/new-folder/")

# Delete a folder and all its contents (dry run first)
success, deleted_count = bq_client.delete_gcs_folder(
    gcs_folder_path="gs://your-bucket/folder-to-delete/",
    dry_run=True  # Set to False to actually delete
)
print(f"Would delete {deleted_count} files" if success else "Error occurred")
```
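
A GCS "folder" is just a shared object-name prefix, so a dry run amounts to listing blobs under the prefix without deleting them. A sketch with the raw storage client (bucket and prefix assumed):

```python
from google.cloud import storage

client = storage.Client(project="your-gcp-project-id")
bucket = client.bucket("your-bucket")

# Dry run: enumerate everything under the prefix
blobs = list(bucket.list_blobs(prefix="folder-to-delete/"))
print(f"Would delete {len(blobs)} files")

# An actual delete would call blob.delete() on each object:
# for blob in blobs:
#     blob.delete()
```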

### Entity Relationship Diagram (ERD)

```python
# Generate an ERD for a list of BigQuery tables
bq_client.erd(
    table_list=[
        "project.dataset.table1",
        "project.dataset.table2"
    ],
    output_filename="bq_erd",
    output_format="png",       # Options: 'png', 'svg', 'pdf'
    view_diagram=True          # Automatically open the generated diagram
)
```

### Resource Management

```python
# Using context manager (recommended)
with BigQueryClient(...) as client:
    df = client.sql2df("SELECT ...")

# Or close manually
bq_client.close()
```

### Data Type Handling

```python
# Fix mixed data types in a Pandas DataFrame before uploading
# Inference chain: numeric → datetime → string (nulls preserved)
df = BigQueryClient.fix_mixed_types(
    df=your_dataframe,
    columns=["column1", "column2"],  # Optional: defaults to all object-dtype columns
    strategy="infer"                 # Options: 'infer', 'to_string'
)
```
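
As an illustrative sketch of that inference chain (not the library's exact implementation): coerce to numeric first, fall back to datetime, and finally cast to string, leaving nulls untouched.

```python
import pandas as pd

def infer_object_column(s: pd.Series) -> pd.Series:
    """Illustrative numeric -> datetime -> string chain for one object column."""
    numeric = pd.to_numeric(s, errors="coerce")
    if numeric.notna().sum() == s.notna().sum():   # every non-null value parsed
        return numeric
    dt = pd.to_datetime(s, errors="coerce")
    if dt.notna().sum() == s.notna().sum():
        return dt
    return s.where(s.isna(), s.astype(str))        # string fallback, nulls preserved

print(infer_object_column(pd.Series(["1", "2", None, "3.5"], dtype=object)))
```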

## Utility Functions

### CSV to Parquet Conversion

Convert CSV files to Parquet using Polars streaming (`sink_parquet`) for constant memory usage, even on multi-GB files:

```python
from mlfastflow import csv2parquet

# Convert a single CSV file
csv2parquet("path/to/file.csv")

# Convert all CSV files in a directory (recursively)
csv2parquet("path/to/directory", sub_folders=True)

# Custom output directory, zstd compression, skip already-converted files
csv2parquet(
    "path/to/source",
    output_dir="path/to/destination",
    compression="zstd",    # Options: 'snappy', 'zstd', 'lz4', 'gzip', 'uncompressed'
    overwrite=False         # Skip files that already have a .parquet counterpart
)
```
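
The streaming core is essentially a lazy CSV scan piped into `sink_parquet`, so rows are written out in batches rather than materialized in memory. Roughly:

```python
import polars as pl

# Scan lazily and stream straight to Parquet; peak memory stays
# roughly constant regardless of the CSV's size
pl.scan_csv("path/to/file.csv").sink_parquet(
    "path/to/file.parquet",
    compression="zstd",
)
```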

### File Concatenation

Combine all CSV or Parquet files in a folder into a single file using lazy scanning:

```python
from mlfastflow import concat_files

# Combine all Parquet files in a folder
output_path = concat_files("path/to/folder", file_type="parquet")

# Combine CSVs with a source column, custom output location
output_path = concat_files(
    "path/to/folder",
    file_type="csv",
    add_source_column=True,             # Adds a SOURCE column with the filename
    output_file="path/to/combined.csv",
    how="diagonal_relaxed"              # Handles schema mismatches across files
)
```
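
Conceptually, this is a lazy scan per file, a `diagonal_relaxed` concat to reconcile differing schemas, and a single sink. A sketch (output name assumed):

```python
from pathlib import Path

import polars as pl

files = sorted(Path("path/to/folder").glob("*.parquet"))

# Scan each file lazily and tag rows with their source filename
frames = [
    pl.scan_parquet(f).with_columns(pl.lit(f.name).alias("SOURCE"))
    for f in files
]

# diagonal_relaxed fills missing columns with nulls and unifies dtypes
pl.concat(frames, how="diagonal_relaxed").sink_parquet("combined.parquet")
```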

### Timer Decorator

Log the execution time of any function with adaptive formatting (ms/s/min):

```python
from mlfastflow import timer_decorator

@timer_decorator
def my_function():
    # ... your code ...
    pass

my_function()
# INFO:mlfastflow.utils:Finished 'my_function' in 342.5 ms
```
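
A minimal sketch of such a decorator (illustrative, not the package's exact code): measure with a monotonic clock, pick the unit from the magnitude, and log through the module logger.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def timer_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Adaptive formatting: ms under a second, s under a minute, else min
        if elapsed < 1:
            formatted = f"{elapsed * 1000:.1f} ms"
        elif elapsed < 60:
            formatted = f"{elapsed:.2f} s"
        else:
            formatted = f"{elapsed / 60:.2f} min"
        logger.info("Finished %r in %s", func.__name__, formatted)
        return result
    return wrapper
```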

### Data Profiling

Generate an HTML profiling report for any Pandas or Polars DataFrame:

```python
from mlfastflow import profile

# Generate a minimal profiling report
profile(df, title="Customer Data Report", output_path="reports/")

# Full report with correlations
profile(df, title="Full Analysis", minimal=False)
```

Profiling is powered by `ydata-profiling`, which is installed automatically as a dependency of mlfastflow.
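
Under the hood, `profile()` presumably wraps `ydata_profiling.ProfileReport` (a Polars frame would first be converted to Pandas, since the raw API takes Pandas DataFrames). The raw equivalent looks like:

```python
import pandas as pd
from ydata_profiling import ProfileReport

df = pd.DataFrame({"age": [25, 32, 47], "city": ["NY", "SF", "LA"]})

# minimal=True skips expensive sections such as correlations
report = ProfileReport(df, title="Customer Data Report", minimal=True)
report.to_file("customer_data_report.html")
```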

## License

MIT

## Author

Xileven
