Examples

The repository examples are a guided learning path. Run them from the project root after installing the package in editable mode:

pip install -e .
python examples/00_quickstart_thread_backend.py

For process-backend examples, run the file directly. Do not paste process-pool code into a REPL: worker processes must be able to import the submitted functions, so those functions have to be defined at the top level of an importable module.

Example map

File

What it teaches

00_quickstart_thread_backend.py

Minimal manager lifecycle, lease acquisition, and lease.run().

01_lease_context_manager.py

Safe context-managed leases, owner labels, expiry fields, keyword args.

02_manual_acquire_release.py

Manual release and direct lease.executor.submit() through the proxy.

03_wait_timeout_unavailable.py

wait=False, timeout, and backpressure when all pools are leased.

04_adaptive_sizing.py

size_provider, units_per_pool, and notify_scale_changed().

05_stats_and_counts.py

backend, counts, desired target, and manager.stats().

06_lease_expiry_and_revocation.py

Soft expiry, hard expiry, and LeaseExpiredError.

07_process_backend_cpu_work.py

Process backend for CPU-heavy picklable functions.

08_work_grinder_submit.py

WorkGrinder.submit() batching and result awaiting.

09_work_grinder_enqueue.py

WorkGrinder.enqueue() and awaiting futures later.

10_submit_from_thread.py

WorkGrinder.submit_from_thread() from another OS thread.

11_error_handling.py

Common leasepool exceptions.

12_interpreter_backend_future_python314.py

Behavior of the future InterpreterPoolExecutor backend (Python 3.14+).

13_process_log_forwarding.py

ProcessLoggingConfig and process-worker log forwarding.

14_complete_library_walkthrough.py

Combined lifecycle, adaptive sizing, leases, grinder, and diagnostics.

fastapi_lifespan_pattern.py

Store a manager on app.state and stop it during lifespan shutdown.

Complete walkthrough

examples/14_complete_library_walkthrough.py
"""
Complete leasepool usage walkthrough.

This example combines the APIs most applications use together:

- manager lifecycle
- adaptive sizing
- explicit owner labels
- context-managed leases
- WorkGrinder batching
- manager and grinder diagnostics
- clean shutdown order
"""

from __future__ import annotations

import asyncio
import time
from pprint import pprint

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_vendor_lookup(device_id: str) -> dict[str, str]:
    """Stand in for a slow synchronous vendor call.

    Blocks the calling thread briefly, then returns a status record that
    echoes ``device_id`` and always reports ``"ok"``.
    """
    simulated_latency_seconds = 0.05
    time.sleep(simulated_latency_seconds)

    record = {"device_id": device_id, "status": "ok"}
    return record


def blocking_score(value: int) -> int:
    """Stand in for a small blocking computation: return ``value`` squared.

    Sleeps briefly first so the call behaves like real synchronous work.
    """
    simulated_latency_seconds = 0.02
    time.sleep(simulated_latency_seconds)

    squared = value * value
    return squared


