Metadata-Version: 2.4
Name: olympipe
Version: 1.8.3
Summary: A powerful parallel pipelining tool
License: MIT
License-File: LICENSE
Keywords: pipeline,multiprocessing
Author: Gabriel Kasser
Author-email: gabriel.kasser@gmail.com
Requires-Python: >=3.8,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Requires-Dist: cloudpickle (>=3.1.2,<4.0.0)
Requires-Dist: dpkt (>=1.9.8,<2.0.0)
Requires-Dist: psutil (>=5.9.5,<6.0.0)
Requires-Dist: setproctitle (>=1.3.3,<2.0.0)
Requires-Dist: tqdm (>=4.66.1,<5.0.0)
Requires-Dist: types-psutil (>=5.9.5.16,<6.0.0.0)
Project-URL: Homepage, https://gitlab.com/gabraken/olympipe
Project-URL: Repository, https://gitlab.com/gabraken/olympipe
Description-Content-Type: text/markdown

# Olympipe

![coverage](https://gitlab.com/gabraken/olympipe/badges/master/coverage.svg?job=tests)
![status](https://gitlab.com/gabraken/olympipe/badges/master/pipeline.svg)

**Zero-boilerplate parallel pipelines for Python.**

Turn any iterable into a multi-process pipeline with a single chained expression. Each `.task()` runs in its own process, bypassing the GIL: no `multiprocessing.Pool`, no queues, no plumbing.

```python
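from olympipe import Pipeline

def heavy_compute(x: int) -> int:
    # stand-in for your own CPU-heavy function
    return sum(i * i for i in range(x))
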
results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel workers
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)
```

## Installation

```bash
pip install olympipe
```

## Why Olympipe?

- **Chainable API** — compose steps like pandas or streams
- **True multiprocessing** — each step is a separate process, GIL-free
- **PyTorch-safe** — auto-detects torch and switches to `spawn` context to avoid CUDA/DataLoader deadlocks
- **Type-safe** — fully typed, so type mismatches between steps show up in your IDE or type checker before you run anything
- **Batteries included** — split/gather, time-windows, disk caching, HTTP server source

---

## Basic usage

```python
from olympipe import Pipeline

results = (
    Pipeline(range(12))
    .task(lambda x: x * 2)           # transform
    .filter(lambda x: x % 4 == 0)    # keep multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
# [[0, 4, 8], [12, 16, 20]]
```
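
For intuition, the same map, filter and batch steps can be written as plain generator expressions, applied here to `range(12)`. This sketch runs sequentially in a single process and uses a hypothetical `batched` helper that always keeps a trailing partial batch; it illustrates the semantics only and is not how Olympipe is implemented.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Group items into lists of size n; the last list may be shorter."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:  # emit the trailing partial batch
        yield batch


doubled = (x * 2 for x in range(12))       # like .task(lambda x: x * 2)
kept = (x for x in doubled if x % 4 == 0)  # like .filter(lambda x: x % 4 == 0)
result = list(batched(kept, 3))            # like .batch(3)
print(result)  # [[0, 4, 8], [12, 16, 20]]
```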

---

## Parallel workers

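Scale any step horizontally by passing `count=N`. Workers share the same input and output queues, so no manual coordination is needed. Because each item goes to whichever worker is free, output order is not guaranteed to match input order when `count > 1`.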

```python
import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

# 200 hypothetical URLs processed by 20 concurrent workers
my_urls = [f"https://example.com/items/{i}" for i in range(200)]
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)
```
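
To show the plumbing that `count=N` replaces, here is a sketch of the shared-queue pattern with the standard library: N workers pull from one input queue, push to one output queue, and each stops on a sentinel. It uses threads rather than processes to stay short (threads are adequate for I/O-bound work like `slow_io`); `run_workers` is a hypothetical helper, not part of Olympipe.

```python
import queue
import threading
import time
from typing import Callable, List

_DONE = object()  # sentinel: tells a worker there is no more input


def run_workers(items: List[str], fn: Callable[[str], bytes], count: int) -> List[bytes]:
    """Run fn over items with `count` threads sharing one input and one output queue."""
    inbox: "queue.Queue[object]" = queue.Queue()
    outbox: "queue.Queue[bytes]" = queue.Queue()

    def worker() -> None:
        while True:
            item = inbox.get()
            if item is _DONE:
                return
            outbox.put(fn(item))  # type: ignore[arg-type]

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for item in items:
        inbox.put(item)
    for _ in threads:
        inbox.put(_DONE)  # one sentinel per worker so every thread exits
    for t in threads:
        t.join()
    # All workers have exited, so the output queue now holds every result.
    results: List[bytes] = []
    while not outbox.empty():
        results.append(outbox.get())
    return results


def slow_io(url: str) -> bytes:
    time.sleep(0.05)  # simulate network latency
    return url.encode()


urls = [f"https://example.com/items/{i}" for i in range(40)]
start = time.perf_counter()
results = run_workers(urls, slow_io, count=20)
elapsed = time.perf_counter() - start
print(len(results), f"{elapsed:.2f}s")
```

Run sequentially, 40 calls at 50 ms each would take about 2 s; with 20 workers they finish in roughly two rounds. Results come back in completion order, not input order.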

---

## Machine learning inference

Olympipe is well suited to ML workloads: keep your GPU(s) busy with a pipeline that loads data in parallel, batches it, runs inference, and post-processes the results, without writing any thread or process management code.

```python
import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process — each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)
            return self.model(x).argmax(dim=1).tolist()


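# `load_image`, `preprocess` and `image_paths` are placeholders for your own data-loading code.
def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference
    img = load_image(path)
    return preprocess(img)  # should yield a 512-dim tensor to match the model input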


predictions = (
    Pipeline(image_paths)
    .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
    .batch(64)                             # feed GPU in batches of 64
    .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
    .explode(lambda x: x)                 # flatten back to individual predictions
    .wait_for_result()
)
```

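> **Note:** Olympipe detects torch at startup and automatically uses the `spawn` start method instead of `fork`, preventing CUDA deadlocks. Set `OLYMPIPE_FORCE_FORK=1` to override. Because `spawn` re-imports your main module in every worker, keep pipeline construction under an `if __name__ == "__main__":` guard.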
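
The note describes a simple decision rule. Below is a minimal sketch of such a rule on top of the standard `multiprocessing` module; `pick_start_method` is a hypothetical helper written for illustration, not Olympipe's internal code, and checking `sys.modules` for `torch` is an assumption about how detection might work.

```python
import multiprocessing
import os
import sys
from typing import Mapping, Optional


def pick_start_method(modules: Optional[Mapping[str, object]] = None,
                      env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical rule: use 'spawn' once torch is imported, unless 'fork' is forced."""
    modules = sys.modules if modules is None else modules
    env = os.environ if env is None else env
    if env.get("OLYMPIPE_FORCE_FORK") == "1":
        return "fork"
    return "spawn" if "torch" in modules else "fork"


# The chosen name can be passed to multiprocessing.get_context();
# 'fork' is not available on Windows, so check before using it.
method = pick_start_method()
if method in multiprocessing.get_all_start_methods():
    ctx = multiprocessing.get_context(method)
print(method)
```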

---

## Stateful workers

Use `.class_task()` when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker.

```python
from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

sensor_stream = [20.5, 21.0, 19.8, 22.3]  # hypothetical sensor readings
results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()
```
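
Because each worker owns its own instance, state is not shared between workers. The plain-Python simulation below (no Olympipe involved) contrasts one instance with two; sending items to the two instances alternately is a simplification, since real workers take whichever item is next in the queue.

```python
from typing import Dict, List


class RunningStats:
    def __init__(self) -> None:
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> Dict[str, float]:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


readings: List[float] = [1.0, 2.0, 3.0, 4.0]

# One worker: a single instance sees every reading.
single = RunningStats()
single_out = [single.update(x) for x in readings]

# Two workers: each has its own instance and only sees part of the stream.
workers = [RunningStats(), RunningStats()]
split_out = [workers[i % 2].update(x) for i, x in enumerate(readings)]

print(single_out[-1])  # {'n': 4, 'mean': 2.5}
print(split_out[-1])   # {'n': 2, 'mean': 3.0}
```

If you need a global running statistic, keep that step at a single worker or combine per-worker results downstream.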

---

## Split & gather

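Branch your pipeline into independent streams and merge them back. The routing function returns one slot per branch; each branch receives the value in its slot, and a `None` slot means that branch skips the item.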

```python
from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)
```
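
A sequential sketch of one way to read the routing in the example: `split_items` is a hypothetical helper that sends each non-`None` slot of the routing tuple to the branch with the same index. It illustrates the idea only and is not Olympipe's implementation.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def split_items(items: Iterable[int],
                route: Callable[[int], Tuple[Optional[int], ...]],
                n: int) -> List[List[int]]:
    """Send each non-None slot of route(item) to the branch with the same index."""
    branches: List[List[int]] = [[] for _ in range(n)]
    for item in items:
        slots = route(item)
        for i in range(n):
            value = slots[i]
            if value is not None:  # None means "this branch skips the item"
                branches[i].append(value)
    return branches


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)


evens, odds = split_items(range(20), route, n=2)
# When branches are gathered, arrival order across them depends on timing,
# so sort if a stable order matters.
merged = sorted([x * 10 for x in evens] + [-x for x in odds])
print(merged)
```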

---

## HTTP server pipeline

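Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a `(connection, data)` pair, where `data` is what the route's handler (here `lambda body: body`) returns for the request body; downstream tasks process the data and send the response back over the connection.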

```python
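import socket
from typing import Tuple

from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def heavy_computation(payload: list) -> float:
    # stand-in for your real workload
    return sum(payload)

def handle_request(pair: Tuple[socket.socket, dict]) -> float:
    conn, data = pair  # client connection + request data

    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})  # reply to the waiting client
    return result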

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
```
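
Replies travel over the raw client socket. The exact bytes `send_json_response` writes are not documented here, so the helper below is a hypothetical stand-in showing what a minimal HTTP/1.1 JSON reply looks like; `socket.socketpair()` plays the role of the client connection so the sketch runs without a server.

```python
import json
import socket


def send_json_response_sketch(conn: socket.socket, payload: dict, status: str = "200 OK") -> None:
    """Hypothetical stand-in for olympipe.helpers.server.send_json_response."""
    body = json.dumps(payload).encode("utf-8")
    head = (
        f"HTTP/1.1 {status}\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")
    conn.sendall(head + body)


server_side, client_side = socket.socketpair()
send_json_response_sketch(server_side, {"result": 42})
server_side.close()  # closing signals end-of-response to the client

raw = b""
while True:
    chunk = client_side.recv(4096)
    if not chunk:
        break
    raw += chunk
client_side.close()

header, _, body = raw.partition(b"\r\n\r\n")
print(header.split(b"\r\n")[0])  # b'HTTP/1.1 200 OK'
print(json.loads(body))          # {'result': 42}
```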

---

## LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. Each worker process loads the model once and keeps it in memory across requests, while the server keeps accepting connections; each worker handles one request at a time.

```python
import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        # return_full_text=False makes the pipeline return only the newly generated text
        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False, return_full_text=False)
        generated = output[0]["generated_text"]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()
```

Call it:
```bash
curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'
```
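
The same request can be sent from Python with only the standard library. `build_generate_request` and `generate` are illustrative helper names; the URL and JSON fields mirror the `curl` call above.

```python
import json
import urllib.request


def build_generate_request(prompt: str, max_new_tokens: int = 128,
                           url: str = "http://localhost:8000/generate") -> urllib.request.Request:
    """Build the POST request with a JSON body, as in the curl example."""
    body = json.dumps({"prompt": prompt, "max_new_tokens": max_new_tokens}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def generate(prompt: str, max_new_tokens: int = 128) -> dict:
    """Send the request and decode the JSON reply (requires the server to be running)."""
    req = build_generate_request(prompt, max_new_tokens)
    with urllib.request.urlopen(req, timeout=300) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the server running, `generate("Explain transformers in one sentence:")["response"]` returns the generated text.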

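> **Tip:** To use several GPUs, pass `count=N` to `.class_task()` so that each worker loads its own model replica. With `device_map="auto"`, however, every replica spreads across all visible GPUs; to get one replica per GPU, assign each worker a specific device instead.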

---

## Step caching

Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.

```python
import tempfile
import time
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: instant — reads from disk, skips all computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
```
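
To make the caching idea concrete, here is a minimal sketch of per-item disk caching with `pickle`. The key scheme (a SHA-256 of the function's qualified name and the input) and the `cached_call` helper are assumptions for illustration; Olympipe's own file layout may differ.

```python
import hashlib
import os
import pickle
import tempfile
from typing import Any, Callable


def cached_call(fn: Callable[[Any], Any], x: Any, cache_dir: str) -> Any:
    """Return fn(x), reusing a pickled result from cache_dir when one exists."""
    key = hashlib.sha256(pickle.dumps((fn.__qualname__, x))).hexdigest()
    path = os.path.join(cache_dir, f"{key}.pkl")
    if os.path.exists(path):  # cache hit: skip the computation
        with open(path, "rb") as f:
            return pickle.load(f)
    result = fn(x)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result


calls = 0


def square(x: int) -> int:
    global calls
    calls += 1
    return x ** 2


with tempfile.TemporaryDirectory() as cache_dir:
    first = [cached_call(square, x, cache_dir) for x in range(5)]
    second = [cached_call(square, x, cache_dir) for x in range(5)]

print(first == second, calls)  # True 5
```

One caveat of keying on the function name: editing the function body does not invalidate old entries, so clear the cache directory when the logic changes.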

---

## Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

```python
import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500ms, then emit as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
```
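
The grouping rule can be sketched deterministically by attaching timestamps to items instead of reading the clock. This sketch opens a window at the first item and closes it as soon as an item arrives `window` seconds or more after the window opened. It is one plausible rule written for illustration; Olympipe's timer-based implementation may treat window boundaries and empty windows differently.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def temporal_batches(events: Iterable[Tuple[float, T]], window: float) -> List[List[T]]:
    """Group (timestamp, item) pairs into lists covering consecutive windows."""
    batches: List[List[T]] = []
    current: List[T] = []
    window_start: Optional[float] = None
    for ts, item in events:
        if window_start is None:
            window_start = ts
        elif ts - window_start >= window:  # window elapsed: emit and start a new one
            batches.append(current)
            current = []
            window_start = ts
        current.append(item)
    if current:  # flush the last, possibly shorter, window
        batches.append(current)
    return batches


events = [(i / 20, i) for i in range(100)]  # one item every 50 ms
batches = temporal_batches(events, window=0.5)
sums = [sum(b) for b in batches]            # like .task(lambda batch: sum(batch))
print(len(batches), sums[:3])
```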

---

## API reference

| Method | Description |
|---|---|
| `.task(fn, count=1)` | Apply `fn` to each item; `count` parallel workers |
| `.filter(fn=None)` | Keep items where `fn(x)` is truthy; without `fn`, keep items that are not `None` |
| `.batch(n, complete=True)` | Group into lists of size `n` |
| `.explode(fn)` | Flatten: one item → many |
| `.split(fn, n=2)` | Route items to `n` independent branches |
| `.gather(*pipes)` | Merge multiple pipelines into one |
| `.reduce(acc, fn)` | Fold stream into a single accumulated value |
| `.temporal_batch(s)` | Group items arriving within `s` seconds |
| `.cached_task(fn, cache_dir=…)` | Compute + persist to disk; skip on re-run |
| `.class_task(Cls, Cls.method)` | Apply `Cls.method` on a stateful per-worker instance of `Cls`; constructor arguments can be passed as a list in third position |
| `.timeout(s)` | Abort if no item arrives within `s` seconds |
| `.limit(n)` | Stop after `n` items pass through |
| `.debug()` | Print each item as it passes (inspect mid-pipeline) |
| `Pipeline.server(routes, port=…)` | HTTP server as a pipeline source |
| `.wait_for_result()` | Block and collect all results as a list |
| `.wait_for_completion()` | Block until the pipeline finishes (discard output) |
| `.wait_and_reduce(acc, fn)` | Combine `reduce` + `wait_for_result` in one call |
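
`.reduce()` and `.wait_and_reduce()` fold the stream into one value, much as `functools.reduce` folds a list. The sketch below uses the standard library only; the folding function's argument order (accumulator first, then the item) follows `functools.reduce` and is an assumption, so check Olympipe's signature before porting it.

```python
from functools import reduce
from typing import Dict


def count_parity(acc: Dict[str, int], x: int) -> Dict[str, int]:
    """Folding function: update the running counts with one item."""
    key = "even" if x % 2 == 0 else "odd"
    acc[key] += 1
    return acc


totals = reduce(count_parity, range(10), {"even": 0, "odd": 0})
print(totals)  # {'even': 5, 'odd': 5}
```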

