Metadata-Version: 2.1
Name: concurra
Version: 1.1.7
Summary: Concurra — Structured concurrency, effortless parallelism, built-in dependency management
Home-page: https://github.com/Concurra/concurra
Author: Sahil Pardeshi
Author-email: parallelexecute@gmail.com
License: MIT
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: System Administrators
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: Science/Research
Classifier: Intended Audience :: Education
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Build Tools
Classifier: Topic :: System :: Distributed Computing
Classifier: Topic :: Utilities
Classifier: Topic :: Scientific/Engineering
Requires-Python: >=3.6
Description-Content-Type: text/markdown
Provides-Extra: dev
License-File: LICENSE

<div align="center">
  <a href="https://pypi.org/project/concurra/">
    <img src="https://github.com/Concurra/concurra/blob/main/docs/concurra_logo.png?raw=true" alt="Concurra" width="300">
  </a>
  <div>
    <em>Concurra — Structured concurrency, effortless parallelism, built-in dependency management</em>
    <br>
    <br>
  </div>

  <!-- Badges -->
  <a href="https://github.com/Concurra/concurra/actions/workflows/python-tests.yml" target="_blank">
    <img src="https://github.com/Concurra/concurra/actions/workflows/python-tests.yml/badge.svg?event=push&branch=main" alt="Test">
  </a>
  <a href="https://github.com/Concurra/concurra/blob/main/LICENSE" target="_blank">
    <img src="https://img.shields.io/github/license/Concurra/concurra.svg" alt="License">
  </a>
  <a href="https://concurra.readthedocs.io/en/latest/" target="_blank">
    <img src="https://readthedocs.org/projects/concurra/badge/?version=latest" alt="Documentation Status">
  </a>
  <a href="https://pepy.tech/projects/concurra" target="_blank">
    <img src="https://static.pepy.tech/badge/concurra" alt="PyPI Downloads">
  </a>
  <a href="https://pypi.org/project/concurra" target="_blank">
    <img src="https://img.shields.io/pypi/v/concurra?color=%2334D058&label=pypi%20package" alt="Package version">
  </a>
  <a href="https://pypi.org/project/concurra" target="_blank">
    <img src="https://img.shields.io/pypi/pyversions/concurra.svg?color=%2334D058" alt="Supported Python versions">
  </a>
</div>

---

**Concurra** is a lightweight Python library for **concurrent and parallel task execution**, built to simplify the orchestration of complex workflows.
It provides a high-level interface for running tasks using threads or processes, while automatically handling **dependencies, timeouts, errors,** and **fast-fail** behavior.

With built-in support for **dependency management**, you can define execution chains where tasks wait for others to finish—allowing for flexible and safe coordination across multiple workers.
Whether you're handling I/O-bound or CPU-bound operations, Concurra helps you manage concurrency with minimal boilerplate.

---

# 🚀 Features

- ✅ **Simple API**: Add tasks and execute them in parallel with minimal setup.
- 🔀 **Parallel Task Execution**: Run multiple tasks concurrently using threading or multiprocessing.
- 🔗 **Dependency Management**: Define task dependencies (a DAG) to ensure correct execution order across complex pipelines.
- 🔁 **Cycle & Validation Checks**: Circular dependencies (of any length) and unknown dependency labels are detected and rejected.
- 💥 **Fast Fail Support**: Stop all remaining tasks as soon as one fails (optional).
- ⚠️ **Error Handling**: Automatically captures exceptions and tracebacks, with optional logging.
- 📊 **Progress & Status Tracking**: Track execution status and view structured results.
- 🪄 **Background Execution**: Run tasks asynchronously and fetch results later.
- 🧠 **Multiprocessing Support**: Bypass the GIL for CPU-bound tasks using true parallelism.
- 🛟 **Pickle-safe Multiprocessing**: Unpicklable tasks fail with a clear message, or optionally fall back to a thread.
- 🛑 **Abort Support**: Gracefully abort background task execution.
- ⏱️ **Per-task Timeouts**: Set a timeout that applies to each task individually.

---

## ❓ Why Not Use Native Threading or Multiprocessing?

Python offers several ways to run tasks concurrently — `threading`, `multiprocessing`, `asyncio`, and executors like `ThreadPoolExecutor`. These are powerful tools, but they come with steep learning curves, hidden complexities, and minimal guardrails — especially when managing multiple interdependent tasks.

