Metadata-Version: 2.4
Name: quent
Version: 4.0.0
Summary: Fluent chain interface for Python with transparent sync/async handling. Write pipeline code once -- Quent automatically detects coroutines and handles them. Pure Python, zero dependencies.
Author-email: Ohad Drukman <drukmano@icloud.com>
License-Expression: MIT
Project-URL: Documentation, https://quent.readthedocs.io
Project-URL: Repository, https://github.com/drukmano/quent
Project-URL: Changelog, https://github.com/drukmano/quent/releases
Project-URL: Issues, https://github.com/drukmano/quent/issues
Keywords: chain,chaining,pipeline,fluent,pipe,async,sync,asyncio,coroutine,await,pure-python,performance,function-composition,middleware
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Framework :: AsyncIO
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: license-file

<p align="center">
  <br>
  <strong>quent</strong>
  <br>
  <em>Write it once. Run it sync or async.</em>
  <br><br>
  <a href="https://pypi.org/project/quent/"><img src="https://img.shields.io/pypi/v/quent" alt="PyPI"></a>
  <a href="https://pypi.org/project/quent/"><img src="https://img.shields.io/pypi/pyversions/quent" alt="Python"></a>
  <a href="https://github.com/drukmano/quent/blob/master/LICENSE"><img src="https://img.shields.io/pypi/l/quent" alt="License"></a>
  <a href="https://github.com/drukmano/quent/actions/workflows/ci.yml"><img src="https://github.com/drukmano/quent/actions/workflows/ci.yml/badge.svg" alt="CI"></a>
  <img src="https://img.shields.io/badge/coverage-98%25-brightgreen" alt="Coverage">
  <img src="https://img.shields.io/badge/typed-PEP%20561-blue" alt="Typed">
</p>

A transparent sync/async bridge for Python. Define a pipeline once and run it with sync callables, async callables, or any mix of both -- quent detects awaitables at runtime and handles the transition automatically. Pure Python, zero dependencies.

---

## Why quent?

### The Problem

Processing pipelines start simple but quickly become hard to read:

```python
# Nested calls -- read inside-out, hard to follow
result = send_data(normalize_data(validate_data(fetch_data(id))))

# Intermediate variables -- verbose and repetitive
data = fetch_data(id)
data = validate_data(data)
data = normalize_data(data)
result = send_data(data)

# Async makes it worse -- now you maintain two versions
data = await fetch_data(id)
data = validate_data(data)
data = await normalize_data(data)
result = await send_data(data)
```

And the real cost: if you need to support both sync and async callers, you write the same logic twice. Two functions. Same steps. They drift apart. A bug fix in one gets forgotten in the other.

### The Solution

```python
from quent import Chain

pipeline = (
    Chain()
    .then(fetch_data)
    .then(validate_data)
    .then(normalize_data)
    .then(send_data)
)

result = pipeline.run(id)            # sync caller
result = await pipeline.run(id)      # async caller
```

One definition. Both worlds. quent inspects each return value at runtime -- if it's an awaitable, execution seamlessly transitions to async mode. No annotations, no wrappers, no ceremony.
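
The runtime detection described above can be illustrated with a minimal sketch. This is not quent's implementation; `run_steps` and `_finish_async` are hypothetical names, and the only real API assumed is the standard library's `inspect.isawaitable`. The function stays synchronous until a step returns an awaitable, then hands the remaining steps to a coroutine.

```python
import asyncio
import inspect


def run_steps(value, steps):
    """Apply steps in order; switch to async only if a step returns an awaitable."""
    for index, step in enumerate(steps):
        value = step(value)
        if inspect.isawaitable(value):
            # Hand the pending awaitable and the remaining steps to a coroutine.
            return _finish_async(value, steps[index + 1:])
    return value


async def _finish_async(pending, remaining):
    """Await the pending value, then run the remaining steps, awaiting as needed."""
    value = await pending
    for step in remaining:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


def double(x):
    return x * 2


async def add_one_async(x):
    await asyncio.sleep(0)
    return x + 1


# All steps are sync, so the caller gets a plain value back.
sync_result = run_steps(5, [double, double])

# One step is async, so the caller gets a coroutine and must await (or run) it.
async_result = asyncio.run(run_steps(5, [double, add_one_async, double]))
```

In this sketch a purely synchronous pipeline never touches the event loop, which is why the same definition can serve both kinds of callers.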

Write your code once, use it everywhere:

```python
def process_data(data_source):
    """Works with both sync and async data sources."""
    return (
        Chain(data_source.fetch)
        .then(validate)
        .then(transform)
        .then(data_source.save)
        .run()
    )

result = process_data(sync_database)          # returns a value
result = await process_data(async_database)   # returns a coroutine
```

## Installation

```bash
pip install quent
```

Requires Python 3.10+. Zero runtime dependencies.

## Quick Start

```python
from quent import Chain

# Build a pipeline with .then() and .do()
result = Chain(42).then(lambda x: x * 2).do(print).then(str).run()
# prints: 84
# result == '84'

# Collection operations
result = Chain([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).run()
#> [4, 16]

# Async transparency -- same chain, sync or async callables
chain = Chain().then(fetch_user).then(serialize)
result = chain.run(user_id)          # returns value if fetch_user is sync
result = await chain.run(user_id)    # returns coroutine if fetch_user is async
```

## Features

### Pipeline Building

All pipeline methods return `self` for fluent chaining.

```python
Chain(data).then(transform).do(log).then(save).run()
```

- **`.then(v)`** -- result replaces the current pipeline value
- **`.do(fn)`** -- side-effect; result is discarded, current value passes through
- **`.map(fn)`** -- apply `fn` to each element, collect results into a list
- **`.foreach(fn)`** -- apply `fn` to each element as side-effect, keep originals
- **`.filter(fn)`** -- keep elements where `fn` returns truthy
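
As a rough guide to what each method in the list above does to the current value, here are plain-Python equivalents for the synchronous case. These helper functions are hypothetical and are not part of quent; in particular, returning a list from `foreach` is an assumption based on the "keep originals" description.

```python
def then(value, fn):
    """The result replaces the current value."""
    return fn(value)


def do(value, fn):
    """Side effect only: the result is discarded, the value passes through."""
    fn(value)
    return value


def map_(items, fn):
    """Apply fn to each element and collect the results into a list."""
    return [fn(item) for item in items]


def foreach(items, fn):
    """Apply fn to each element as a side effect and keep the originals."""
    originals = list(items)
    for item in originals:
        fn(item)
    return originals


def filter_(items, fn):
    """Keep the elements for which fn returns a truthy value."""
    return [item for item in items if fn(item)]


seen = []
log = []
value = [1, 2, 3, 4, 5]
value = filter_(value, lambda x: x % 2 == 0)  # [2, 4]
value = foreach(value, seen.append)           # still [2, 4]
value = map_(value, lambda x: x ** 2)         # [4, 16]
value = do(value, log.append)                 # still [4, 16]
value = then(value, sum)                      # 20
```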

### Concurrent Execution

Run multiple functions on the current value in parallel. If any returns an awaitable, all are gathered concurrently via `asyncio.gather`.

```python
results = Chain(data).gather(validate, enrich, score).run()
```
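
The "if any returns an awaitable, gather them all" rule can be sketched with the standard library alone. This is an illustration under stated assumptions, not quent's code: `gather_on` and `_await_all` are hypothetical names, and plain (non-awaitable) results are simply passed through alongside the awaited ones.

```python
import asyncio
import inspect


def gather_on(value, *fns):
    """Call every fn with the same value; go async only if a result is awaitable."""
    results = [fn(value) for fn in fns]
    if not any(inspect.isawaitable(r) for r in results):
        return results
    return _await_all(results)


async def _await_all(results):
    async def settle(result):
        # Await awaitables; pass plain values through unchanged.
        return await result if inspect.isawaitable(result) else result

    return list(await asyncio.gather(*(settle(r) for r in results)))


def increment(x):
    return x + 1


async def double_async(x):
    await asyncio.sleep(0)
    return x * 2


sync_results = gather_on(3, increment, lambda x: x * 2)
mixed_results = asyncio.run(gather_on(3, increment, double_async))
```

`asyncio.gather` returns results in the order the awaitables were passed, so the output order matches the order of the functions.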

### Context Managers

Works with both sync and async context managers.

```python
content = Chain("data.txt").then(open).with_(lambda f: f.read()).run()
```

- **`.with_(fn)`** -- enter current value as context manager, call `fn` with the context value
- **`.with_do(fn)`** -- same, but `fn`'s result is discarded
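
For the synchronous case, the two methods above correspond roughly to the following plain-Python helpers. The helper names are hypothetical, and the assumption that `with_do` passes the original value (the context manager itself) through is inferred from the description, not confirmed by the source.

```python
import io


def with_(manager, fn):
    """Enter the context manager and return fn's result."""
    with manager as ctx:
        return fn(ctx)


def with_do(manager, fn):
    """Enter the context manager, call fn for its side effect, discard its result."""
    with manager as ctx:
        fn(ctx)
    return manager  # assumption: the current value passes through unchanged


content = with_(io.StringIO("hello"), lambda f: f.read())

captured = []
buffer = io.StringIO("world")
passed_through = with_do(buffer, lambda f: captured.append(f.read()))
```

In both helpers the context manager is exited before the next step runs, so resources such as file handles are released even if `fn` raises.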

### Conditional Logic

```python
result = (
    Chain(value)
    .if_(lambda x: x > 0, then=process_positive)
    .else_(process_negative)
    .run()
)
```

### Error Handling and Retry

One exception handler and one finally handler per chain. Retry re-executes the entire chain from scratch.

```python
result = (
    Chain(url)
    .then(fetch)
    .then(parse)
    .except_(handle_error, exceptions=ConnectionError)
    .finally_(cleanup)
    .retry(max_attempts=3, backoff=lambda attempt: 2 ** attempt)
    .run()
)
```
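
To make "re-executes the entire chain from scratch" concrete, here is a minimal retry loop with a backoff callable. It is a sketch, not quent's implementation: `run_with_retry` is a hypothetical helper, the 1-based `attempt` number is an assumption, and the `sleep` parameter exists only so the example can run without real delays.

```python
import time


def run_with_retry(run, max_attempts, backoff, sleep=time.sleep):
    """Call run() up to max_attempts times, sleeping backoff(attempt) between tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run()  # every attempt restarts the whole pipeline
        except ConnectionError:
            if attempt == max_attempts:
                raise  # out of attempts: let the last error propagate
            sleep(backoff(attempt))


calls = []
delays = []


def flaky_pipeline():
    """Fails twice with ConnectionError, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = run_with_retry(
    flaky_pipeline,
    max_attempts=3,
    backoff=lambda attempt: 2 ** attempt,
    sleep=delays.append,  # record the delays instead of sleeping
)
```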

### Control Flow

Early return and iteration break, usable inside any step.

```python
Chain.return_(value)  # exit the chain early with a value
Chain.break_(value)   # break from a map/foreach/filter iteration
```
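
One common way to implement this kind of early exit is a private signal exception that the runner catches. The sketch below shows that general technique only; whether quent works this way internally is an assumption, and `_Return`, `return_`, and `run_steps` are hypothetical names.

```python
class _Return(Exception):
    """Signal raised inside a step to stop the pipeline with a final value."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


def return_(value):
    raise _Return(value)


def run_steps(value, steps):
    try:
        for step in steps:
            value = step(value)
    except _Return as signal:
        return signal.value  # skip every remaining step
    return value


def stop_if_cached(key):
    if key == "cached":
        return_("from cache")
    return key


def expensive(key):
    return key.upper()


early = run_steps("cached", [stop_if_cached, expensive])
full = run_steps("fresh", [stop_if_cached, expensive])
```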

### Iteration

`.iterate()` yields each element of the chain's output and supports both `for` and `async for`.

```python
for item in Chain(fetch_all).iterate():
    process(item)

async for item in Chain(fetch_all).iterate():
    await process(item)
```
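
An object can support both loop forms by defining `__iter__` and `__aiter__`. The following sketch shows the idea with a hypothetical `DualIterator` class; it is not quent's implementation, and `fetch_sync` and `fetch_async` are placeholder producers.

```python
import asyncio
import inspect


class DualIterator:
    """Iterable with `for` and `async for` over the result of a producer."""

    def __init__(self, produce):
        self._produce = produce

    def __iter__(self):
        return iter(self._produce())

    async def __aiter__(self):
        # Written as an async generator, so calling it returns an async iterator.
        items = self._produce()
        if inspect.isawaitable(items):
            items = await items
        for item in items:
            yield item


def fetch_sync():
    return [1, 2, 3]


async def fetch_async():
    await asyncio.sleep(0)
    return [1, 2, 3]


async def collect():
    return [item async for item in DualIterator(fetch_async)]


sync_items = list(DualIterator(fetch_sync))
async_items = asyncio.run(collect())
```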

### Reusable Chains

**Clone** a chain to create independent copies for fork-and-extend patterns:

```python
base = Chain().then(validate).then(normalize)

for_api = base.clone().then(serialize_json)
for_db = base.clone().then(serialize_sql)
```

**Nest** chains inside other chains -- inner chains execute as a single step:

```python
validate_chain = Chain().then(check_schema).then(check_permissions)
pipeline = Chain(request).then(validate_chain).then(handle).run()
```

**Wrap** a chain as a function **decorator**:

```python
pipeline = Chain().then(validate).then(transform)

@pipeline.decorator()
def handle(request):
    return parse(request)
```

## Enhanced Tracebacks

When an exception occurs inside a chain, quent injects a visualization into the traceback showing the full pipeline and exactly which step failed:

```
Traceback (most recent call last):
  File "example.py", line 28, in <module>
    .run()
     ^^^^^
  File "<quent>", line 1, in
    Chain(fetch_data, 42) = {'id': 42, 'value': 100}
    .then(validate) <----
    .then(transform)
    .then(save)
  File "example.py", line 11, in validate
    raise ValueError("Value too large")
ValueError: Value too large
```

The `<----` marker points to the step that raised the exception. Intermediate values are shown next to each step so you can trace exactly what data flowed through the pipeline. Internal quent frames are automatically cleaned from the traceback.

Set the `QUENT_NO_TRACEBACK=1` environment variable to disable the enhanced traceback.

## Real-World Example

A Redis pipeline wrapper that works identically with both `redis` (sync) and `redis.asyncio` (async):

```python
from quent import Chain


class RedisBatch:
    def __init__(self, r, transaction=False):
        self.r = r
        self.transaction = transaction
        self.operations = []

    def add(self, op):
        self.operations.append(op)

    def flush(self):
        pipe = self.r.pipeline(transaction=self.transaction)
        for op in self.operations:
            op(pipe)
        return (
            Chain(pipe.execute)
            .then(self.process_results)
            .finally_(pipe.reset, ...)  # always reset; ... means "call with no args"
            .run()
        )
```

The same `flush()` method works whether `self.r` is a sync `redis.Redis` client or an async `redis.asyncio.Redis` client. No `if`/`else`, no duplication, no `async def` variant. The `...` (Ellipsis) in `.finally_(pipe.reset, ...)` tells quent to call `pipe.reset()` with no arguments, overriding the default behavior of passing the current value.
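
The Ellipsis convention can be pictured as a small dispatch rule. This sketch only illustrates the two cases described above; `call_step` is a hypothetical helper, not a quent function, and other argument forms are deliberately left out.

```python
def call_step(fn, current_value, marker=None):
    """Call fn with the current value unless the caller passed `...`."""
    if marker is ...:
        return fn()  # Ellipsis means "call with no arguments"
    return fn(current_value)


def reset():
    return "reset done"


def describe(value):
    return f"got {value}"


no_args = call_step(reset, 123, ...)
with_value = call_step(describe, 123)
```

Using `...` as the marker works because `Ellipsis` is a singleton, so an identity check with `is` is reliable and cannot be confused with an ordinary argument such as `None`.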

## Documentation

Full documentation is available at [quent.readthedocs.io](https://quent.readthedocs.io).

- [Quickstart](https://quent.readthedocs.io/en/latest/quickstart.html)
- [API Reference](https://quent.readthedocs.io/en/latest/api.html)
- [Changelog](https://github.com/drukmano/quent/releases)

## License

MIT -- Copyright (c) 2023 Ohad Drukman