async def main() -> None:
    """Run the complete walkthrough against a thread-backed manager.

    Steps, in order:

    1. Build a ``LeasedExecutorManager`` whose ``size_provider`` reads the
       size of a local ``connected_devices`` set, and a ``WorkGrinder`` that
       uses that manager.
    2. Start the manager, then the grinder.
    3. Grow the device set, call ``notify_scale_changed()``, and print the
       manager stats before and after.
    4. Run one blocking call through a context-managed lease.
    5. Submit eight small jobs through the grinder and gather the results.
    6. Stop the grinder, then the manager.

    Shutdown is structured as nested ``try``/``finally`` blocks so that the
    manager is stopped even when the grinder fails to start or fails to stop.
    """
    connected_devices: set[str] = {"device-1", "device-2"}

    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=3,
        min_pools=1,
        units_per_pool=2,
        # The lambda closes over the set, so later updates to it are visible
        # each time the manager calls the provider.
        size_provider=lambda: len(connected_devices),
        workers_per_pool=4,
        default_lease_seconds=60.0,
        lease_grace_seconds=5.0,
        name_prefix="walkthrough-worker",
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=4,
        max_wait_seconds=0.5,
        lease_seconds=30.0,
        owner_prefix="walkthrough-grinder",
    )

    await manager.start()
    try:
        # Started inside the manager's try block: if this raises, the
        # already-started manager is still stopped by the outer finally.
        await grinder.start()
        try:
            print("Initial manager stats:")
            pprint(manager.stats())

            # Adaptive sizing: update your signal and wake the checker immediately.
            connected_devices.update({"device-3", "device-4", "device-5"})
            manager.notify_scale_changed()
            # Yield to the event loop briefly before reading the stats again.
            await asyncio.sleep(0.05)

            print("\nAfter adaptive signal changed:")
            pprint(manager.stats())

            # Direct lease: best for one request or one coordinated group of calls.
            async with await manager.acquire(owner="device-status:device-1") as lease:
                status = await lease.run(blocking_vendor_lookup, "device-1")

            print("\nDirect lease result:", status)

            # Grinder: best for many small pieces of sync work submitted by callers.
            scores = await asyncio.gather(
                *(
                    grinder.submit(blocking_score, value, owner=f"score:{value}")
                    for value in range(8)
                )
            )

            print("\nGrinder scores:", scores)
            print("\nGrinder stats:")
            pprint(grinder.stats())
            print("\nFinal manager stats:")
            pprint(manager.stats())

        finally:
            # Stop producers/batchers before shutting down the manager they depend on.
            await grinder.stop(cancel_pending=True)
    finally:
        # Runs even if the grinder failed to start or to stop.
        await manager.stop()


# Script entry point: run the walkthrough coroutine with asyncio.run() only
# when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())

FastAPI pattern

examples/fastapi_lifespan_pattern.py
"""
FastAPI integration pattern.

This file requires FastAPI if you want to run it:

    pip install fastapi uvicorn

Run:

    uvicorn examples.fastapi_lifespan_pattern:app --reload

The important idea:
- create the manager during lifespan startup
- store it on app.state
- stop it during lifespan shutdown
"""

from __future__ import annotations

import time
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI

from leasepool import LeasedExecutorManager


def blocking_vendor_sdk_call(device_id: str) -> dict[str, str]:
    """Stand in for a blocking vendor SDK request.

    Blocks the calling thread briefly, then returns a status record that
    echoes ``device_id`` and always reports ``"ok"``.
    """
    simulated_latency_seconds = 0.2
    time.sleep(simulated_latency_seconds)

    record = {"device_id": device_id, "status": "ok"}
    return record


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Own the executor manager for the duration of the lifespan context.

    On entry, a thread-backed ``LeasedExecutorManager`` is created, started,
    and stored on ``app.state.executor_manager`` so route handlers can reach
    it. On exit, the manager is stopped, including when the body of the
    context raises.
    """
    executor_manager = LeasedExecutorManager(
        name_prefix="fastapi-blocking-worker",
        backend="thread",
        min_pools=1,
        max_pools=4,
        workers_per_pool=4,
    )
    await executor_manager.start()
    app.state.executor_manager = executor_manager

    try:
        yield
    finally:
        await executor_manager.stop()


# Application instance; `lifespan` is passed in as the app's lifespan handler.
app = FastAPI(lifespan=lifespan)


@app.get("/devices/{device_id}/status")
async def get_device_status(device_id: str) -> dict[str, str]:
    """Return the status of one device, fetched through a leased executor.

    Reads the manager from ``app.state.executor_manager``, acquires a lease
    labelled with the device id, and runs the blocking SDK call via
    ``lease.run()`` inside the lease's context manager.
    """
    executor_manager: LeasedExecutorManager = app.state.executor_manager
    owner_label = f"device-status:{device_id}"

    lease_context = await executor_manager.acquire(owner=owner_label)
    async with lease_context as lease:
        return await lease.run(blocking_vendor_sdk_call, device_id)


@app.get("/executor/stats")
async def executor_stats() -> dict:
    """Return ``stats()`` from the manager stored on ``app.state``."""
    return app.state.executor_manager.stats()