Metadata-Version: 2.4
Name: ingestion-engine
Version: 1.0.0
Summary: 
Author: jalal
Author-email: jalalkhaldi3@gmail.com
Requires-Python: >=3.12,<3.14
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: docling (>=2.0.0,<3.0.0)
Requires-Dist: pymupdf (>=1.27.2.3)
Requires-Dist: retrievalbase[transformers] (>=2.1.3,<3.0.0)
Description-Content-Type: text/markdown

# 📦 Ingestion Engine
Ingestion Engine is a typed Python library for building document ingestion pipelines. It gives you small async building blocks for loading data, transforming it, and writing it somewhere useful.


## 🌟 Highlights

- Simple `Source`, `Transformer`, and `Sink` contracts for ingestion workflows.
- Async-first interfaces for I/O-heavy data processing.
- Built-in PDF parsing with Docling.
- LangChain-compatible document objects for retrieval and AI workflows.
- Local JSON Lines output for embedded documents.
- Pydantic settings models for explicit, typed component configuration.
- Extensible design: bring your own source, transformer, embedder, or sink.


## ℹ️ Overview

Ingestion Engine helps you structure document ingestion code without locking you into one storage backend, embedding provider, or orchestration framework. A pipeline is built from three concepts:

1. A `Source` loads input data.
2. A `Transformer` converts that data into another representation.
3. A `Sink` writes the final output.
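
The three roles above can be sketched with plain Python protocols. This is a minimal, self-contained illustration that does not import the library: `SourceLike`, `TransformerLike`, `SinkLike`, `ListSource`, `Uppercase`, `CollectSink`, and `run_pipeline` are hypothetical names, and only the method names `load`, `transform`, and `write` mirror the contracts described here.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Protocol


# Stand-in protocols for illustration; the real base classes live in ingestion_engine.
class SourceLike(Protocol):
    def load(self) -> AsyncIterator[str]: ...


class TransformerLike(Protocol):
    async def transform(self, data: str) -> str: ...


class SinkLike(Protocol):
    async def write(self, data: str) -> None: ...


class ListSource:
    def __init__(self, items: list[str]) -> None:
        self._items = items

    async def load(self) -> AsyncIterator[str]:
        # Yield items one at a time, as an I/O-bound source would.
        for item in self._items:
            yield item


class Uppercase:
    async def transform(self, data: str) -> str:
        return data.upper()


class CollectSink:
    def __init__(self) -> None:
        self.written: list[str] = []

    async def write(self, data: str) -> None:
        self.written.append(data)


async def run_pipeline(
    source: SourceLike, transformer: TransformerLike, sink: SinkLike
) -> None:
    # Hypothetical driver: stream from the source, transform, then write.
    async for item in source.load():
        await sink.write(await transformer.transform(item))


sink = CollectSink()
asyncio.run(run_pipeline(ListSource(["hello", "world"]), Uppercase(), sink))
print(sink.written)  # ['HELLO', 'WORLD']
```

Keeping the driver separate from the three components is what lets each piece be swapped or tested on its own.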

The current library focuses on document and retrieval workflows. It includes a Docling-based PDF parser that turns PDFs into page-level documents with metadata, plus a local JSONL sink for storing embedded documents during development, testing, or batch handoff.

This project is useful when you want clear ingestion boundaries before sending data to a vector database, search index, data lake, or downstream AI application.


### ✍️ Authors

Created by Efysent.


## 🚀 Usage

Parse a PDF into page-level documents:

```py
from pydantic import BaseModel

from ingestion_engine.transformer.pdf_parser import (
    DoclingPDFParserTransformer,
    RawPDFDocument,
)
from ingestion_engine.transformer.settings import DoclingPDFParserTransformerSettings


class PaperMetadata(BaseModel):
    paper_id: str
    title: str


settings = DoclingPDFParserTransformerSettings(
    module_path="ingestion_engine.transformer.pdf_parser.DoclingPDFParserTransformer",
)
transformer = DoclingPDFParserTransformer(settings)

raw_document = RawPDFDocument(
    metadata=PaperMetadata(paper_id="paper-123", title="Example Paper"),
    pdf_path="/path/to/paper.pdf",
)

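# Top-level `await` works in a notebook or the `python -m asyncio` REPL.
# In a script, call this from an async function started with `asyncio.run(...)`.
documents = await transformer.transform(raw_document)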
```

Write embedded documents to JSON Lines:

```py
from ingestion_engine.sink.local_json import LocalJSONSink
from ingestion_engine.sink.settings import LocalJSONSinkSettings
from ingestion_engine.transformer.embedder import EmbeddedDocument


sink = LocalJSONSink(
    LocalJSONSinkSettings(
        module_path="ingestion_engine.sink.local_json.LocalJSONSink",
        output_path="./data/embedded_documents.jsonl",
    )
)

# As above, run this inside an async function when used in a script.
await sink.write(
    [
        EmbeddedDocument(
            page_content="first page",
            metadata={"page": 1},
            embedding=[0.1, 0.2, 0.3],
        )
    ]
)
```
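
JSON Lines stores one JSON value per line, so a file like the one above can be read back with the standard library alone. The sketch below writes and reads its own temporary file. The record keys (`page_content`, `metadata`, `embedding`) are an assumption taken from the `EmbeddedDocument` fields in the example, not a confirmed description of what `LocalJSONSink` emits.

```python
import json
import tempfile
from pathlib import Path

# Assumed record shape, mirroring the EmbeddedDocument fields shown above.
records = [
    {"page_content": "first page", "metadata": {"page": 1}, "embedding": [0.1, 0.2, 0.3]},
    {"page_content": "second page", "metadata": {"page": 2}, "embedding": [0.4, 0.5, 0.6]},
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "embedded_documents.jsonl"

    # Write one JSON object per line.
    with path.open("w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")

    # Read the file back line by line, skipping blank lines.
    with path.open(encoding="utf-8") as handle:
        loaded = [json.loads(line) for line in handle if line.strip()]

print(len(loaded), loaded[0]["metadata"]["page"])
```

Reading line by line keeps memory use flat for large exports, since no single line depends on the rest of the file.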

Define your own components by implementing the base contracts:

```py
from collections.abc import AsyncGenerator

from ingestion_engine.source import Source
from ingestion_engine.source.settings import SourceSettings
from ingestion_engine.transformer import Transformer
from ingestion_engine.transformer.settings import TransformerSettings
from ingestion_engine.sink import Sink
from ingestion_engine.sink.settings import SinkSettings


# Per-component settings, assumed here to subclass the base settings models.
# Add typed fields to each class as your component needs them.
class TextSourceSettings(SourceSettings):
    pass


class UppercaseTransformerSettings(TransformerSettings):
    pass


class PrintSinkSettings(SinkSettings):
    pass


class TextSource(Source[TextSourceSettings, str]):
    async def load(self) -> AsyncGenerator[str, None]:
        yield "hello"


class UppercaseTransformer(Transformer[UppercaseTransformerSettings, str, str]):
    async def transform(self, data: str) -> str:
        return data.upper()


class PrintSink(Sink[PrintSinkSettings, str]):
    async def write(self, data: str) -> None:
        print(data)


## ⬇️ Installation

This project requires Python 3.12 or 3.13 (`>=3.12,<3.14`).

Install it with:

```bash
pip install ingestion-engine
```

For local development from this repository, use `uv`:

```bash
uv sync --group dev --all-extras
```


## 🧱 Project Structure

```text
ingestion-engine/
|-- src/ingestion_engine/
|   |-- source/          # Source base class and settings
|   |-- transformer/     # Transformer base class, documents, PDF parser, embedder contracts
|   |-- sink/            # Sink base class, settings, local JSONL sink
|   |-- exceptions.py    # Package-level base exception
|   `-- py.typed         # Type information marker
|-- tests/
|   |-- fixtures/        # Shared pytest fixtures
|   |-- unit/            # Unit tests
|   `-- integration/     # Integration tests
|-- pyproject.toml
|-- Makefile
`-- README.md
```


## 🧩 Common Use Cases

- Parse PDFs into page-level documents for retrieval systems.
- Preserve source metadata while adding parser metadata like `doc_id`, `page_number`, and `total_pages`.
- Build ingestion pipelines for vector databases, search indexes, data lakes, or local JSONL exports.
- Keep embedding logic replaceable behind an `EmbedderTransformer` implementation.
- Test ingestion pieces independently with mocked sources, transformers, and sinks.
- Prototype local document workflows before wiring production infrastructure.
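
Testing pieces in isolation, as the list above suggests, can be done with `unittest.mock.AsyncMock` from the standard library. The sketch below is self-contained and does not use the library's classes: `process_all` is a hypothetical driver, and the mocks only mimic the `transform` and `write` method names.

```python
import asyncio
from unittest.mock import AsyncMock, call


async def process_all(items: list[str], transformer, sink) -> None:
    # Hypothetical driver: transform each item, then hand the result to the sink.
    for item in items:
        await sink.write(await transformer.transform(item))


# Mock the transformer so the sink-facing logic can be checked on its own.
transformer = AsyncMock()
transformer.transform.side_effect = lambda data: data.upper()
sink = AsyncMock()

asyncio.run(process_all(["a", "b"], transformer, sink))

# The sink should have been awaited with each transformed item, in order.
sink.write.assert_has_awaits([call("A"), call("B")])
```

Because both collaborators are mocks, the check exercises only the driver's ordering and hand-off, with no PDF parsing or file I/O involved.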


## 🧪 Development

Run tests:

```bash
make test
```

Run formatting and linting:

```bash
make format
make lint
```

Run type checking:

```bash
make type-check
```

Run the local CI equivalent:

```bash
make ci
```

## 💭 Feedback and Contributing

Bug reports, feature requests, and implementation ideas are welcome. Open an issue or discussion in the repository with:

- What you expected to happen.
- What actually happened.
- A minimal example or test case when possible.
- The Python version and relevant dependency versions.

Good contributions for this project include new sources, transformers, sinks, tests, examples, and documentation improvements.