**Concurra** builds on top of these foundations to provide a **clean, opinionated abstraction** that simplifies concurrent execution, **dependency management**, and runtime control — so you can focus on *what* to execute rather than *how*. Acting as a smart orchestration layer, Concurra emphasizes **safe**, **structured**, and **configurable concurrency**, enabling developers to build **reliable task pipelines** without reinventing the wheel.

Concurra models task dependencies using principles of a **Directed Acyclic Graph (DAG)**. Each task declares its dependencies, and Concurra ensures correct execution order by resolving these relationships dynamically at runtime.

| Challenge Using Native APIs                       | How Concurra Solves It                                     |
| ------------------------------------------------- | ---------------------------------------------------------- |
| Setting up thread/process pools                   | ✅ Built-in with `max_concurrency`, no boilerplate          |
| Handling exceptions from worker threads/processes | ✅ Automatically captured, logged, and available in results |
| Task identification                               | ✅ Assign unique labels for tracking and debugging          |
| Coordinating dependent tasks                      | ✅ Declarative `depends_on`, resolved as a DAG              |
| Terminating long-running or stuck tasks           | ✅ Built-in per-task `timeout` and `abort()` support        |
| Ensuring a task runner is only used once          | ✅ Enforced internally—no accidental re-use                 |
| Progress logging                                  | ✅ Automatic progress display and task status updates       |
| Fast fail if a task breaks                        | ✅ Opt-in `fast_fail` support for early termination         |
| Safe background execution                         | ✅ `execute_in_background()` and `get_background_results()` |
| Verifying task success                            | ✅ One-call `verify()` to ensure everything worked          |
| Preventing duplicate task labels                  | ✅ Built-in validation                                      |

---

## Why Developers Love Concurra
- ***Fewer bugs:*** No manual thread/process management.
- ***More control:*** Configure concurrency, fast-fail, timeout, and logging easily.
- ***Safer pipelines:*** Tasks execute only when dependencies are met.
- ***Better visibility:*** Structured results help with monitoring and debugging.
- ***Great for pipelines:*** Ideal for data processing, test automation, ETL jobs, and more.

Whether you're running 3 tasks or 300, Concurra gives you composability, clarity, and control—all while making concurrent execution feel intuitive and safe.

---

# 📦 Installation

```bash
pip install concurra
```

---

# 🚀 Quick Start
Run your first parallel tasks in under a minute with Concurra.

***🧱 Step 1: Create a `TaskRunner`***

```python
import concurra

runner = concurra.TaskRunner(max_concurrency=2)
```

***➕ Step 2: Add your tasks***

```python
def say_hello():
    return "Hello World"

def say_universe():
    return "Hello Universe"

runner.add_task(say_hello, label="greet_world")
runner.add_task(say_universe, label="greet_universe")
```

***▶️ Step 3: Run tasks and collect results***

```python
results = runner.run()
print(results["greet_world"]["result"])     # "Hello World"
print(results["greet_universe"]["result"])  # "Hello Universe"
```

***⚠️ Important Notes:***
- A `TaskRunner` object can be run **only once**.
- Once `run()` or `execute_in_background()` is called, you cannot add more tasks.
- For a new batch of parallel tasks, create a new `TaskRunner` object.

---

# 🧠 Core Concepts

### Execution model
Concurra keeps tasks in a pending queue, starts only tasks whose dependencies are ready, runs them through thread or process workers, and records structured results.

