WorkGrinder

class WorkGrinder(*, executor_manager, max_wait_seconds=10.0, batch_size_threshold=20, lease_seconds=60.0, owner_prefix='work-grinder', logger=None)

Bases: object

Async work batcher backed by leased executors.

Multiple async callers submit synchronous callables as work items. The grinder starts processing a batch when either:

  • the oldest pending work has waited at least max_wait_seconds, or

  • pending work count reaches batch_size_threshold.

Once a batch is ready, it leases one executor and submits the whole batch.
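The two trigger conditions above can be sketched as a single predicate. This is an illustration only, not the library's implementation: the function name batch_is_ready and its arguments pending_count, oldest_enqueued_at and now are hypothetical, and timestamps are assumed to come from one monotonic clock.

```python
from typing import Optional


def batch_is_ready(
    pending_count: int,
    oldest_enqueued_at: Optional[float],
    now: float,
    *,
    max_wait_seconds: float = 10.0,
    batch_size_threshold: int = 20,
) -> bool:
    """Return True when either documented trigger condition holds.

    Hypothetical helper: the names and the clock source are assumptions.
    """
    if pending_count == 0 or oldest_enqueued_at is None:
        return False  # nothing is pending, so there is no batch to start
    if pending_count >= batch_size_threshold:
        return True  # size trigger: enough work has accumulated
    # Age trigger: the oldest pending item has waited long enough.
    return (now - oldest_enqueued_at) >= max_wait_seconds
```

With the default settings, 20 pending items trigger a batch immediately, while 3 pending items trigger one only after the oldest has waited 10 seconds.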

Initialize a WorkGrinder instance.

Parameters:
  • executor_manager (LeasedExecutorManager) – The executor manager to lease executors from.

  • max_wait_seconds (float, optional) – The maximum time, in seconds, that the oldest pending work item may wait before a batch is processed. Defaults to 10.0.

  • batch_size_threshold (int, optional) – The number of pending work items that triggers batch processing. Defaults to 20.

  • lease_seconds (float, optional) – The duration, in seconds, for which an executor is leased for each batch. Defaults to 60.0.

  • owner_prefix (str, optional) – The prefix for the owner identifier of each batch. Defaults to 'work-grinder'.

  • logger (logging.Logger | None, optional) – The logger to use. Defaults to None.

Raises:
  • ValueError – If max_wait_seconds is not greater than 0.

  • ValueError – If batch_size_threshold is not greater than 0.

  • ValueError – If lease_seconds is not greater than 0.
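The three documented ValueError conditions amount to a strict positivity check on each numeric option. A minimal sketch follows, assuming a hypothetical standalone helper named validate_grinder_options; the error messages are invented for illustration and may differ from the library's.

```python
def validate_grinder_options(
    *,
    max_wait_seconds: float = 10.0,
    batch_size_threshold: int = 20,
    lease_seconds: float = 60.0,
) -> None:
    """Raise ValueError for any option that is not greater than 0.

    Hypothetical helper mirroring the documented Raises section.
    """
    if max_wait_seconds <= 0:
        raise ValueError("max_wait_seconds must be greater than 0")
    if batch_size_threshold <= 0:
        raise ValueError("batch_size_threshold must be greater than 0")
    if lease_seconds <= 0:
        raise ValueError("lease_seconds must be greater than 0")
```

Note that 0 itself is rejected for all three options, not only negative values.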

async start()

Start the WorkGrinder.

This method initializes the event loop and starts the grinder loop task.

Return type:

None

async stop(*, cancel_pending=False)

Stop the WorkGrinder.

Parameters:

cancel_pending (bool, optional) – Whether to cancel queued pending work items instead of draining them. Defaults to False.

Return type:

None
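A common way to pair start() and stop() is a try/finally wrapper, so the grinder is stopped even when the work in between raises. The sketch below assumes grinder is an already constructed WorkGrinder; the helper name run_with_grinder and the body callback are hypothetical.

```python
from typing import Any, Awaitable, Callable


async def run_with_grinder(
    grinder: Any,
    body: Callable[[Any], Awaitable[Any]],
) -> Any:
    """Start the grinder, run body(grinder), and always stop it afterwards."""
    await grinder.start()
    try:
        return await body(grinder)
    finally:
        # cancel_pending=False is the documented default; it is spelled out
        # here only to make the shutdown behaviour explicit.
        await grinder.stop(cancel_pending=False)
```

Passing cancel_pending=True instead would be the choice when remaining queued work should be abandoned on shutdown.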

async submit(fn, /, *args, owner=None, **kwargs)

Submit a work item to the WorkGrinder.

Parameters:
  • fn (Callable[..., Any]) – The function to execute.

  • owner (str | None, optional) – The owner of the work item. Defaults to None.

  • args (Any) – Positional arguments passed to fn.

  • kwargs (Any) – Keyword arguments passed to fn.

Returns:

The result of the work item.

Return type:

Any
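Because submit() waits for the result of its own work item, several items are usually submitted concurrently so that they can land in the same batch. A minimal sketch, assuming grinder is a started WorkGrinder; the helper name submit_all is hypothetical.

```python
import asyncio
from typing import Any, Callable, Iterable, List


async def submit_all(
    grinder: Any,
    fn: Callable[[Any], Any],
    items: Iterable[Any],
) -> List[Any]:
    """Submit fn(item) for every item and return the results in input order."""
    # Awaiting the submit() calls one after another would keep only one item
    # pending at a time, so (with no other callers) the size threshold would
    # never be reached. gather() lets all items queue up together.
    return await asyncio.gather(*(grinder.submit(fn, item) for item in items))
```

asyncio.gather preserves the order of its arguments, so the returned list lines up with items regardless of completion order.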

async enqueue(fn, /, *args, owner=None, **kwargs)

Enqueue a work item to the WorkGrinder.

Parameters:
  • fn (Callable[..., Any]) – The function to execute.

  • owner (str | None, optional) – The owner of the work item. Defaults to None.

  • args (Any) – Positional arguments passed to fn.

  • kwargs (Any) – Keyword arguments passed to fn.

Returns:

A future representing the result of the work item.

Return type:

asyncio.Future[Any]
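Unlike submit(), enqueue() hands back the future without waiting for the result, which allows queueing first and collecting later. The sketch below assumes grinder is a started WorkGrinder; the helper name enqueue_then_collect is hypothetical.

```python
import asyncio
from typing import Any, Callable, Iterable, List


async def enqueue_then_collect(
    grinder: Any,
    fn: Callable[[Any], Any],
    items: Iterable[Any],
) -> List[Any]:
    """Queue fn(item) for every item, then wait for all results."""
    # Phase 1: queue everything. Each enqueue() call returns an
    # asyncio.Future for its work item.
    futures = [await grinder.enqueue(fn, item) for item in items]
    # Other coroutine work could run here while the items are pending.
    # Phase 2: wait for the results, in the order the items were queued.
    return await asyncio.gather(*futures)
```

This shape is useful when the caller wants to keep a handle on each pending item, for example to attach callbacks or to cancel individual futures.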

submit_from_thread(fn, /, *args, owner=None, **kwargs)

Submit a work item to the WorkGrinder from a different thread.

Parameters:
  • fn (Callable[..., Any]) – The function to execute.

  • owner (str | None, optional) – The owner of the work item. Defaults to None.

  • args (Any) – Positional arguments passed to fn.

  • kwargs (Any) – Keyword arguments passed to fn.

Raises:

RuntimeError – If the WorkGrinder is not started.

Returns:

A future representing the result of the work item.

Return type:

concurrent.futures.Future[Any]
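From a non-event-loop thread, the returned concurrent.futures.Future can be waited on with a blocking result() call. A minimal sketch, assuming grinder is a started WorkGrinder whose loop runs in another thread; the helper name call_from_worker_thread and the 30-second default timeout are hypothetical.

```python
from concurrent.futures import Future
from typing import Any, Callable


def call_from_worker_thread(
    grinder: Any,
    fn: Callable[..., Any],
    *args: Any,
    timeout: float = 30.0,
) -> Any:
    """Submit fn(*args) from a worker thread and block until it finishes."""
    # submit_from_thread() is documented to raise RuntimeError when the
    # WorkGrinder has not been started; that error propagates to the caller.
    future: Future = grinder.submit_from_thread(fn, *args)
    # Blocks this thread only. A timeout shorter than max_wait_seconds may
    # expire before the batch containing this item has even started.
    return future.result(timeout=timeout)
```

This helper should not be called from the event-loop thread itself, since blocking there would stall the loop that has to complete the future.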

stats()

Get the current statistics of the WorkGrinder.

This method must be called from the WorkGrinder event-loop thread. Use stats_from_thread() from other threads.

Returns:

A dictionary containing the current statistics.

Return type:

dict[str, Any]

async astats()

Get the current statistics of the WorkGrinder asynchronously.

Returns:

A dictionary containing the current statistics.

Return type:

dict[str, Any]

stats_from_thread(timeout=None)

Get the current statistics of the WorkGrinder from a different thread.

Parameters:

timeout (float | None, optional) – The maximum time to wait for the statistics. Defaults to None.

Raises:

RuntimeError – If the WorkGrinder is not started.

Returns:

A dictionary containing the current statistics.

Return type:

dict[str, Any]
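Which stats call is appropriate depends on the calling thread: stats() on the event-loop thread, stats_from_thread() elsewhere. One way to encode that rule is sketched below. The helper name read_stats and the loop_thread_id argument are hypothetical; the caller is assumed to have recorded threading.get_ident() on the thread that runs the WorkGrinder's event loop.

```python
import threading
from typing import Any, Dict, Optional


def read_stats(
    grinder: Any,
    loop_thread_id: int,
    timeout: Optional[float] = 5.0,
) -> Dict[str, Any]:
    """Call the stats variant that matches the current thread."""
    if threading.get_ident() == loop_thread_id:
        # Already on the event-loop thread: the direct call is allowed.
        return grinder.stats()
    # Any other thread: use the thread-safe variant, which is documented to
    # raise RuntimeError if the WorkGrinder has not been started.
    return grinder.stats_from_thread(timeout=timeout)
```

Inside a coroutine, awaiting astats() is the alternative to either of these calls.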

Manual summary

Constructor

WorkGrinder(
    *,
    executor_manager,
    max_wait_seconds=10.0,
    batch_size_threshold=20,
    lease_seconds=60.0,
    owner_prefix="work-grinder",
    logger=None,
)

Lifecycle

await grinder.start()

Start the background grinder task.

await grinder.stop(cancel_pending=False)

Stop the grinder. Pending work is drained by default. With cancel_pending=True, queued pending work is cancelled.

Submitting work

await grinder.submit(fn, *args, owner=None, **kwargs)

Queue work and wait for its result.

await grinder.enqueue(fn, *args, owner=None, **kwargs)

Queue work and return an asyncio.Future immediately.

grinder.submit_from_thread(fn, *args, owner=None, **kwargs)

Submit from another OS thread and receive a concurrent.futures.Future.

Diagnostics

grinder.stats()

Return a diagnostic snapshot. Call it from the event-loop thread; use stats_from_thread() from other threads, or await astats() from a coroutine.