Source code for leasepool.backends
from __future__ import annotations
from typing import Any
from enum import StrEnum
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
[docs]
class ExecutorBackend(StrEnum):
"""Enum representing the available executor backends.
Args:
StrEnum (_type_): String-based enum for executor backends.
"""
THREAD = "thread"
PROCESS = "process"
INTERPRETER = "interpreter"
[docs]
def normalize_backend(backend: ExecutorBackend | str) -> ExecutorBackend:
"""Normalize the backend to an ExecutorBackend enum.
Args:
backend (ExecutorBackend | str): The backend to normalize.
Raises:
ValueError: If the backend is not supported.
Returns:
ExecutorBackend: The normalized backend.
"""
if isinstance(backend, ExecutorBackend):
return backend
try:
return ExecutorBackend(str(backend).lower())
except ValueError as exc:
supported = ", ".join(item.value for item in ExecutorBackend)
raise ValueError(f"Unsupported backend {backend!r}. Supported: {supported}") from exc
[docs]
def resolve_executor_cls(backend: ExecutorBackend | str) -> type[Executor]:
"""Resolve the executor class for the given backend.
Args:
backend (ExecutorBackend | str): The backend to resolve.
Raises:
UnsupportedBackendError: If the backend is not supported.
UnsupportedBackendError: If the InterpreterPoolExecutor is not available.
Returns:
type[Executor]: The executor class for the given backend.
"""
from .exceptions import UnsupportedBackendError
normalized = normalize_backend(backend)
if normalized is ExecutorBackend.THREAD:
return ThreadPoolExecutor
if normalized is ExecutorBackend.PROCESS:
return ProcessPoolExecutor
if normalized is ExecutorBackend.INTERPRETER:
try:
from concurrent.futures import InterpreterPoolExecutor # type: ignore[attr-defined]
except ImportError as exc:
raise UnsupportedBackendError(
"InterpreterPoolExecutor is available only on Python 3.14+."
) from exc
return InterpreterPoolExecutor
# Defensive fallback for static analyzers.
raise UnsupportedBackendError(f"Unsupported backend: {backend!r}")
[docs]
def build_executor(
*,
backend: ExecutorBackend | str,
max_workers: int,
name_prefix: str,
executor_seq: int,
executor_kwargs: dict[str, Any],
) -> Executor:
"""Build an executor instance for the selected backend.
`thread_name_prefix` is valid for ThreadPoolExecutor and InterpreterPoolExecutor,
but not for ProcessPoolExecutor on Python 3.11.
"""
executor_cls = resolve_executor_cls(backend)
kwargs = dict(executor_kwargs)
normalized = normalize_backend(backend)
if normalized in {ExecutorBackend.THREAD, ExecutorBackend.INTERPRETER}:
kwargs.setdefault("thread_name_prefix", f"{name_prefix}-{executor_seq}")
return executor_cls(max_workers=max_workers, **kwargs)