WorkGrinder
WorkGrinder collects work items from many submitters and runs them on a leased executor in batches.
Use it when many async callers submit small synchronous jobs and you want one component to control batching, leasing, completion, and shutdown.
How batching works
The grinder processes a batch when either:
- the pending work count reaches batch_size_threshold; or
- the oldest pending item has waited max_wait_seconds.
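The two trigger conditions above can be written as a single predicate. The following is a minimal sketch for illustration only, not the library's code: the function name should_process_batch is hypothetical, and treating an empty queue as "never process" is an assumption.

```python
from typing import Optional


def should_process_batch(
    pending_count: int,
    oldest_wait_seconds: Optional[float],
    batch_size_threshold: int,
    max_wait_seconds: float,
) -> bool:
    """Return True when either documented trigger condition holds."""
    if pending_count <= 0:
        # Assumption: with nothing queued there is nothing to process.
        return False
    if pending_count >= batch_size_threshold:
        # Size trigger: enough items are waiting.
        return True
    # Time trigger: the oldest queued item has waited long enough.
    return oldest_wait_seconds is not None and oldest_wait_seconds >= max_wait_seconds
```

With batch_size_threshold=10 and max_wait_seconds=1.0, three pending items are processed only once the oldest of them has waited a full second.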
Each batch acquires one lease from the configured manager, submits each work item through that lease, resolves each caller’s future, and releases the lease.
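The batch lifecycle described above (acquire once, submit each item, resolve each future, release) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the leasepool implementation: a plain ThreadPoolExecutor stands in for a lease, and run_batch, square, and demo are hypothetical names.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Tuple

# (callable, positional args, future the caller is waiting on)
WorkItem = Tuple[Callable[..., Any], tuple, asyncio.Future]


async def run_batch(items: List[WorkItem], workers: int = 4) -> None:
    loop = asyncio.get_running_loop()
    # "Acquire": one executor for the whole batch (stand-in for one lease).
    executor = ThreadPoolExecutor(max_workers=workers)
    try:
        # Submit every work item through that same executor.
        running = [loop.run_in_executor(executor, fn, *args) for fn, args, _ in items]
        outcomes = await asyncio.gather(*running, return_exceptions=True)
        # Resolve each caller's future with its own result or exception.
        for (_, _, caller_future), outcome in zip(items, outcomes):
            if caller_future.done():
                continue  # for example, the caller cancelled while waiting
            if isinstance(outcome, BaseException):
                caller_future.set_exception(outcome)
            else:
                caller_future.set_result(outcome)
    finally:
        # "Release": runs even if a step above raised.
        executor.shutdown(wait=False)


def square(value: int) -> int:
    return value * value


async def demo() -> List[int]:
    loop = asyncio.get_running_loop()
    items = [(square, (i,), loop.create_future()) for i in range(5)]
    await run_batch(items)
    return [future.result() for _, _, future in items]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the release step in a finally block means a failing work item cannot leave the stand-in lease held.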
Basic submit
submit() queues work and waits for the result:
"""
WorkGrinder with `submit()`.

`submit()` queues work and waits for the result.

The grinder processes a batch when either:
- pending count reaches batch_size_threshold
- the oldest pending item waits max_wait_seconds
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_square(value: int) -> int:
    time.sleep(0.05)
    return value * value


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=10,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
        owner_prefix="square-grinder",
    )
    await manager.start()
    await grinder.start()
    try:
        results = await asyncio.gather(
            *(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
        )
        print("Results:", results)
        print("Grinder stats:", grinder.stats())
    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
enqueue
enqueue() queues work and returns an asyncio.Future immediately. This is
useful when you want to queue multiple items first and await later.
"""
WorkGrinder with `enqueue()`.

`enqueue()` returns an asyncio.Future immediately. This is useful when you want
to queue work first and await it later.
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_format(value: int) -> str:
    time.sleep(0.05)
    return f"value={value}"


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=5,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
    )
    await manager.start()
    await grinder.start()
    try:
        futures = []
        for i in range(5):
            future = await grinder.enqueue(blocking_format, i, owner=f"format-{i}")
            futures.append(future)
        print("Queued futures:", len(futures))
        print("Stats after enqueue:", grinder.stats())
        results = await asyncio.gather(*futures)
        print("Results:", results)
        print("Stats after completion:", grinder.stats())
    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
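To make the difference between the two calls concrete, here is a small standard-library sketch. It assumes that submit() behaves like enqueue() followed by awaiting the returned future, which is consistent with the descriptions above but is not confirmed as the library's implementation. TinyQueue and process_all are hypothetical names, and the work runs inline rather than on an executor to keep the sketch short.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class TinyQueue:
    """Hypothetical stand-in used only to contrast enqueue() and submit()."""

    def __init__(self) -> None:
        self._pending: List[Tuple[Callable[..., Any], tuple, asyncio.Future]] = []

    async def enqueue(self, fn: Callable[..., Any], *args: Any) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending.append((fn, args, future))
        return future  # handed back immediately, still unresolved

    async def submit(self, fn: Callable[..., Any], *args: Any) -> Any:
        future = await self.enqueue(fn, *args)
        return await future  # this caller waits until the item is processed

    def process_all(self) -> None:
        # Resolve everything queued so far (inline, for simplicity).
        items, self._pending = self._pending, []
        for fn, args, future in items:
            if not future.done():
                future.set_result(fn(*args))


async def demo() -> tuple:
    queue = TinyQueue()
    futures = [await queue.enqueue(str.upper, word) for word in ("a", "b")]
    queued_unresolved = not any(f.done() for f in futures)
    # submit() waits for its result, so it has to run as its own task here.
    submit_task = asyncio.create_task(queue.submit(str.upper, "c"))
    await asyncio.sleep(0)  # give submit() a chance to queue its item
    queue.process_all()
    return queued_unresolved, await asyncio.gather(*futures), await submit_task


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The enqueue() callers keep control between queueing and awaiting, while the submit() caller is suspended for the whole round trip.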
submit_from_thread
submit_from_thread() is for non-async code or another OS thread. It returns a
concurrent.futures.Future.
"""
Submitting work to WorkGrinder from another OS thread.

Use `submit_from_thread()` only from non-async code or another thread. It returns
a `concurrent.futures.Future`.
"""

from __future__ import annotations

import asyncio
import threading
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_add(left: int, right: int) -> int:
    time.sleep(0.05)
    return left + right


def thread_entrypoint(grinder: WorkGrinder) -> None:
    future = grinder.submit_from_thread(
        blocking_add,
        20,
        22,
        owner="external-thread",
    )
    print("Worker thread got result:", future.result(timeout=5))


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=2,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=1,
        max_wait_seconds=1.0,
    )
    await manager.start()
    await grinder.start()
    try:
        thread = threading.Thread(
            target=thread_entrypoint,
            args=(grinder,),
            name="external-submitter",
        )
        thread.start()
        while thread.is_alive():
            await asyncio.sleep(0.05)
        thread.join()
    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
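For background on why a thread-facing method returns a concurrent.futures.Future, the sketch below shows the standard-library bridge from a foreign thread into a running event loop, asyncio.run_coroutine_threadsafe. Whether WorkGrinder uses this call internally is an assumption; double_on_loop and submit_from_other_thread are hypothetical names.

```python
import asyncio
import concurrent.futures
import threading
from typing import List


async def double_on_loop(value: int) -> int:
    # Runs on the event loop thread.
    await asyncio.sleep(0.01)
    return value * 2


def submit_from_other_thread(
    loop: asyncio.AbstractEventLoop, value: int
) -> concurrent.futures.Future:
    # Thread-safe handoff: schedules the coroutine on `loop` and returns a
    # concurrent.futures.Future that the calling thread can block on.
    return asyncio.run_coroutine_threadsafe(double_on_loop(value), loop)


async def demo() -> int:
    loop = asyncio.get_running_loop()
    box: List[int] = []

    def thread_entrypoint() -> None:
        future = submit_from_other_thread(loop, 21)
        box.append(future.result(timeout=5))  # blocking is fine off the loop

    thread = threading.Thread(target=thread_entrypoint)
    thread.start()
    # Join in a worker thread so the event loop stays free to run the coroutine.
    await loop.run_in_executor(None, thread.join)
    return box[0]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling future.result() on the event loop thread itself would block the loop that has to produce the result, which is one reason to reserve a thread-facing call for non-async code.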
Shutdown behavior
await grinder.stop(cancel_pending=False)
Drains pending work before stopping.
await grinder.stop(cancel_pending=True)
Cancels queued pending work immediately.
Stop the grinder before stopping the manager it depends on:
await grinder.stop(cancel_pending=True)
await manager.stop()
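The two shutdown modes can be contrasted with a small standard-library sketch. It assumes that cancelled work surfaces to the waiting caller as asyncio.CancelledError; the source does not state how cancellation is reported. TinyGrinder is a hypothetical stand-in, and the drain path runs the work inline for brevity.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class TinyGrinder:
    """Hypothetical stand-in that only models the two stop() modes."""

    def __init__(self) -> None:
        self._pending: List[Tuple[Callable[..., Any], tuple, asyncio.Future]] = []

    def enqueue(self, fn: Callable[..., Any], *args: Any) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending.append((fn, args, future))
        return future

    async def stop(self, cancel_pending: bool = False) -> None:
        items, self._pending = self._pending, []
        for fn, args, future in items:
            if future.done():
                continue
            if cancel_pending:
                future.cancel()  # the caller never gets a result
            else:
                future.set_result(fn(*args))  # drain: finish queued work first


async def demo() -> list:
    outcomes: list = []
    for cancel_pending in (False, True):
        grinder = TinyGrinder()
        future = grinder.enqueue(pow, 2, 5)
        await grinder.stop(cancel_pending=cancel_pending)
        try:
            outcomes.append(await future)
        except asyncio.CancelledError:
            outcomes.append("cancelled")
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under that assumption, callers that may still be waiting at shutdown should be prepared to handle cancellation when cancel_pending=True is used.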
Diagnostics
Call grinder.stats() from the event-loop thread to get:
- started
- stopping
- pending
- batch_size_threshold
- max_wait_seconds
- lease_seconds
- oldest_wait_seconds
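One possible use of these fields is a simple backlog check. The sketch below assumes that stats() returns a mapping keyed by the names listed above and that oldest_wait_seconds may be None when nothing is pending; neither detail is confirmed here. describe_backlog, the slack factor, and the sample values are made up for illustration.

```python
from typing import Any, Mapping


def describe_backlog(stats: Mapping[str, Any], slack: float = 2.0) -> str:
    """Summarise a stats mapping; flag items waiting far beyond max_wait_seconds."""
    pending = stats.get("pending", 0)
    oldest = stats.get("oldest_wait_seconds")
    max_wait = stats.get("max_wait_seconds")
    if not pending:
        return "idle"
    # An item waiting much longer than the time trigger suggests batches are
    # not being processed (for example, no lease is available).
    if oldest is not None and max_wait and oldest > slack * max_wait:
        return f"stalled: {pending} pending, oldest waited {oldest:.1f}s"
    return f"ok: {pending} pending"


if __name__ == "__main__":
    # Made-up sample values, shaped like the fields listed above.
    sample = {"pending": 7, "oldest_wait_seconds": 5.0, "max_wait_seconds": 1.0}
    print(describe_backlog(sample))
```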