WorkGrinder

WorkGrinder batches many submitters into executor work batches.

Use it when many async callers submit small synchronous jobs and you want one component to control batching, leasing, completion, and shutdown.

How batching works

The grinder processes a batch when either:

  • pending work count reaches batch_size_threshold;

  • the oldest pending item has waited max_wait_seconds.

Each batch acquires one lease from the configured manager, submits each work item through that lease, resolves each caller’s future, and releases the lease.

Basic submit

submit() queues work and waits for the result:

examples/08_work_grinder_submit.py
"""
WorkGrinder with `submit()`.

`submit()` queues work and waits for the result.

The grinder processes a batch when either:

- pending count reaches batch_size_threshold
- the oldest pending item waits max_wait_seconds
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_square(value: int) -> int:
    time.sleep(0.05)
    return value * value


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=10,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
        owner_prefix="square-grinder",
    )

    await manager.start()
    await grinder.start()

    try:
        results = await asyncio.gather(
            *(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
        )

        print("Results:", results)
        print("Grinder stats:", grinder.stats())

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

enqueue

enqueue() queues work and returns an asyncio.Future immediately. This is useful when you want to queue multiple items first and await later.

examples/09_work_grinder_enqueue.py
"""
WorkGrinder with `enqueue()`.

`enqueue()` returns an asyncio.Future immediately. This is useful when you want
to queue work first and await it later.
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_format(value: int) -> str:
    time.sleep(0.05)
    return f"value={value}"


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=5,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
    )

    await manager.start()
    await grinder.start()

    try:
        futures = []

        for i in range(5):
            future = await grinder.enqueue(blocking_format, i, owner=f"format-{i}")
            futures.append(future)

        print("Queued futures:", len(futures))
        print("Stats after enqueue:", grinder.stats())

        results = await asyncio.gather(*futures)

        print("Results:", results)
        print("Stats after completion:", grinder.stats())

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

submit_from_thread

submit_from_thread() is for non-async code or another OS thread. It returns a concurrent.futures.Future.

examples/10_submit_from_thread.py
"""
Submitting work to WorkGrinder from another OS thread.

Use `submit_from_thread()` only from non-async code or another thread. It returns
a `concurrent.futures.Future`.
"""

from __future__ import annotations

import asyncio
import threading
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_add(left: int, right: int) -> int:
    time.sleep(0.05)
    return left + right


def thread_entrypoint(grinder: WorkGrinder) -> None:
    future = grinder.submit_from_thread(
        blocking_add,
        20,
        22,
        owner="external-thread",
    )

    print("Worker thread got result:", future.result(timeout=5))


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=2,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=1,
        max_wait_seconds=1.0,
    )

    await manager.start()
    await grinder.start()

    try:
        thread = threading.Thread(
            target=thread_entrypoint,
            args=(grinder,),
            name="external-submitter",
        )
        thread.start()

        while thread.is_alive():
            await asyncio.sleep(0.05)

        thread.join()

    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

Shutdown behavior

await grinder.stop(cancel_pending=False)

Drains pending work before stopping.

await grinder.stop(cancel_pending=True)

Cancels queued pending work immediately.

Stop the grinder before stopping the manager it depends on:

await grinder.stop(cancel_pending=True)
await manager.stop()

Diagnostics

Call grinder.stats() from the event-loop thread to get:

  • started;

  • stopping;

  • pending;

  • batch_size_threshold;

  • max_wait_seconds;

  • lease_seconds;

  • oldest_wait_seconds.