Quickstart

Use leasepool when async code needs to run synchronous work without leaking raw executors throughout your codebase.

The usual lifecycle is:

  1. create a LeasedExecutorManager;

  2. await manager.start() during application startup;

  3. acquire leases around synchronous work;

  4. await manager.stop() during application shutdown.

Basic thread backend

Use the thread backend for blocking I/O or legacy synchronous SDKs:

examples/00_quickstart_thread_backend.py
"""
Quickstart: run one blocking sync function from async code.

This is the most basic `leasepool` usage:

1. create a manager
2. start it
3. acquire a lease
4. run sync work inside the leased executor
5. stop the manager
"""

from __future__ import annotations

import asyncio
import time

from leasepool import ExecutorBackend, LeasedExecutorManager


def blocking_uppercase(value: str) -> str:
    """Return *value* upper-cased after a short blocking pause.

    The ``time.sleep`` call stands in for blocking synchronous work.
    """
    pause_seconds = 0.2
    time.sleep(pause_seconds)
    shouted = value.upper()
    return shouted


async def main() -> None:
    """Start a thread-backed manager, run one blocking call via a lease, then stop."""
    pool_manager = LeasedExecutorManager(
        backend=ExecutorBackend.THREAD,
        max_pools=2,
        min_pools=1,
        workers_per_pool=4,
        name_prefix="quickstart-worker",
    )
    await pool_manager.start()

    try:
        # Awaiting acquire() yields the lease; `async with` scopes its use.
        async with await pool_manager.acquire(owner="quickstart") as lease:
            text = "hello leasepool"
            shouted = await lease.run(blocking_uppercase, text)

        print("Result:", shouted)
    finally:
        # Stop the manager whether or not the leased work succeeded.
        await pool_manager.stop()


# Script entry point: run the async example only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())

Context-managed leases

The async context manager is the recommended style because it releases the lease back to the manager even when your work raises an exception.

# Leaving the block hands the lease back, even if the call raises.
async with await manager.acquire(owner="vendor-sdk-call") as lease:
    result = await lease.run(blocking_vendor_call, device_id)

lease.run() accepts positional and keyword arguments. It runs a synchronous callable in the leased executor and awaits the result.

Manual release

Manual release is available for control flow that cannot fit inside a single `async with` block:

lease = await manager.acquire(owner="manual-flow")

try:
    result = await lease.run(sync_function)
finally:
    # Release in `finally` so the lease is given back even if the call raises.
    await lease.release()

Prefer the context-manager form unless manual control is necessary.

CPU work with the process backend

Use the process backend for CPU-heavy Python work that should run across CPU cores:

examples/07_process_backend_cpu_work.py
"""
CPU-heavy work with the ProcessPoolExecutor backend on Python 3.11.

Use the process backend for CPU-bound work that should use multiple CPU cores.

Important:
- submitted functions must be top-level importable functions
- arguments and return values must be picklable
- do not submit lambdas, nested functions, open sockets, Redis clients, or DB clients
"""

from __future__ import annotations

import asyncio

from leasepool import ExecutorBackend, LeasedExecutorManager


def count_primes(limit: int) -> int:
    """Count the primes strictly below *limit* using trial division.

    Each candidate is tested against every divisor from 2 up to the integer
    part of its square root; a candidate with no exact divisor is prime.
    Returns 0 when *limit* is 2 or less.
    """
    return sum(
        1
        for candidate in range(2, limit)
        if all(
            candidate % divisor != 0
            for divisor in range(2, int(candidate**0.5) + 1)
        )
    )


async def main() -> None:
    """Count primes for four limits concurrently on a process-backed lease."""
    pool_manager = LeasedExecutorManager(
        backend=ExecutorBackend.PROCESS,
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    await pool_manager.start()

    try:
        limits = (20_000, 21_000, 22_000, 23_000)

        async with await pool_manager.acquire(owner="cpu-primes") as lease:
            # One lease.run() call per limit, awaited together in order.
            results = await asyncio.gather(
                *(lease.run(count_primes, limit) for limit in limits)
            )

        print("Prime counts:", results)
    finally:
        # Stop the manager whether or not the leased work succeeded.
        await pool_manager.stop()


# Script entry point: run the async example only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())

Important

With the process backend, submitted functions, arguments, and return values must be picklable. Prefer top-level functions and simple serializable data.

Batch many small jobs with WorkGrinder

WorkGrinder is useful when many async callers submit small pieces of synchronous work and you want one component to control batching and leasing.

examples/08_work_grinder_submit.py
"""
WorkGrinder with `submit()`.

`submit()` queues work and waits for the result.

The grinder processes a batch when either:

- the pending count reaches batch_size_threshold, or
- the oldest pending item has waited max_wait_seconds
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_square(value: int) -> int:
    """Return *value* squared after a short blocking pause.

    The ``time.sleep`` call stands in for a small piece of blocking work.
    """
    pause_seconds = 0.05
    time.sleep(pause_seconds)
    squared = value * value
    return squared


async def main() -> None:
    """Submit twenty small jobs through a WorkGrinder and print the results.

    Shutdown is nested so that each component that was started is also
    stopped: the manager is stopped even if the grinder fails to start or
    fails to stop, and the grinder is only stopped if it was started.
    """
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=10,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
        owner_prefix="square-grinder",
    )

    await manager.start()

    try:
        # Started inside the manager's try block so a grinder startup
        # failure still stops the manager.
        await grinder.start()

        try:
            results = await asyncio.gather(
                *(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
            )

            print("Results:", results)
            print("Grinder stats:", grinder.stats())

        finally:
            await grinder.stop(cancel_pending=True)

    finally:
        # Runs even when grinder.stop() raises.
        await manager.stop()


# Script entry point: run the async example only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())