Examples¶
The repository examples are a guided learning path. Run them from the project root after installing the package in editable mode:
pip install -e .
python examples/00_quickstart_thread_backend.py
For process-backend examples, run the file directly. Do not paste process-pool code into a REPL because worker processes need importable top-level functions.
Example map¶
File |
What it teaches |
|---|---|
|
Minimal manager lifecycle, lease acquisition, and |
|
Safe context-managed leases, owner labels, expiry fields, keyword args. |
|
Manual release and direct |
|
|
|
|
|
|
|
Soft expiry, hard expiry, and |
|
Process backend for CPU-heavy picklable functions. |
|
|
|
|
|
|
|
Common leasepool exceptions. |
|
Future |
|
|
|
Combined lifecycle, adaptive sizing, leases, grinder, and diagnostics. |
|
Store a manager on |
Complete walkthrough¶
"""
Complete leasepool usage walkthrough.
This example combines the APIs most applications use together:
- manager lifecycle
- adaptive sizing
- explicit owner labels
- context-managed leases
- WorkGrinder batching
- manager and grinder diagnostics
- clean shutdown order
"""
from __future__ import annotations
import asyncio
import time
from pprint import pprint
from leasepool import LeasedExecutorManager, WorkGrinder
def blocking_vendor_lookup(device_id: str) -> dict[str, str]:
time.sleep(0.05)
return {"device_id": device_id, "status": "ok"}
def blocking_score(value: int) -> int:
time.sleep(0.02)
return value * value
async def main() -> None:
connected_devices: set[str] = {"device-1", "device-2"}
manager = LeasedExecutorManager(
backend="thread",
max_pools=3,
min_pools=1,
units_per_pool=2,
size_provider=lambda: len(connected_devices),
workers_per_pool=4,
default_lease_seconds=60.0,
lease_grace_seconds=5.0,
name_prefix="walkthrough-worker",
)
grinder = WorkGrinder(
executor_manager=manager,
batch_size_threshold=4,
max_wait_seconds=0.5,
lease_seconds=30.0,
owner_prefix="walkthrough-grinder",
)
await manager.start()
await grinder.start()
try:
print("Initial manager stats:")
pprint(manager.stats())
# Adaptive sizing: update your signal and wake the checker immediately.
connected_devices.update({"device-3", "device-4", "device-5"})
manager.notify_scale_changed()
await asyncio.sleep(0.05)
print("\nAfter adaptive signal changed:")
pprint(manager.stats())
# Direct lease: best for one request or one coordinated group of calls.
async with await manager.acquire(owner="device-status:device-1") as lease:
status = await lease.run(blocking_vendor_lookup, "device-1")
print("\nDirect lease result:", status)
# Grinder: best for many small pieces of sync work submitted by callers.
scores = await asyncio.gather(
*(
grinder.submit(blocking_score, value, owner=f"score:{value}")
for value in range(8)
)
)
print("\nGrinder scores:", scores)
print("\nGrinder stats:")
pprint(grinder.stats())
print("\nFinal manager stats:")
pprint(manager.stats())
finally:
# Stop producers/batchers before shutting down the manager they depend on.
await grinder.stop(cancel_pending=True)
await manager.stop()
if __name__ == "__main__":
asyncio.run(main())
FastAPI pattern¶
"""
FastAPI integration pattern.
This file requires FastAPI if you want to run it:
pip install fastapi uvicorn
Run:
uvicorn examples.fastapi_lifespan_pattern:app --reload
The important idea:
- create the manager during lifespan startup
- store it on app.state
- stop it during lifespan shutdown
"""
from __future__ import annotations
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI
from leasepool import LeasedExecutorManager
def blocking_vendor_sdk_call(device_id: str) -> dict[str, str]:
time.sleep(0.2)
return {"device_id": device_id, "status": "ok"}
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
manager = LeasedExecutorManager(
backend="thread",
max_pools=4,
min_pools=1,
workers_per_pool=4,
name_prefix="fastapi-blocking-worker",
)
await manager.start()
app.state.executor_manager = manager
try:
yield
finally:
await manager.stop()
app = FastAPI(lifespan=lifespan)
@app.get("/devices/{device_id}/status")
async def get_device_status(device_id: str) -> dict[str, str]:
manager: LeasedExecutorManager = app.state.executor_manager
async with await manager.acquire(owner=f"device-status:{device_id}") as lease:
return await lease.run(blocking_vendor_sdk_call, device_id)
@app.get("/executor/stats")
async def executor_stats() -> dict:
manager: LeasedExecutorManager = app.state.executor_manager
return manager.stats()