![Concurra execution model](https://raw.githubusercontent.com/Concurra/concurra/private/fix-dependency-validation/docs/diagrams/execution-model.png)

### Threads vs. Processes
By default Concurra runs tasks in **threads**, which is ideal for **I/O-bound** work (network calls, disk, subprocess waits). For **CPU-bound** work, set `use_multiprocessing=True` to run tasks in separate **processes** and bypass the GIL.

```python
runner = concurra.TaskRunner(use_multiprocessing=True)  # CPU-bound
```

### Multiprocessing requires picklable tasks
When `use_multiprocessing=True`, each task's function and its arguments must be **picklable**, because they are sent to a worker process. This means:

- ✅ Use **top-level (module-level) functions**.
- ❌ Avoid lambdas, locally-defined (nested) functions, and unpicklable arguments (e.g., open file handles, locks).

If a task is not picklable, Concurra **fails that task with a clear error** while letting the remaining tasks run (unless `fast_fail=True`, in which case a pickle failure — like any other failure — terminates the rest). You can instead opt in to a thread fallback:

```python
runner = concurra.TaskRunner(
    use_multiprocessing=True,
    fallback_to_thread_on_pickle_error=True,  # run unpicklable tasks in a thread
)
```

> ⚠️ **Thread-fallback tradeoff.** A fallback task does **not** run in a separate process — it runs in the **parent process in a thread**. That means it:
> - runs under the **GIL** (no true CPU parallelism, so CPU-bound fallback tasks won't speed up), and
> - **shares the parent's memory** — it can read and mutate global/shared state, and an unstable C-extension can affect the parent process.
>
> Use it as a convenience escape hatch for unpicklable tasks, not as a substitute for real process isolation. The task's result reports `execution_mode == "thread_fallback"` and a `warning`.

> ℹ️ The result for each task includes an `execution_mode` field (`"process"`, `"thread"`, or `"thread_fallback"`) so you always know how it ran.

![Concurra execution modes and thread fallback](https://raw.githubusercontent.com/Concurra/concurra/private/fix-dependency-validation/docs/diagrams/execution-modes.png)

### Termination semantics
- **Processes** (`use_multiprocessing=True`) can be **force-terminated** on timeout or `abort()`.
- **Threads** cannot be forcibly killed in Python. On timeout/abort a threaded task is **marked `Terminated`** and its result is protected from being overwritten, but the underlying thread continues running to completion in the background. Use multiprocessing if you need hard termination.

---

# 📖 API Reference

### ⚙️ `TaskRunner(...)`

```python
runner = concurra.TaskRunner(
    max_concurrency=4,
    name="MyRunner",
    timeout=10,
    progress_stats=True,
    fast_fail=False,
    use_multiprocessing=False,
    logger=my_logger,
    log_errors=False,
    fallback_to_thread_on_pickle_error=False,
)
```

| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `max_concurrency` | int | Maximum number of tasks to run in parallel. Defaults to `os.cpu_count()`. Values `< 1` are coerced to `1`. |
| `name` | str | Name for the runner, used in logs and progress output. Defaults to `"TaskRunner"`. |
| `timeout` | float | Maximum duration **per task** (seconds). Tasks exceeding it are terminated. `None` means no timeout. |
| `progress_stats` | bool | Whether to log progress statistics. Defaults to `True`. |
| `fast_fail` | bool | If `True`, execution halts and remaining tasks are terminated as soon as any task fails or times out. |
| `use_multiprocessing` | bool | Use processes instead of threads. Recommended for CPU-bound tasks. |
| `logger` | logging.Logger | Custom logger. If not provided, a default module logger is used. |
| `log_errors` | bool | Whether to log task exceptions/tracebacks as they occur. |
| `fallback_to_thread_on_pickle_error` | bool | When multiprocessing is enabled, run unpicklable tasks in a thread instead of marking them failed. |

---

### ➕ `add_task(task, *args, label=None, depends_on=None, **kwargs)`

Queue a callable to run, with optional positional/keyword arguments, a unique label, and dependencies.

```python
runner.add_task(some_function, arg1, arg2, label="task1", kwarg1=value1)
runner.add_task(other_function, label="task2", depends_on=["task1"])
```

| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `task` | callable | The function to execute. |
| `*args` | any | Positional arguments forwarded to `task`. |
| `label` | hashable | Unique identifier for the task. If omitted, the task's numeric index is used. |
| `depends_on` | list | Labels this task depends on. It runs only after all of them complete successfully. |
| `**kwargs` | any | Keyword arguments forwarded to `task`. |

***📝 Notes***
- Labels must be unique per runner; reusing a label raises `ValueError`.
- A task cannot depend on itself.
- Circular dependencies of **any length** (e.g. A → B → C → A) are detected and raise `ValueError`.
- Unknown dependency labels are detected when execution starts and raise `ValueError`.
- Dependencies may be declared **before** the task they reference is added (forward references are allowed).

---

### ➕ Convenience adders

Concurra provides a few helpers around `add_task`:

```python
# add_func: like add_task, but the label comes from the reserved `key` kwarg
runner.add_func(my_func, arg1, key="task1", some_kwarg=1)

# add_function: explicit args/kwargs containers (good when a task argument
# happens to be named "label" or "depends_on")
runner.add_function(my_func, args=(1, 2), kwargs={"x": 3}, key="task1", depends_on=["task0"])

# add_work: add many tasks at once from a list of tuples
runner.add_work([
    (func_a,),                          # (func,)
    (func_b, (1, 2)),                   # (func, args)
    (func_c, (1,), {"x": 2}),           # (func, args, kwargs)
    (func_d, (), {}, "label_d"),        # (func, args, kwargs, label)
])
```

***📝 Notes***
- In `add_func`, `key` is **reserved** as the label and is **not** forwarded to the function. If your function needs a parameter literally named `key`, use `add_function(func, kwargs={"key": ...}, key=label)` or `add_task` instead.
- `add_func`/`add_work`/`add_function` forward all other keyword arguments (including ones named `label` or `depends_on`) to your task, so they never collide with framework parameters.
- In `add_work`, each item must be a tuple of 1–4 elements; `args` must be an iterable and `kwargs` a mapping, otherwise a clear `TypeError`/`ValueError` is raised.

---

### 🏃 `run(verify=True, raise_exception=False, error_message=None)` / `execute(...)`

Execute all tasks, block until completion, and return the results dictionary. `execute(...)` is an alias of `run(...)`.

| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `verify` | bool | After execution, check whether all tasks succeeded and log a report. |
| `raise_exception` | bool | If `True`, raise an `Exception` when any task failed. If `False`, failures are logged only. |
| `error_message` | str | Custom message included in the raised exception/log report. |

```python
results = runner.run(verify=True, raise_exception=True, error_message="Pipeline failed")
```

---

### 🎯 `execute_in_background()`

Start execution on a background thread and return immediately (non-blocking). No new tasks can be added afterward. Use `get_background_results()` to collect results.

```python
runner.execute_in_background()
# ... do other work ...
```

---

### 📥 `get_background_results(verify=True, raise_exception=False, error_message=None)`

Block until background execution finishes and return the results. Same parameters as `run()`. Calling this without `execute_in_background()` first will error.

```python
runner.execute_in_background()
results = runner.get_background_results()   # blocks until done; no manual polling needed
```

---

### 🟢 `is_running()`

Indicates whether the background runner is still **live** — i.e. it has been started with `execute_in_background()` and has not yet been finalized by `get_background_results()` or `abort()`.

> ⚠️ **Do not poll `is_running()` to detect completion.** It does **not** automatically flip to `False` when the last task finishes; it stays `True` until you call `get_background_results()` (or `abort()`). To wait for completion, simply call `get_background_results()` — it blocks until everything is done. To check how many tasks are currently executing, use `get_active_runner_count()`.

```python
runner.execute_in_background()
# ... do other work ...
results = runner.get_background_results()   # blocks until done — no polling needed
```

---

### ⛔ `abort()`

Stop execution and return the full results dict. Any task that hasn't already completed — whether running or still pending — is marked `Terminated` (running **processes** are force-terminated; threaded tasks are marked `Terminated` but cannot be force-killed, see [Termination semantics](#termination-semantics)). Use after `execute_in_background()` to cancel before completion.

```python
runner.execute_in_background()
results = runner.abort()
```

---

### ✅ `verify(raise_exception=False, error_message=None)`

Print/log a status report of all tasks and optionally raise if any failed. Raises if called while execution is still in progress.

---

# 📦 Result Structure

`run()`, `execute()`, `get_background_results()`, and `abort()` return a dict keyed by task label. Each entry has the following fields:

| Field | Type | Description |
| ----- | ---- | ----------- |
| `task_name` | str | The function's name. |
| `start_time` | str \| None | Start timestamp, `"YYYY-MM-DD HH:MM:SS"`. |
| `end_time` | str \| None | End timestamp. |
| `duration` | str | Human-readable duration, e.g. `"1.01 sec"` or `"2.5 min"`. |
| `duration_seconds` | float | Duration in seconds. |
| `result` | any | The task's return value (`None` if it failed or was terminated). |
| `output` | any | Alias of `result` (kept for backward compatibility). |
| `error` | str \| None | `"ExcType: message"` if the task failed/terminated, else `None`. |
| `trace` | str \| None | Full traceback string when available. |
| `status` | str | One of `"Successful"`, `"Failed"`, `"Terminated"`. |
| `has_failed` | bool | `True` for failed or terminated tasks. |
| `execution_mode` | str \| None | `"thread"`, `"process"`, `"thread_fallback"`, or `None` if the task never started. |
| `warning` | str \| None | Diagnostic note (e.g. set when a task ran via thread fallback). |

### Task lifecycle
Every task starts in the pending queue and eventually records a terminal status. Successful tasks return normally, failed tasks capture an exception and traceback, and terminated tasks include timeouts, aborts, fast-fail cascades, and dependency skips.

![Concurra task lifecycle and status](https://raw.githubusercontent.com/Concurra/concurra/private/fix-dependency-validation/docs/diagrams/task-lifecycle.png)

---

# ✅ Example: All Tasks Pass

```python
import concurra
import json
import time

def square(x):
    time.sleep(1)
    return x * x

def divide(x, y):
    return x / y

runner = concurra.TaskRunner(max_concurrency=4)

runner.add_task(square, 4, label="square_4")
runner.add_task(square, 5, label="square_5")
runner.add_task(divide, 10, 2, label="divide_10_2")

results = runner.run()
print(json.dumps(results, indent=4, default=str))
```

***Console Output:***

```
INFO:concurra.core:TaskRunner progress: [########.................] 1/3 [33.33%] in 0 min 0.0 sec
INFO:concurra.core:TaskRunner progress: [#################........] 2/3 [66.67%] in 0 min 1.04 sec
INFO:concurra.core:TaskRunner progress: [#########################] 3/3 [100.0%] in 0 min 1.04 sec
INFO:concurra.core:
+-------------+--------+------------+------------+
| label       | task   | status     | duration   |
|-------------+--------+------------+------------|
| square_4    | square | Successful | 1.01 sec   |
| square_5    | square | Successful | 1.01 sec   |
| divide_10_2 | divide | Successful | 0.0 sec    |
+-------------+--------+------------+------------+
```

***Result for `square_4`:***

```json
{
    "task_name": "square",
    "start_time": "2026-04-12 00:46:54",
    "end_time": "2026-04-12 00:46:55",
    "duration": "1.01 sec",
    "duration_seconds": 1.01,
    "result": 16,
    "error": null,
    "trace": null,
    "status": "Successful",
    "has_failed": false,
    "output": 16,
    "execution_mode": "thread",
    "warning": null
}
```

---

# ❌ Example: Partial Failures

```python
import concurra

def square(x):
    return x * x

def divide(x, y):
    return x / y

runner = concurra.TaskRunner(max_concurrency=4)

runner.add_task(square, 4, label="square_4")
runner.add_task(divide, 10, 2, label="divide_10_2")
runner.add_task(divide, 10, 0, label="divide_by_zero")  # This will fail

results = runner.run(verify=True)            # logs a failure report
print(results["divide_by_zero"]["status"])   # "Failed"
print(results["divide_by_zero"]["error"])    # "ZeroDivisionError: division by zero"
```

***Result for `divide_by_zero` (abridged):***

```json
{
    "task_name": "divide",
    "result": null,
    "error": "ZeroDivisionError: division by zero",
    "trace": "Traceback (most recent call last): ...",
    "status": "Failed",
    "has_failed": true,
    "output": null,
    "execution_mode": "thread",
    "warning": null
}
```

> To turn failures into a raised exception, pass `run(raise_exception=True)`.

---

# ⛔ Example: Fast-Fail

When `fast_fail=True`, the runner terminates all other tasks as soon as one fails (or times out).

```python
import concurra
import time

def will_fail():
    raise RuntimeError("Oops!")

def will_succeed():
    time.sleep(2)
    return "Success"

runner = concurra.TaskRunner(fast_fail=True, max_concurrency=2)
runner.add_task(will_succeed, label="ok")
runner.add_task(will_fail, label="fail")
results = runner.run(verify=False)

print(results["fail"]["status"])  # "Failed"
print(results["ok"]["status"])    # "Terminated"
print(results["ok"]["error"])     # "RuntimeError: Execution terminated before completion"
```

> Because `ok` runs in a thread, it is **marked** `Terminated` immediately; the underlying thread cannot be force-killed and finishes in the background. Use `use_multiprocessing=True` for hard termination.

---

# ⏱️ Example: Per-task Timeout

`timeout` applies to **each task individually**. A task that runs longer is terminated and reported as `Terminated`.

```python
import concurra
import time

def slow():
    time.sleep(15)
    return "Done"

runner = concurra.TaskRunner(timeout=4)
runner.add_task(slow, label="timeout_task")
results = runner.run(verify=False)

print(results["timeout_task"]["status"])  # "Terminated"
print(results["timeout_task"]["error"])   # "TimeoutError: Task exceeded timeout of 4 seconds"
```

---

# 🧩 Example: Multiprocessing (CPU-bound)

```python
import concurra

def heavy(n):  # must be a top-level, picklable function
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":          # required for spawn (macOS/Windows)
    runner = concurra.TaskRunner(max_concurrency=4, use_multiprocessing=True)
    runner.add_task(heavy, 10_000_000, label="job1")
    runner.add_task(heavy, 20_000_000, label="job2")

    results = runner.run()
    print(results["job1"]["execution_mode"])  # "process"
```

> Remember: with `use_multiprocessing=True`, functions must be **top-level** and arguments must be **picklable**, and the runner must be created/executed inside an `if __name__ == "__main__":` guard on platforms that use the **spawn** start method (macOS and Windows). Set `fallback_to_thread_on_pickle_error=True` to run unpicklable tasks in a thread instead of failing them (see the caveat in [Multiprocessing requires picklable tasks](#multiprocessing-requires-picklable-tasks)).

---

# 🛑 Example: Abort Background Execution

```python
import concurra
import time

def long_task():
    time.sleep(30)

runner = concurra.TaskRunner()
runner.add_task(long_task, label="job")
runner.execute_in_background()

time.sleep(1)
results = runner.abort()
print(results["job"]["status"])  # "Terminated"
```

---

# 🔗 Example: Dependent Tasks (Pipelines)

Concurra resolves `depends_on` relationships as a DAG, running independent tasks in parallel and dependent tasks only after their prerequisites finish.

> ℹ️ **Ordering only — not data piping.** `depends_on` controls *when* a task runs, not *what* it receives. Concurra does **not** automatically feed a dependency's return value into a dependent task; you pass each task's arguments explicitly (as shown below). If a downstream task needs an upstream result, read it from the returned results dict after the run, or have the upstream task persist it somewhere the downstream task can fetch.

```python
import time
import concurra

def receive_order(order_id):
    time.sleep(1)
    return f"received {order_id}"

def validate_payment(order_id):
    time.sleep(1)
    return f"payment validated for {order_id}"

def check_inventory(order_id):
    time.sleep(1)
    return f"inventory reserved for {order_id}"

def pack_order(order_id):
    return f"packed {order_id}"

def ship_order(order_id):
    return f"shipped {order_id}"

def send_confirmation(order_id):
    return f"confirmation sent for {order_id}"

runner = concurra.TaskRunner(max_concurrency=3)
order_id = "ORD-1001"

# Receive the order first, then run payment and inventory checks in parallel.
runner.add_task(receive_order, order_id, label="receive_order")
runner.add_task(validate_payment, order_id, label="validate_payment",
                depends_on=["receive_order"])
runner.add_task(check_inventory, order_id, label="check_inventory",
                depends_on=["receive_order"])

# Fulfillment waits for both checks before packing, shipping, and notifying.
runner.add_task(pack_order, order_id, label="pack_order",
                depends_on=["validate_payment", "check_inventory"])
runner.add_task(ship_order, order_id, label="ship_order",
                depends_on=["pack_order"])
runner.add_task(send_confirmation, order_id, label="send_confirmation",
                depends_on=["ship_order"])

results = runner.run()

for label, info in results.items():
    print(f"{label}: {info['status']} → {info['result']}")
```

***⚙️ Dependency Diagram***
![Concurra dependency scheduling example](https://raw.githubusercontent.com/Concurra/concurra/private/fix-dependency-validation/docs/diagrams/dependency-order-processing.png)

***✅ How It Works***
- `receive_order` starts first.
- `validate_payment` and `check_inventory` run in parallel after the order is received.
- `pack_order` waits for **both** payment and inventory checks to finish successfully.
- `ship_order` and `send_confirmation` run only after fulfillment reaches their prerequisite steps.
- If payment, inventory, or another dependency **fails or is terminated**, its dependents are automatically **skipped** and recorded as `Terminated` with an explanatory error (`"Skipped: dependency [...] failed or was terminated"`).

---

# 🔐 License
MIT License.
