Metadata-Version: 2.4
Name: datalinks
Version: 1.2.8
Summary: Base package to build indexing scripts for DataLinks
Project-URL: Homepage, https://datalinks.com
Author-email: Rui Lopes <rui.lopes@datasetlinks.com>, Francisco Ferreira <francisco@datasetlinks.com>, Andrzej Grzesik <ags@datasetlinks.com>, Rui Valente <rui.valente@datasetlinks.com>
License-File: LICENSE
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.11
Requires-Dist: caseutil==0.7.2
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: requests>=2.32
Provides-Extra: dev
Requires-Dist: hatch-fancy-pypi-readme>=24.1.0; extra == 'dev'
Requires-Dist: pytest==9.0.3; extra == 'dev'
Requires-Dist: tox>=4.19; extra == 'dev'
Requires-Dist: twine>=6.2.0; extra == 'dev'
Provides-Extra: docs
Requires-Dist: myst-parser>=4.0.1; extra == 'docs'
Requires-Dist: sphinx-autodoc-typehints>=3.0.1; extra == 'docs'
Requires-Dist: sphinx-markdown-builder>=0.6.8; extra == 'docs'
Requires-Dist: sphinx-material>=0.0.36; extra == 'docs'
Requires-Dist: sphinx>=8.1.3; extra == 'docs'
Description-Content-Type: text/markdown

# DataLinks Python SDK

The **DataLinks Python SDK** simplifies data ingestion, normalization, linking, and querying with DataLinks. It integrates with the DataLinks API to provide a seamless development experience for managing data workflows, including entity resolution and inference steps, with robust configuration options.

By wrapping the API integrations in a Pythonic interface, the SDK accelerates application development with DataLinks and supports flexible chaining of inference and validation steps.

Get started by [installing the SDK](#installation) and viewing the [Quick Start guide](#quick-start).

## Features

- **Ingestion API**: Easily ingest data into namespaces with built-in batching and retry mechanisms.
- **Multipart Upload**: Reliable large file ingestion via chunked S3 uploads with automatic abort on failure.
- **Ingestion Tracking**: Poll for async ingestion completion and progress after multipart uploads.
- **Inference Workflow Management**: Define custom chains of inference and validation steps.
- **Entity Resolution**: Match entities using configurable exact or geo-based matching methods.
- **Namespace Management**: Create and manage namespaces with privacy options.
- **Data Querying**: Query data with options to include/exclude metadata.
- **Custom Loaders**: Load custom data formats like JSON into defined workflows.
- **CLI Tool**: A standardized command-line interface for managing ingestion pipelines.


---

## Installation


To install the SDK, use `pip` or `uv`:

```shell
pip install datalinks
# or
uv add datalinks
```

To install the package in editable development mode (includes pytest, tox, and twine):

1. Clone the repository from your version-control system.
2. Create a virtual environment with your tool of choice.
3. Run the following:

```shell
pip install -e ".[dev]"
# or
uv pip install -e ".[dev]"
```


---

## Quick Start

### 1. Install

[Install the DataLinks SDK](#installation) if you haven't already.

### 2. Configure

Ensure the required environment variables are set:

| Variable | Required | Description |
|---|---|---|
| `DL_HOST` | Yes | Base URL of the DataLinks API (e.g. `https://api.datalinks.com`) |
| `DL_API_KEY` | Yes | Your DataLinks API key (JWT token) |
| `DL_NAMESPACE` | Yes | Target namespace for ingest and query operations |
| `OBJECT_NAME` | No | Object name scoping for ingest/query URLs; defaults to empty |

If you use the **Ingest Proxy** (`IngestProxyAPI`) for auto-modelled pipelines, set these instead:

| Variable | Required | Description |
|---|---|---|
| `DL_INGEST_PROXY` | Yes | Base URL of the ingest proxy (e.g. `https://your-proxy.vercel.app`) |
| `DL_API_KEY` | Yes | Your DataLinks API key (shared with the main API) |
| `DL_USERNAME` | Yes | Your DataLinks username |
| `DL_NAMESPACE` | Yes | Target namespace |

> Alternatively, you can use a `.env` file in the root of your project.
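
For example, a minimal `.env` for the main API might look like this (all values are placeholders):

```shell
DL_HOST=https://api.datalinks.com
DL_API_KEY=<your-jwt-token>
DL_NAMESPACE=my-namespace
# For the Ingest Proxy, set DL_INGEST_PROXY and DL_USERNAME as well
```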

### 3. Ingest and query

```python
from datalinks.api import DataLinksAPI, DLConfig

# Initialize configuration
config = DLConfig.from_env()

# Instantiate API client
client = DataLinksAPI(config=config)

# Sample rows
rows = [
    {"brand": "Gilette", "category": "Razor", "product": "Heated Razor"},
    {"brand": "Oral-B",  "category": "Electric Toothbrush", "product": "iO Series 10"},
]

# Ingest pre-structured rows directly (no pipeline required)
result = client.ingest(data=rows)
print(f"Ingested: {len(result.successful)} succeeded, {len(result.failed)} failed")

# Query data
data = client.query_data(include_metadata=False)
print(data)
```

## CLI Usage

The SDK also provides a built-in CLI that can be extended:

```shell
datalinks-client [-h] [--verbose] <input-folder>
```
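
For example, to process a local folder with verbose logging (the folder path is a placeholder):

```shell
datalinks-client --verbose ./data
```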


---

## Components


### 1. **DLConfig**
`DLConfig` reads configuration (e.g., API keys) from environment variables or `.env` files, so the same code adapts across deployment environments.
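
A short sketch of both styles (the explicit constructor mirrors the commented-out configuration in the examples below):

```python
from datalinks.api import DLConfig

# Read DL_HOST, DL_API_KEY, DL_NAMESPACE, etc. from the environment or a .env file
config = DLConfig.from_env()
config.namespace = "pg"  # override per script if needed
config.objectname = "products"

# Or construct explicitly
config = DLConfig(
    host="https://api.datalinks.com",
    apikey="<your-api-key>",
    namespace="pg",
    objectname="products",
)
```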

### 2. **DataLinksAPI**
`DataLinksAPI` handles interactions with the API. You can:
   - Ingest data directly or via multipart upload for large files.
   - Track and wait for async ingestion completion.
   - Query or retrieve data with complex parameters.
   - Manage namespaces.
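
For instance, namespace creation and async ingestion tracking (both shown end-to-end in the Multipart Upload example below) look like:

```python
import datalinks
from datalinks.api import DLConfig

client = datalinks.api.DataLinksAPI(DLConfig.from_env())
client.create_space(is_private=True)  # create a private namespace

# After a multipart upload, poll until the async ingestion completes
# (ingestion_id comes from finish_multipart_upload):
# final = client.wait_for_ingestion(ingestion_id)
# print(final.get("status"), final.get("processedRows"))
```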

### 3. **Inference Workflow**
Use a chain of inference and validation steps defined through classes like `ProcessUnstructured`, `Normalize`, and `Validate` to automate data preparation workflows.

```python
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes

# Define an inference pipeline
inference_steps = Pipeline(
    ProcessUnstructured(derive_from="source_field", helper_prompt="This extracts tables."),
    Normalize(target_cols={"email": "email_address"}, mode=NormalizeModes.ALL_IN_ONE),
    Validate(mode=ValidateModes.FIELDS, columns=["email", "phone"]),
)
```
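
Pass the resulting pipeline to `ingest(inference_steps=...)`, as the full examples below show.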

### 4. **Entity Resolution**
Supports multiple resolution strategies, configurable via `MatchTypeConfig`:

```python
from datalinks.links import MatchTypeConfig, ExactMatch

entity_resolution = MatchTypeConfig(
    # parameters are optional
    exact_match=ExactMatch(minVariation=0.2, minDistinct=0.3)
)
```
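
The match configuration is passed to `ingest(entity_resolution=...)` alongside the inference pipeline.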

### 5. **Loaders**
Abstract base loaders (e.g., `JSONLoader`) allow seamless data ingestion from custom file formats like `.json`.
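
As a rough sketch of the pattern (the `CSVLoader` class and its `load` method here are illustrative assumptions, not the SDK's actual base-class interface):

```python
import csv
from typing import Iterator


class CSVLoader:
    """Illustrative custom loader: yields dict rows from a .csv file."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> Iterator[dict]:
        # DictReader maps each CSV row to a {column: value} dict,
        # matching the row shape the ingest API accepts
        with open(self.path, newline="") as f:
            yield from csv.DictReader(f)


# rows = list(CSVLoader("data/products.csv").load())
# client.ingest(data=rows)
```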

### 6. **Parametrize LLMs**
You can choose the model and provider used in inference steps (e.g., `ProcessUnstructured`, `Normalize`, `Validate`).

```python
from datalinks.pipeline import Pipeline, ProcessUnstructured

steps = Pipeline(
    ProcessUnstructured(
        derive_from="text",
        helper_prompt="If you find a numeric field use only the value and omit the rest.",
        model="gpt-4.1-nano-2025-04-14",
        provider="openai"
    )
)
```


---

## Examples

The following examples demonstrate end-to-end usage of the DataLinks Python SDK across a range of common scenarios. Each example is self-contained and can be run directly after configuring your environment variables.

---

# Direct Ingestion

Demonstrates the simplest possible ingestion flow: load pre-structured rows from a JSON file and push them to DataLinks **without** any inference pipeline or entity resolution. Use this pattern when your data is already in a clean, tabular format and requires no AI-assisted transformation.

**Components covered:** `DLConfig`, `DataLinksAPI`, `create_space`, `ingest`, `query_data`

```python
"""
Direct ingestion example using data/pgproducts.json.

Ingests pre-structured rows directly without any inference pipeline or
entity resolution — the data is stored as-is.
"""
import json
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig


def main():
    logging.basicConfig(level=logging.INFO)

    dl_config = DLConfig.from_env()
    dl_config.namespace = "pg"
    dl_config.objectname = "products_direct"

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)

    jsonfile = "data/pgproducts.json"
    logging.info(f"Loading {jsonfile}")
    with open(jsonfile) as f:
        rows = json.load(f)["rows"]

    logging.info(f"Ingesting {len(rows)} rows")
    result = dlapi.ingest(data=rows)

    logging.info(
        f"Ingestion result: {len(result.successful)} succeeded, "
        f"{len(result.failed)} failed"
    )

    data = dlapi.query_data()
    logging.info(f"Ingested data:\n{pformat(data)}")


if __name__ == "__main__":
    main()
```

---

# JSON Ingestion with Pipeline and Entity Resolution

Demonstrates how to ingest structured JSON data using a `ProcessStructured` pipeline step and `ExactMatch` entity resolution. The pipeline instructs DataLinks to derive tabular rows from the JSON `"rows"` key, and entity resolution deduplicates records by exact field matching.

**Components covered:** `DLConfig`, `DataLinksAPI`, `Pipeline`, `ProcessStructured`, `MatchTypeConfig`, `ExactMatch`, `ingest`, `query_data`

```python
import json
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import EntityResolutionTypes, MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessStructured


def main():
    logging.basicConfig(level=logging.INFO)

    dl_config = DLConfig.from_env()
    # namespace and objectname vary per example, so set them explicitly here
    dl_config.namespace = "pg"
    dl_config.objectname = "products"

    # OR
    #dl_config = DLConfig(
    #    host="http://localhost:9001",
    #    apikey="", # your DataLinks API key
    #    index="tests",
    #    namespace="pg",
    #    objectname="products"
    #)

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True) # default

    jsonfile = "data/pgproducts.json"
    logging.info(f"Loading json data in {jsonfile}")
    with open(jsonfile) as f:
        data = json.load(f)

    steps = Pipeline(
        ProcessStructured(derive_from="rows") # Data is already tabular
    )

    entity_resolution = MatchTypeConfig(ExactMatch())

    result = dlapi.ingest(
        data=[data],  # supports multiple files
        inference_steps=steps,
        entity_resolution=entity_resolution,
        batch_size=0,  # default (no file batching)
    )

    logging.info(f"Ingestion result:"
                 f"\nSuccessfully ingested {len(result.successful)} dataset(s)."
                 f"\nFailed {len(result.failed)} dataset(s).")

    data = dlapi.query_data(
            model="gpt-4.1-nano-2025-04-14",
            provider="openai"
    )
    logging.info(f"Ingested data:"
                 f"{pformat(data)}")

if __name__ == '__main__':
    main()
```

---

# Tabular Inference from Unstructured Text

Demonstrates a full AI-powered inference pipeline that transforms raw unstructured text into a structured table. The three-step pipeline uses `ProcessUnstructured` to extract an initial table from free-form text, `Normalize` to map columns to a target schema, and `Validate` to verify row integrity — all powered by an LLM.

**Components covered:** `DLConfig`, `DataLinksAPI`, `Pipeline`, `ProcessUnstructured`, `Normalize`, `NormalizeModes`, `Validate`, `ValidateModes`, `MatchTypeConfig`, `ExactMatch`, `ingest`, `query_data`

```python
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes


def main():
    logging.basicConfig(level=logging.DEBUG)

    dl_config = DLConfig.from_env()
    # namespace and objectname vary per example, so set them explicitly here
    dl_config.namespace = "cinema"
    dl_config.objectname = "awards"

    # OR
    # dl_config = DLConfig(
    #    host="http://localhost:9001",
    #    apikey="", # your DataLinks API key
    #    index="tests",
    #    namespace="pg",
    #    objectname="products"
    # )

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True) # default

    textfile = "data/movies.txt"
    logging.info(f"Loading text in {textfile}")
    with open(textfile) as f:
        data = {"text": f.read()}

    steps = Pipeline(
        ProcessUnstructured(
            derive_from="text",
            helper_prompt="If you find a numeric field use only the value and omit the rest.",
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ), # Infer table from unstructured text
        Normalize(
            target_cols={
                "Name": "the actor/actress name",
                "Titles": "the list of notable films where the actor was in",
                "Oscars": "the number of oscars won"
            },
            mode=NormalizeModes.ALL_IN_ONE,
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ),
        Validate(
            mode=ValidateModes.ROWS,
            columns=["Name", "Titles", "Oscars"],
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        )
    )

    entity_resolution = MatchTypeConfig(ExactMatch())

    result = dlapi.ingest(
        data=[data],  # supports multiple files
        inference_steps=steps,
        entity_resolution=entity_resolution,
        max_attempts=1,
        batch_size=0,  # default (no file batching)
    )

    logging.info(f"Ingestion result:"
                 f"\nSuccessfully ingested {len(result.successful)} dataset(s)."
                 f"\nFailed {len(result.failed)} dataset(s).")

    data = dlapi.query_data(
        model="gpt-4.1-mini-2025-04-14",
        provider="openai",
        include_metadata=True
    )
    logging.info(f"Ingested data:\n"
                 f"{pformat(data)}")

if __name__ == '__main__':
    main()
```

---

# Multipart Upload

Demonstrates how to upload large files to DataLinks using the multipart upload API. The three-phase flow — prepare, upload, finish — streams the file in chunks directly to presigned S3 URLs, avoiding memory constraints for large datasets. If any part fails, the upload session is aborted to free server-side resources.

**Components covered:** `DLConfig`, `DataLinksAPI`, `prepare_multipart_upload`, `finish_multipart_upload`, `wait_for_ingestion`, `abort_multipart_upload`

```python
"""
Multipart upload example using data/pgproducts_mp.json (~10 MB, 2 parts).

Multipart upload is the recommended approach for large files. The flow is:
  1. Prepare  — DataLinks allocates an upload session and returns presigned S3 URLs
                and the server-side partSize to use when splitting the file.
  2. Upload   — Each file chunk (sized to partSize) is PUT directly to its presigned
                URL; S3 returns an ETag per part.
  3. Finish   — DataLinks assembles the parts and triggers ingestion.

If anything goes wrong during upload, abort is called to clean up the partial upload.
"""
import logging
import os

import requests

import datalinks
from datalinks.api import DLConfig


def upload_multipart(filepath: str):
    dl_config = DLConfig.from_env()
    dl_config.namespace = "pg_multipart"
    dl_config.objectname = "products"

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)

    filename = os.path.basename(filepath)
    size = os.path.getsize(filepath)
    logging.info(f"Preparing multipart upload for '{filename}' ({size:,} bytes)")

    prepare = dlapi.prepare_multipart_upload(filename, size)
    upload_id = prepare["uploadId"]
    key = prepare["key"]
    part_size = prepare["partSize"]
    presigned_urls = [entry["url"] for entry in prepare["presignedUrls"]]
    logging.info(
        f"Upload session ready: {len(presigned_urls)} part(s) of {part_size:,} bytes, "
        f"uploadId={upload_id}"
    )

    parts = []
    part_num = 1
    try:
        with open(filepath, "rb") as f:
            for part_num, url in enumerate(presigned_urls, start=1):
                chunk = f.read(part_size)
                if not chunk:
                    break
                logging.info(
                    f"Uploading part {part_num}/{len(presigned_urls)} ({len(chunk):,} bytes)"
                )
                response = requests.put(url, data=chunk)
                response.raise_for_status()
                etag = response.headers["ETag"].strip('"')
                parts.append({"partNumber": part_num, "etag": etag})
                logging.info(f"Part {part_num} uploaded, ETag={etag}")
    except Exception as e:
        logging.error(f"Upload failed on part {part_num}: {e} — aborting")
        dlapi.abort_multipart_upload(upload_id, key)
        raise

    logging.info("All parts uploaded, finishing ingestion")
    result = dlapi.finish_multipart_upload(upload_id, key, parts, name=filename)
    ingestion_id = result["id"]
    logging.info(f"Ingestion queued: id={ingestion_id}")

    final = dlapi.wait_for_ingestion(ingestion_id)
    logging.info(
        f"Ingestion finished: status={final.get('status')!r}, "
        f"rows={final.get('processedRows')}, message={final.get('statusMessage')!r}"
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    upload_multipart("data/pgproducts_mp.json")
```

---

# Interactive Assistant

Demonstrates how to build an interactive command-line assistant that answers natural language questions about your data using the `ask` streaming API. The example handles each `AskEvent` type — `plan`, `step`, `answer`, and `error` — and renders responses using the `rich` library for a polished terminal experience.

**Components covered:** `DLConfig`, `DataLinksAPI`, `AskEvent`, `ask` (streaming)

```python
# NOTE:
# This example creates an interactive CLI to ask natural language questions about your data.

import datalinks
from datalinks.api import DLConfig, AskEvent
from rich.console import Console
from rich.markdown import Markdown
from rich.text import Text

console = Console()


def handle_event(event: AskEvent) -> None:
    if event.type == "plan":
        steps = event.data.get("steps", [])
        console.print()
        for step in steps:
            console.print(f"  {step}", style="dim")

    elif event.type == "step":
        idx = event.data.get("index", 0)
        reasoning = event.data.get("reasoning", "")
        query = event.data.get("query", "")
        data = event.data.get("data", [])
        console.print(f"  [{idx + 1}] {reasoning}", style="dim")
        if query:
            console.print(f"      query: {query}", style="dim italic")
        if data:
            console.print(f"      → {len(data)} record(s) retrieved", style="dim")

    elif event.type == "answer":
        response = event.data.get("response", "")
        console.print()
        console.print(Markdown(response))
        console.print()

    elif event.type == "error":
        message = event.data.get("message", "Unknown error")
        console.print(f"\nError: {message}\n", style="bold red")


def ask_loop(dlapi: datalinks.api.DataLinksAPI, namespace: str) -> None:
    console.print(Text.assemble(("DataLinks Assistant", "bold cyan"), " ", (f"({namespace})", "dim")))
    console.print("Type your question and press Enter. Type 'exit' to quit.\n", style="dim")

    while True:
        try:
            question = console.input("[bold yellow]You:[/bold yellow] ").strip()
        except (EOFError, KeyboardInterrupt):
            console.print()
            break

        if not question:
            continue
        if question.lower() == "exit":
            break

        console.print("[bold cyan]DataLinks:[/bold cyan]", end=" ")
        for event in dlapi.ask(question):
            handle_event(event)


def main() -> None:
    dl_config = DLConfig.from_env()

    if dl_config.namespace == "namespace-notset":
        try:
            dl_config.namespace = console.input("Namespace: ").strip()
        except (EOFError, KeyboardInterrupt):
            console.print()
            return

    dl_config.objectname = ""

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    ask_loop(dlapi, dl_config.namespace)


if __name__ == "__main__":
    main()
```

---

# GitHub Tickets Ingestion with Auto-Modelled Ontology (Vercel)

Demonstrates how to use the `IngestProxyAPI` to ingest raw GitHub tickets and pull requests and let DataLinks automatically model an ontology from them. A natural-language prompt describes the desired schema — tables for tickets, contributors, triage insights, and applied technologies — and the pipeline surfaces that structure without any manual mapping. Events from the streaming pipeline run are logged as they arrive. The client reconnects automatically if the stream drops, resuming from the last received event.

**Components covered:** `IngestProxyConfig`, `IngestProxyAPI`, `run_pipeline`

**Required env vars:** `DL_INGEST_PROXY`, `DL_API_KEY`, `DL_USERNAME`, `DL_NAMESPACE`

```python
import json
import logging

from datalinks.api import IngestProxyAPI, IngestProxyConfig


USER_PROMPT = """
## What we are doing
  We are building an assistant to help with the coordination tasks of a development team. It is effectively a combination of multiple agents that collaborate on a shared memory pool to perform their tasks. These agents include:
  - An agent that helps assign tickets and pull requests to the correct developer for implementation or investigation
  - An agent that helps to estimate effort of specific tasks
  - An agent that does duplicate detection
  - An agent that helps generating prompts for Claude Code to implement whenever the ticket is estimated to be of acceptable complexity and size
  - A workflow that ingests new tickets and updates existing tickets

  ## What are we working with
  We're feeding all the data to DataLinks automodeler (note: that's you, and thank you for the help!), and we're expecting an ontology to surface automatically. We're submitting raw GitHub tickets and pull requests.
  
  ## What we want
  We want to design and populate an ontology for agents to write and read from, it should have at least the following tables:

1. **Tickets**: Core ticket or PR identity — ticket_id, title, body, labels, state, lock status, reporter, ticket_url.
2. **Contributors**: People/orgs involved (reporter, assignees, closers). Track roles and contributor ids and usernames.
3. **Triage**: Triage insights — inferred priority (Low/Med/High), effort size (Low/Med/High), technology stack, risk level, labels, and any other useful signals.
4. **Applied Technologies**: The dependencies and technologies a specific ticket has. A single ticket may have multiple technologies or frameworks; we want each of them represented here.
5. **Additional tables** as needed; we expect the ontology to evolve as new data comes in.

  Rules:
- Use identifiers always as string, never as numbers.
- Always include the available timestamps
"""

def main():
    logging.basicConfig(level=logging.INFO)

    config = IngestProxyConfig.from_env()
    config.namespace = "vercel"

    proxy = IngestProxyAPI(config)

    with open("data/vercel100.json") as f:
        data = json.load(f)

    logging.info(f"Starting pipeline run for {len(data)} records")

    with proxy.run_pipeline(
        data=data,
        user_prompt=USER_PROMPT,
    ) as run:
        logging.info(f"Run ID: {run.run_id}")
        for event in run:
            logging.info(f"Event: {event}")

    logging.info("Pipeline run complete")


if __name__ == "__main__":
    main()
```
## Run Unit Tests

Run tests to verify your implementation:
```shell
tox
```
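
To run a single environment, pass `-e` (assuming the corresponding env is defined in `tox.ini`):

```shell
tox -e py311
```
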
## License

**DataLinks Python SDK** is licensed under the MIT License. See the [LICENSE](https://opensource.org/license/mit) file for more details.
## Support

For questions or support, please [contact us](https://datalinks.com/newsletter).
