Quickstart
Use leasepool when async code needs to run synchronous work without leaking
raw executors throughout your codebase.
The usual lifecycle is:
1. create a LeasedExecutorManager;
2. await manager.start() during application startup;
3. acquire leases around synchronous work;
4. await manager.stop() during application shutdown.
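The start/stop pairing in the lifecycle above can be wrapped in a small helper so that shutdown is never skipped. This is a minimal sketch, not part of leasepool: the helper name `running` and the `DummyManager` stand-in are hypothetical, and the only assumption about the real manager is that it exposes awaitable `start()` and `stop()` methods as described above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(manager):
    """Start `manager` on entry and always stop it on exit (hypothetical helper)."""
    await manager.start()
    try:
        yield manager
    finally:
        # Runs even if the body raises, mirroring the try/finally pattern.
        await manager.stop()


class DummyManager:
    """Stand-in used only to exercise the helper; not a leasepool class."""

    def __init__(self):
        self.events = []

    async def start(self):
        self.events.append("start")

    async def stop(self):
        self.events.append("stop")


async def demo():
    dummy = DummyManager()
    async with running(dummy):
        dummy.events.append("work")
    return dummy.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In an application, the same helper could wrap a real manager at startup so that every exit path, including exceptions, reaches `stop()`.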
Basic thread backend
Use the thread backend for blocking I/O or legacy synchronous SDKs:

"""
Quickstart: run one blocking sync function from async code.

This is the most basic `leasepool` usage:
1. create a manager
2. start it
3. acquire a lease
4. run sync work inside the leased executor
5. stop the manager
"""

from __future__ import annotations

import asyncio
import time

from leasepool import ExecutorBackend, LeasedExecutorManager


def blocking_uppercase(value: str) -> str:
    time.sleep(0.2)
    return value.upper()


async def main() -> None:
    manager = LeasedExecutorManager(
        backend=ExecutorBackend.THREAD,
        max_pools=2,
        min_pools=1,
        workers_per_pool=4,
        name_prefix="quickstart-worker",
    )
    await manager.start()
    try:
        async with await manager.acquire(owner="quickstart") as lease:
            result = await lease.run(blocking_uppercase, "hello leasepool")
            print("Result:", result)
    finally:
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
Context-managed leases
The async context manager is the recommended style because the lease is returned to the manager even when your work raises an exception.

async with await manager.acquire(owner="vendor-sdk-call") as lease:
    result = await lease.run(blocking_vendor_call, device_id)

lease.run() accepts positional and keyword arguments. It runs a synchronous
callable in the leased executor and awaits the result.
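For intuition, here is a standard-library analogue of running a synchronous callable with positional and keyword arguments from async code. It is a sketch of the general pattern, not leasepool's implementation: `run_sync` and `format_reading` are hypothetical names, and a plain `ThreadPoolExecutor` stands in for the leased executor. Because `loop.run_in_executor()` forwards only positional arguments, keyword arguments are bound with `functools.partial` first.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


def format_reading(device_id, *, unit="C"):
    """Hypothetical synchronous function taking a keyword argument."""
    return f"{device_id}:{unit}"


async def run_sync(executor, func, *args, **kwargs):
    """Run `func(*args, **kwargs)` in `executor` and await its result."""
    loop = asyncio.get_running_loop()
    # run_in_executor() accepts only positional arguments, so bind
    # everything into a zero-argument callable up front.
    call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(executor, call)


async def demo():
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await run_sync(executor, format_reading, "sensor-7", unit="F")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```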
Manual release
Manual release is available for control flow that cannot fit inside one context:

lease = await manager.acquire(owner="manual-flow")
try:
    result = await lease.run(sync_function)
finally:
    await lease.release()

Prefer the context-manager form unless manual control is necessary.
CPU work with the process backend
Use the process backend for CPU-heavy Python work that should run across CPU cores:

"""
CPU-heavy work with ProcessPoolExecutor backend on Python 3.11.

Use the process backend for CPU-bound work that should use multiple CPU cores.

Important:
- submitted functions must be top-level importable functions
- arguments and return values must be picklable
- do not submit lambdas, nested functions, open sockets, Redis clients, or DB clients
"""

from __future__ import annotations

import asyncio

from leasepool import ExecutorBackend, LeasedExecutorManager


def count_primes(limit: int) -> int:
    count = 0
    for number in range(2, limit):
        for divisor in range(2, int(number**0.5) + 1):
            if number % divisor == 0:
                break
        else:
            count += 1
    return count


async def main() -> None:
    manager = LeasedExecutorManager(
        backend=ExecutorBackend.PROCESS,
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    await manager.start()
    try:
        async with await manager.acquire(owner="cpu-primes") as lease:
            results = await asyncio.gather(
                lease.run(count_primes, 20_000),
                lease.run(count_primes, 21_000),
                lease.run(count_primes, 22_000),
                lease.run(count_primes, 23_000),
            )
            print("Prime counts:", results)
    finally:
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

Important
With the process backend, submitted functions, arguments, and return values must be picklable. Prefer top-level functions and simple serializable data.
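One way to catch picklability problems early is to try pickling a value before submitting it. The helper below is a hypothetical convenience built only on the standard `pickle` module; it is not a leasepool API. Passing the check does not guarantee that the object will unpickle correctly in a worker process.

```python
import math
import pickle
import threading


def is_picklable(obj):
    """Return True if `obj` can be serialized with pickle (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    # Importable module-level functions are pickled by reference.
    print("math.sqrt:", is_picklable(math.sqrt))
    # Plain data such as dicts of ints and strings is safe to send.
    print("dict:", is_picklable({"limit": 20_000}))
    # Lambdas cannot be looked up by name, so pickling them fails.
    print("lambda:", is_picklable(lambda x: x))
    # OS-level resources such as locks are not picklable either.
    print("lock:", is_picklable(threading.Lock()))
```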
Batch many small jobs with WorkGrinder
WorkGrinder is useful when many async callers submit small pieces of
synchronous work and you want one component to control batching and leasing.

"""
WorkGrinder with `submit()`.

`submit()` queues work and waits for the result.

The grinder processes a batch when either:
- pending count reaches batch_size_threshold
- the oldest pending item waits max_wait_seconds
"""

from __future__ import annotations

import asyncio
import time

from leasepool import LeasedExecutorManager, WorkGrinder


def blocking_square(value: int) -> int:
    time.sleep(0.05)
    return value * value


async def main() -> None:
    manager = LeasedExecutorManager(
        backend="thread",
        max_pools=1,
        min_pools=1,
        workers_per_pool=4,
    )
    grinder = WorkGrinder(
        executor_manager=manager,
        batch_size_threshold=10,
        max_wait_seconds=1.0,
        lease_seconds=30.0,
        owner_prefix="square-grinder",
    )
    await manager.start()
    await grinder.start()
    try:
        results = await asyncio.gather(
            *(grinder.submit(blocking_square, i, owner=f"item-{i}") for i in range(20))
        )
        print("Results:", results)
        print("Grinder stats:", grinder.stats())
    finally:
        await grinder.stop(cancel_pending=True)
        await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())
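
The two batch triggers named in the docstring above can be expressed as one predicate. This is an illustrative sketch of the rule as documented, not WorkGrinder's actual code: the function name `should_flush` and its signature are assumptions, and skipping an empty queue is also an assumption rather than documented behavior.

```python
def should_flush(pending_count, oldest_wait_seconds, *,
                 batch_size_threshold, max_wait_seconds):
    """Decide whether a pending batch should be processed now (hypothetical)."""
    if pending_count == 0:
        # Assumption: nothing is queued, so there is nothing to process.
        return False
    size_reached = pending_count >= batch_size_threshold
    waited_too_long = oldest_wait_seconds >= max_wait_seconds
    return size_reached or waited_too_long


if __name__ == "__main__":
    limits = {"batch_size_threshold": 10, "max_wait_seconds": 1.0}
    print(should_flush(10, 0.0, **limits))  # size trigger
    print(should_flush(3, 1.2, **limits))   # time trigger
    print(should_flush(3, 0.2, **limits))   # neither trigger yet
```

With the settings from the example above (threshold 10, 20 submitted items), the size trigger alone would be expected to account for the batches, with the time trigger acting as a fallback for a partially filled queue.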