WorkGrinder¶
- class WorkGrinder(*, executor_manager, max_wait_seconds=10.0, batch_size_threshold=20, lease_seconds=60.0, owner_prefix='work-grinder', logger=None)[source]¶
Bases:
objectAsync work batcher backed by leased executors.
Multiple async callers submit sync work. The grinder starts processing a batch when either:
the oldest pending work has waited at least max_wait_seconds, or
pending work count reaches batch_size_threshold.
Once a batch is ready, it leases one executor and submits the whole batch.
Initialize a WorkGrinder instance.
- Parameters:
executor_manager (LeasedExecutorManager) – The executor manager to lease executors from.
max_wait_seconds (float, optional) – The maximum time to wait before processing a batch. Defaults to 10.0.
batch_size_threshold (int, optional) – The number of pending work items to trigger batch processing. Defaults to 20.
lease_seconds (float, optional) – The duration to lease an executor for each batch. Defaults to 60.0.
owner_prefix (str, optional) – The prefix for the owner identifier of each batch. Defaults to “work-grinder”.
logger (logging.Logger | None)
- Raises:
ValueError – If max_wait_seconds is not greater than 0.
ValueError – If batch_size_threshold is not greater than 0.
ValueError – If lease_seconds is not greater than 0.
- __init__(*, executor_manager, max_wait_seconds=10.0, batch_size_threshold=20, lease_seconds=60.0, owner_prefix='work-grinder', logger=None)[source]¶
Initialize a WorkGrinder instance.
- Parameters:
executor_manager (LeasedExecutorManager) – The executor manager to lease executors from.
max_wait_seconds (float, optional) – The maximum time to wait before processing a batch. Defaults to 10.0.
batch_size_threshold (int, optional) – The number of pending work items to trigger batch processing. Defaults to 20.
lease_seconds (float, optional) – The duration to lease an executor for each batch. Defaults to 60.0.
owner_prefix (str, optional) – The prefix for the owner identifier of each batch. Defaults to “work-grinder”.
logger (Logger | None)
- Raises:
ValueError – If max_wait_seconds is not greater than 0.
ValueError – If batch_size_threshold is not greater than 0.
ValueError – If lease_seconds is not greater than 0.
- async start()[source]¶
Start the WorkGrinder.
This method initializes the event loop and starts the grinder loop task.
- Return type:
- async enqueue(fn, /, *args, owner=None, **kwargs)[source]¶
Enqueue a work item to the WorkGrinder.
- Parameters:
- Raises:
RuntimeError – If the WorkGrinder is not started.
RuntimeError – If the WorkGrinder is stopping.
- Returns:
A future representing the result of the work item.
- Return type:
asyncio.Future[Any]
- submit_from_thread(fn, /, *args, owner=None, **kwargs)[source]¶
Submit a work item to the WorkGrinder from a different thread.
- Parameters:
- Raises:
RuntimeError – If the WorkGrinder is not started.
- Returns:
A future representing the result of the work item.
- Return type:
ConcurrentFuture[Any]
- stats()[source]¶
Get the current statistics of the WorkGrinder.
This method must be called from the WorkGrinder event-loop thread. Use stats_from_thread() from other threads.
- stats_from_thread(timeout=None)[source]¶
Get the current statistics of the WorkGrinder from a different thread.
- Parameters:
timeout (float | None, optional) – The maximum time to wait for the statistics. Defaults to None.
- Raises:
RuntimeError – If the WorkGrinder is not started.
- Returns:
A dictionary containing the current statistics.
- Return type:
Manual summary¶
Constructor¶
WorkGrinder(
*,
executor_manager,
max_wait_seconds=10.0,
batch_size_threshold=20,
lease_seconds=60.0,
owner_prefix="work-grinder",
logger=None,
)
Lifecycle¶
await grinder.start()Start the background grinder task.
await grinder.stop(cancel_pending=False)Stop the grinder. Pending work is drained by default. With
cancel_pending=True, queued pending work is cancelled.
Submitting work¶
await grinder.submit(fn, *args, owner=None, **kwargs)Queue work and wait for its result.
await grinder.enqueue(fn, *args, owner=None, **kwargs)Queue work and return an
asyncio.Futureimmediately.grinder.submit_from_thread(fn, *args, owner=None, **kwargs)Submit from another OS thread and receive a
concurrent.futures.Future.
Diagnostics¶
grinder.stats()Return a diagnostic snapshot. Call it from the event-loop thread.