Source code for leasepool.manager

from __future__ import annotations

import asyncio
import logging
import math
import multiprocessing
import threading
import time
import uuid
from collections import deque
from collections.abc import Callable
from concurrent.futures import Executor, Future
from dataclasses import dataclass
from typing import Any

from ._process_logging import (
    ProcessLoggingConfig,
    build_queue_listener,
    coerce_log_level,
    process_worker_initializer,
)
from .backends import ExecutorBackend, build_executor, normalize_backend
from .exceptions import (
    LeaseExpiredError,
    LeasePoolNotStartedError,
    LeaseUnavailableError,
)
from .types import SizeProvider


# Module-level logger named after this module.
logger = logging.getLogger(__name__)


@dataclass(slots=True)
class _LeaseRecord:
    lease_id: str
    executor: Executor
    owner: str | None
    leased_at: float
    lease_seconds: float
    grace_seconds: float

    @property
    def soft_expires_at(self) -> float:
        return self.leased_at + self.lease_seconds

    @property
    def hard_expires_at(self) -> float:
        return self.soft_expires_at + self.grace_seconds


class _LeasedExecutorProxy(Executor):
    """Proxy handed to callers instead of the raw executor.

    The raw executor remains private inside LeasedExecutorManager. Once the lease
    is released or revoked, new submissions through this proxy fail.
    """

    def __init__(self, manager: LeasedExecutorManager, lease_id: str):
        """Init module-level initializer for ProcessPoolExecutor workers.

        Args:
            manager (LeasedExecutorManager): _description_
            lease_id (str): _description_
        """
        self._manager = manager
        self._lease_id = lease_id

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
        return self._manager._submit_for_lease(self._lease_id, fn, *args, **kwargs)

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        raise RuntimeError(
            "Do not shut down a leased executor directly. "
            "Return the lease with `await lease.release()`."
        )


class ExecutorLease:
    """Lease object returned by LeasedExecutorManager.acquire().

    The lease exposes a proxy executor (``executor``) instead of the real one
    and can be used as an async context manager; leaving the ``async with``
    block releases the lease.
    """

    def __init__(
        self,
        *,
        manager: LeasedExecutorManager,
        lease_id: str,
        owner: str | None,
        lease_seconds: float,
        grace_seconds: float,
        leased_at: float,
    ):
        """Store the lease metadata and build the proxy executor.

        Args:
            manager (LeasedExecutorManager): Manager that issued the lease.
            lease_id (str): Identifier of this lease.
            owner (str | None): Optional label naming the lease holder.
            lease_seconds (float): Nominal lease duration in seconds.
            grace_seconds (float): Extra seconds tolerated after the nominal
                duration.
            leased_at (float): Timestamp at which the lease was handed out.
        """
        self._manager = manager
        self._released = False

        self.lease_id = lease_id
        self.owner = owner
        self.leased_at = leased_at
        self.lease_seconds = lease_seconds
        self.grace_seconds = grace_seconds

        # Callers only ever see the proxy, never the underlying executor.
        self.executor: Executor = _LeasedExecutorProxy(manager, lease_id)

    @property
    def soft_expires_at(self) -> float:
        """Return the timestamp at which the nominal lease period ends."""
        soft_deadline = self.leased_at + self.lease_seconds
        return soft_deadline

    @property
    def hard_expires_at(self) -> float:
        """Return the timestamp at which the grace period ends."""
        return self.leased_at + self.lease_seconds + self.grace_seconds

    async def run(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Any:
        """Run a synchronous callable in this leased executor.

        Args:
            fn (Callable[..., Any]): Callable to execute.
            *args (Any): Positional arguments for ``fn``.
            **kwargs (Any): Keyword arguments for ``fn``.

        Returns:
            Any: The value returned by ``fn``.
        """
        running_loop = asyncio.get_running_loop()
        if not kwargs:
            return await running_loop.run_in_executor(self.executor, fn, *args)

        # run_in_executor() only forwards positional arguments, so keyword
        # arguments are bound up front.
        import functools

        bound_call = functools.partial(fn, *args, **kwargs)
        return await running_loop.run_in_executor(self.executor, bound_call)

    async def release(self) -> None:
        """Return the lease to its manager; repeated calls do nothing."""
        if not self._released:
            # Flag first so a second call made while the manager is being
            # awaited does not release twice.
            self._released = True
            await self._manager.release(self.lease_id)

    async def __aenter__(self) -> ExecutorLease:
        """Enter the async context and return this lease."""
        return self

    async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
        """Release the lease when the async context is left."""
        await self.release()
class LeasedExecutorManager:
    """Bounded async manager for leased Executor instances.

    The manager supports ThreadPoolExecutor and ProcessPoolExecutor on Python
    3.11. Python 3.14+ can add InterpreterPoolExecutor through the same backend
    hook.

    Sizing rule:
        desired executors = max(min_pools, ceil(size_provider() / units_per_pool))
        desired executors is capped at max_pools.
    """

    def __init__(
        self,
        *,
        backend: ExecutorBackend | str = ExecutorBackend.THREAD,
        max_pools: int,
        min_pools: int = 1,
        units_per_pool: int = 10,
        size_provider: SizeProvider | None = None,
        check_interval: float = 120.0,
        default_lease_seconds: float = 300.0,
        lease_grace_seconds: float = 15.0,
        workers_per_pool: int = 4,
        name_prefix: str = "leasepool",
        logger: logging.Logger | None = None,
        process_logging: ProcessLoggingConfig | None = None,
        forward_process_logs: bool = False,
        process_log_level: int | str = logging.INFO,
        process_log_target_logger: logging.Logger | None = None,
        clear_process_log_handlers: bool = True,
        **executor_kwargs: Any,
    ):
        """Validate the configuration and initialise an unstarted manager.

        Args:
            backend (ExecutorBackend | str): Executor backend to use.
            max_pools (int): Upper bound on the number of executors.
            min_pools (int): Lower bound on the desired number of executors.
            units_per_pool (int): Units (as reported by ``size_provider``)
                served by one executor.
            size_provider (SizeProvider | None): Callable returning the current
                unit count; without it the unit count is treated as 0.
            check_interval (float): Maximum seconds between checker passes.
            default_lease_seconds (float): Lease duration used when
                ``acquire()`` is called without ``lease_seconds``.
            lease_grace_seconds (float): Seconds added after the lease duration
                before the lease is revoked.
            workers_per_pool (int): ``max_workers`` for each executor.
            name_prefix (str): Name prefix passed to the executor builder.
            logger (logging.Logger | None): Logger to use; defaults to this
                module's logger.
            process_logging (ProcessLoggingConfig | None): Complete process-log
                forwarding configuration. Mutually exclusive with the
                ``forward_process_logs`` / ``process_log_*`` /
                ``clear_process_log_handlers`` arguments.
            forward_process_logs (bool): Enable process-log forwarding.
            process_log_level (int | str): Level for forwarded process logs.
            process_log_target_logger (logging.Logger | None): Logger that
                receives forwarded records; defaults to the manager's logger.
            clear_process_log_handlers (bool): Passed to the worker initializer
                as ``clear_child_handlers``.
            **executor_kwargs (Any): Extra keyword arguments for each executor.

        Raises:
            ValueError: If a numeric setting is out of range, if both styles of
                process-logging configuration are supplied, or if process-log
                forwarding is requested for a non-process backend.
        """
        if max_pools <= 0:
            raise ValueError("max_pools must be > 0")
        if min_pools <= 0:
            raise ValueError("min_pools must be > 0")
        if units_per_pool <= 0:
            raise ValueError("units_per_pool must be > 0")
        if workers_per_pool <= 0:
            raise ValueError("workers_per_pool must be > 0")
        if min_pools > max_pools:
            raise ValueError("min_pools cannot be greater than max_pools")
        # A non-positive interval makes the checker spin without ever sleeping.
        if float(check_interval) <= 0:
            raise ValueError("check_interval must be > 0")
        # acquire() rejects non-positive lease durations, so reject a default
        # that could never be used.
        if float(default_lease_seconds) <= 0:
            raise ValueError("default_lease_seconds must be > 0")
        # A negative grace period would put the hard deadline before the soft one.
        if float(lease_grace_seconds) < 0:
            raise ValueError("lease_grace_seconds must be >= 0")

        self._backend = normalize_backend(backend)
        self._logger = logger or logging.getLogger(__name__)

        if process_logging is not None and (
            forward_process_logs
            or process_log_level != logging.INFO
            or process_log_target_logger is not None
            or clear_process_log_handlers is not True
        ):
            raise ValueError(
                "Pass either process_logging or the forward_process_logs/process_log_* "
                "arguments, not both."
            )
        if process_logging is None:
            process_logging = ProcessLoggingConfig(
                enabled=forward_process_logs,
                level=process_log_level,
                target_logger=process_log_target_logger,
                clear_child_handlers=clear_process_log_handlers,
            )
        if process_logging.enabled and self._backend is not ExecutorBackend.PROCESS:
            raise ValueError(
                "process log forwarding is only supported for process backend"
            )

        self._process_logging = process_logging
        self._process_log_level = coerce_log_level(process_logging.level)
        self._process_log_queue: Any | None = None
        self._process_log_listener: Any | None = None
        self._process_log_mp_context: multiprocessing.context.BaseContext | None = None

        self._max_pools = int(max_pools)
        self._min_pools = int(min_pools)
        self._units_per_pool = int(units_per_pool)
        self._size_provider = size_provider
        self._check_interval = float(check_interval)
        self._default_lease_seconds = float(default_lease_seconds)
        self._lease_grace_seconds = float(lease_grace_seconds)
        self._workers_per_pool = int(workers_per_pool)
        self._name_prefix = name_prefix
        self._executor_kwargs = executor_kwargs

        self._available: deque[Executor] = deque()
        self._leased: dict[str, _LeaseRecord] = {}

        # Synchronous lock because Executor.submit() is synchronous.
        self._lock = threading.RLock()

        self._availability_event: asyncio.Event | None = None
        self._scale_change_event: asyncio.Event | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._checker_task: asyncio.Task[None] | None = None
        self._started = False
        self._stopping = False
        self._executor_seq = 0

    @property
    def backend(self) -> ExecutorBackend:
        """Return the normalized executor backend."""
        return self._backend

    async def start(self) -> None:
        """Start the manager on the running event loop.

        Creates executors up to the desired count and launches the checker
        task. Calling it on an already started manager does nothing. If start
        fails, every executor created so far is shut down, process-log
        forwarding is stopped, and the exception is re-raised.
        """
        if self._started:
            return

        self._loop = asyncio.get_running_loop()
        self._availability_event = asyncio.Event()
        self._scale_change_event = asyncio.Event()
        self._start_process_logging_if_needed()

        try:
            with self._lock:
                self._stopping = False
                self._ensure_minimum_locked()
            self._checker_task = asyncio.create_task(
                self._checker_loop(),
                name="leasepool-checker",
            )
            self._started = True
        except Exception:
            with self._lock:
                executors = self._detach_all_executors_locked()
            for executor in executors:
                self._shutdown_executor(executor)
            self._stop_process_logging_if_needed()
            raise

        self._logger.info(
            "LeasedExecutorManager started backend=%s total=%s available=%s "
            "leased=%s target=%s",
            self._backend.value,
            self.total_count,
            self.available_count,
            self.leased_count,
            self.desired_executor_count(),
        )

    async def stop(self) -> None:
        """Stop the manager and shut down every executor it owns.

        Cancels the checker task, shuts down idle and leased executors (without
        waiting, cancelling pending futures), stops process-log forwarding, and
        wakes any coroutine blocked in ``acquire()``.
        """
        self._stopping = True

        if self._checker_task:
            self._checker_task.cancel()
            try:
                await self._checker_task
            except asyncio.CancelledError:
                pass
            finally:
                self._checker_task = None

        with self._lock:
            executors = self._detach_all_executors_locked()
            self._started = False

        for executor in executors:
            self._shutdown_executor(executor)

        self._stop_process_logging_if_needed()
        # Blocked acquire() calls wake up, see the stopped state and raise.
        self._wake_waiters()
        self._logger.info("LeasedExecutorManager stopped")

    async def acquire(
        self,
        *,
        lease_seconds: float | None = None,
        owner: str | None = None,
        wait: bool = True,
        timeout: float | None = None,
    ) -> ExecutorLease:
        """Acquire an executor lease.

        Args:
            lease_seconds (float | None, optional): The duration of the lease in
                seconds. Defaults to None, meaning the manager's default.
            owner (str | None, optional): The owner of the lease. Defaults to
                None.
            wait (bool, optional): Whether to wait for an available executor.
                Defaults to True.
            timeout (float | None, optional): The maximum time to wait for an
                available executor in seconds. Defaults to None (no limit).

        Raises:
            LeasePoolNotStartedError: If the manager is not started, or is
                stopping or stopped by the time an executor is looked for.
            ValueError: If lease_seconds is not positive.
            LeaseUnavailableError: If no executor is available and wait is
                False.
            TimeoutError: If the timeout is reached while waiting for an
                executor.

        Returns:
            ExecutorLease: The acquired executor lease.
        """
        if not self._started:
            raise LeasePoolNotStartedError("LeasedExecutorManager is not started")

        if lease_seconds is None:
            lease_seconds = self._default_lease_seconds
        lease_seconds = float(lease_seconds)
        if lease_seconds <= 0:
            raise ValueError("lease_seconds must be > 0")

        started_at = time.monotonic()

        while True:
            with self._lock:
                # stop() wakes waiters; without this check a woken waiter would
                # create and lease a fresh executor on a stopped manager.
                if self._stopping or not self._started:
                    raise LeasePoolNotStartedError(
                        "LeasedExecutorManager is stopping or stopped"
                    )

                self._revoke_expired_leases_locked()
                executor = self._take_or_create_available_locked()

                if executor is not None:
                    lease_id = uuid.uuid4().hex
                    leased_at = time.monotonic()
                    self._leased[lease_id] = _LeaseRecord(
                        lease_id=lease_id,
                        executor=executor,
                        owner=owner,
                        leased_at=leased_at,
                        lease_seconds=lease_seconds,
                        grace_seconds=self._lease_grace_seconds,
                    )
                    self._logger.info(
                        "Leased executor backend=%s lease_id=%s owner=%s "
                        "seconds=%s grace=%s",
                        self._backend.value,
                        lease_id,
                        owner,
                        lease_seconds,
                        self._lease_grace_seconds,
                    )
                    return ExecutorLease(
                        manager=self,
                        lease_id=lease_id,
                        owner=owner,
                        lease_seconds=lease_seconds,
                        grace_seconds=self._lease_grace_seconds,
                        leased_at=leased_at,
                    )

            if not wait:
                raise LeaseUnavailableError(
                    f"No executor available; max_pools={self._max_pools} reached"
                )

            remaining: float | None
            if timeout is not None:
                elapsed = time.monotonic() - started_at
                remaining = timeout - elapsed
                if remaining <= 0:
                    raise TimeoutError(
                        f"Timed out waiting for an executor after {timeout}s"
                    )
            else:
                remaining = None

            availability_event = self._availability_event
            if availability_event is None:
                raise LeasePoolNotStartedError("LeasedExecutorManager is not started")

            try:
                await asyncio.wait_for(
                    availability_event.wait(),
                    timeout=remaining,
                )
            except asyncio.TimeoutError as exc:
                raise TimeoutError(
                    f"Timed out waiting for an executor after {timeout}s"
                ) from exc
            finally:
                availability_event.clear()

    async def release(self, lease_id: str) -> None:
        """Release an executor lease.

        The executor goes back to the idle pool when keeping it does not push
        the total above the desired count; otherwise it is shut down. Unknown
        or already released lease ids are ignored.

        Args:
            lease_id (str): The ID of the lease to release.
        """
        with self._lock:
            record = self._leased.pop(lease_id, None)
            if record is None:
                return

            executor = record.executor
            if self._stopping:
                should_keep = False
            else:
                target = self._desired_executor_count_locked()
                # The record was already popped, so add the executor back in
                # when projecting the total.
                projected_total = self._current_count_locked() + 1
                should_keep = projected_total <= target

            if should_keep:
                self._available.append(executor)

            self._logger.info(
                "Released executor lease_id=%s owner=%s kept=%s",
                lease_id,
                record.owner,
                should_keep,
            )
            self._ensure_minimum_locked()

        if not should_keep:
            self._shutdown_executor(executor)

        self._wake_waiters()

    def notify_scale_changed(self) -> None:
        """Inform the checker that the size signal changed."""
        self._wake_scale_checker()

    def desired_executor_count(self) -> int:
        """Return the desired number of executors under the sizing rule."""
        with self._lock:
            return self._desired_executor_count_locked()

    @property
    def available_count(self) -> int:
        """Return the number of idle executors."""
        with self._lock:
            return len(self._available)

    @property
    def leased_count(self) -> int:
        """Return the number of active leases."""
        with self._lock:
            return len(self._leased)

    @property
    def total_count(self) -> int:
        """Return the number of idle plus leased executors."""
        with self._lock:
            return self._current_count_locked()

    def stats(self) -> dict[str, Any]:
        """Get statistics about the executor manager.

        Returns:
            dict[str, Any]: Backend name, desired/min/max pool counts, idle,
                leased and total executor counts, workers per pool, and one
                entry per active lease with the seconds left until its soft
                and hard expiry (clamped at 0).
        """
        with self._lock:
            now = time.monotonic()
            return {
                "backend": self._backend.value,
                "desired": self._desired_executor_count_locked(),
                "max_pools": self._max_pools,
                "min_pools": self._min_pools,
                "available": len(self._available),
                "leased": len(self._leased),
                "total": self._current_count_locked(),
                "workers_per_pool": self._workers_per_pool,
                "leases": [
                    {
                        "lease_id": record.lease_id,
                        "owner": record.owner,
                        "seconds_until_soft_expiry": max(
                            0.0,
                            record.soft_expires_at - now,
                        ),
                        "seconds_until_hard_expiry": max(
                            0.0,
                            record.hard_expires_at - now,
                        ),
                    }
                    for record in self._leased.values()
                ],
            }

    async def _checker_loop(self) -> None:
        """Periodically revoke expired leases and resize the idle pool.

        Each pass revokes hard-expired leases, tops the pool up to the desired
        count, shuts down surplus idle executors and wakes acquire() waiters.
        It then sleeps until the check interval elapses, the next hard expiry
        is due, or a scale change is signalled, whichever comes first.
        """
        try:
            while True:
                with self._lock:
                    self._revoke_expired_leases_locked()
                    self._ensure_minimum_locked()
                    self._shrink_idle_if_above_target_locked()
                    next_expiry_delay = self._seconds_until_next_hard_expiry_locked()

                self._wake_waiters()

                wait_seconds = self._check_interval
                if next_expiry_delay is not None:
                    wait_seconds = min(wait_seconds, max(0.0, next_expiry_delay))

                scale_event = self._scale_change_event
                # Explicit check instead of assert, which is stripped under -O.
                if scale_event is None:
                    raise RuntimeError("scale change event is not initialised")

                try:
                    await asyncio.wait_for(
                        scale_event.wait(),
                        timeout=wait_seconds,
                    )
                except asyncio.TimeoutError:
                    pass
                finally:
                    scale_event.clear()
        except asyncio.CancelledError:
            raise
        except Exception:
            self._logger.exception("LeasedExecutorManager checker crashed")

    def _submit_for_lease(
        self,
        lease_id: str,
        fn: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> Future:
        """Submit a task for execution under a lease.

        Args:
            lease_id (str): Identifier of the lease to submit under.
            fn (Callable[..., Any]): Callable to run in the leased executor.
            *args (Any): Positional arguments for ``fn``.
            **kwargs (Any): Keyword arguments for ``fn``.

        Raises:
            LeaseExpiredError: If the lease is not active, or if it has passed
                its hard expiry (in which case it is revoked and its executor
                is shut down).

        Returns:
            Future: The future returned by the underlying executor.
        """
        with self._lock:
            record = self._leased.get(lease_id)
            if record is None:
                raise LeaseExpiredError("Executor lease is no longer active")

            if time.monotonic() < record.hard_expires_at:
                return record.executor.submit(fn, *args, **kwargs)

            # Past the hard deadline: revoke here rather than waiting for the
            # checker to notice.
            self._leased.pop(lease_id, None)
            self._logger.warning(
                "Revoking expired executor lease_id=%s owner=%s "
                "lease_seconds=%s grace=%s",
                lease_id,
                record.owner,
                record.lease_seconds,
                record.grace_seconds,
            )
            self._ensure_minimum_locked()

        self._shutdown_executor(record.executor)
        self._wake_waiters()
        raise LeaseExpiredError("Executor lease expired and was revoked")

    def _detach_all_executors_locked(self) -> list[Executor]:
        """Remove every idle and leased executor from the books and return them."""
        executors: list[Executor] = list(self._available)
        executors.extend(record.executor for record in self._leased.values())
        self._available.clear()
        self._leased.clear()
        return executors

    def _take_or_create_available_locked(self) -> Executor | None:
        """Take an available executor or create a new one if possible.

        Returns:
            Executor | None: An available executor or None if the maximum number
                of pools is reached.
        """
        if self._available:
            return self._available.popleft()
        if self._current_count_locked() < self._max_pools:
            return self._create_executor_locked()
        return None

    def _ensure_minimum_locked(self) -> None:
        """Create idle executors until the desired count is reached.

        Does nothing while the manager is stopping, so no replacement
        executors are created only to be torn down by stop().
        """
        if self._stopping:
            return
        target = self._desired_executor_count_locked()
        while self._current_count_locked() < target:
            self._available.append(self._create_executor_locked())

    def _shrink_idle_if_above_target_locked(self) -> None:
        """Shrink idle executors if the current count is above the target."""
        target = self._desired_executor_count_locked()
        while self._available and self._current_count_locked() > target:
            executor = self._available.pop()
            self._shutdown_executor(executor)

    def _revoke_expired_leases_locked(self) -> None:
        """Revoke leases past their hard expiry and shut their executors down."""
        now = time.monotonic()
        expired_ids = [
            lease_id
            for lease_id, record in self._leased.items()
            if now >= record.hard_expires_at
        ]
        for lease_id in expired_ids:
            record = self._leased.pop(lease_id)
            self._logger.warning(
                "Revoking expired executor lease_id=%s owner=%s "
                "lease_seconds=%s grace=%s",
                lease_id,
                record.owner,
                record.lease_seconds,
                record.grace_seconds,
            )
            self._shutdown_executor(record.executor)
        if expired_ids:
            self._ensure_minimum_locked()

    def _seconds_until_next_hard_expiry_locked(self) -> float | None:
        """Calculate the seconds until the next hard expiry of any leased executor.

        Returns:
            float | None: The number of seconds until the next hard expiry, or
                None if there are no leased executors.
        """
        if not self._leased:
            return None
        now = time.monotonic()
        return min(record.hard_expires_at - now for record in self._leased.values())

    def _desired_executor_count_locked(self) -> int:
        """Calculate the desired number of executors based on the current unit count.

        Returns:
            int: The desired number of executors, between min_pools and
                max_pools.
        """
        unit_count = self._unit_count_locked()
        unit_based = math.ceil(unit_count / self._units_per_pool)
        desired = max(self._min_pools, unit_based)
        return min(self._max_pools, desired)

    def _unit_count_locked(self) -> int:
        """Get the current unit count from the size provider.

        Returns:
            int: The current unit count, never negative; 0 when there is no
                size provider or it raises.
        """
        if self._size_provider is None:
            return 0
        try:
            return max(0, int(self._size_provider()))
        except Exception:
            # Best effort: a failing provider must not break sizing.
            self._logger.debug("Could not read size_provider", exc_info=True)
            return 0

    def _current_count_locked(self) -> int:
        """Get the current count of executors.

        Returns:
            int: The number of idle plus leased executors.
        """
        return len(self._available) + len(self._leased)

    def _start_process_logging_if_needed(self) -> None:
        """Start the parent-side process logging bridge, if configured."""
        if self._backend is not ExecutorBackend.PROCESS:
            return
        if not self._process_logging.enabled:
            return
        if self._process_log_listener is not None:
            return

        mp_context = self._executor_kwargs.get("mp_context")
        if mp_context is None:
            # ProcessPoolExecutor uses spawn automatically when
            # max_tasks_per_child is supplied. Use the same context for the
            # logging queue to avoid cross-context SemLock errors.
            if self._executor_kwargs.get("max_tasks_per_child") is not None:
                mp_context = multiprocessing.get_context("spawn")
            else:
                mp_context = multiprocessing.get_context()

        self._process_log_mp_context = mp_context
        self._process_log_queue = mp_context.Queue(-1)

        target_logger = self._process_logging.target_logger or self._logger
        self._process_log_listener = build_queue_listener(
            log_queue=self._process_log_queue,
            target_logger=target_logger,
        )
        self._process_log_listener.start()

        self._logger.debug(
            "Process log forwarding started target_logger=%s level=%s",
            target_logger.name,
            self._process_log_level,
        )

    def _stop_process_logging_if_needed(self) -> None:
        """Stop the parent-side process logging bridge, if running."""
        listener = self._process_log_listener
        if listener is not None:
            try:
                listener.stop()
            finally:
                self._process_log_listener = None

        queue = self._process_log_queue
        if queue is not None:
            try:
                queue.close()
            except Exception:
                self._logger.debug("Could not close process log queue", exc_info=True)
            try:
                queue.join_thread()
            except Exception:
                self._logger.debug(
                    "Could not join process log queue thread",
                    exc_info=True,
                )
            finally:
                self._process_log_queue = None
                self._process_log_mp_context = None

    def _executor_kwargs_for_new_executor(self) -> dict[str, Any]:
        """Build executor kwargs, composing process-log initializer if needed.

        Returns:
            dict[str, Any]: A copy of the user's executor kwargs. When
                process-log forwarding is enabled, the initializer is replaced
                by the process worker initializer, which receives the user's
                own initializer and initargs as trailing arguments.

        Raises:
            RuntimeError: If forwarding is enabled but the log queue has not
                been created yet.
        """
        kwargs = dict(self._executor_kwargs)
        if self._backend is not ExecutorBackend.PROCESS:
            return kwargs
        if not self._process_logging.enabled:
            return kwargs

        log_queue = self._process_log_queue
        if log_queue is None:
            raise RuntimeError("process log forwarding has not been started")

        user_initializer = kwargs.pop("initializer", None)
        user_initargs = tuple(kwargs.pop("initargs", ()))
        kwargs["initializer"] = process_worker_initializer
        kwargs["initargs"] = (
            log_queue,
            self._process_log_level,
            self._process_logging.clear_child_handlers,
            user_initializer,
            user_initargs,
        )
        # Workers must use the same multiprocessing context as the log queue.
        if "mp_context" not in kwargs and self._process_log_mp_context is not None:
            kwargs["mp_context"] = self._process_log_mp_context
        return kwargs

    def _create_executor_locked(self) -> Executor:
        """Create a new executor instance.

        Returns:
            Executor: The newly created executor.
        """
        self._executor_seq += 1
        executor = build_executor(
            backend=self._backend,
            max_workers=self._workers_per_pool,
            name_prefix=self._name_prefix,
            executor_seq=self._executor_seq,
            executor_kwargs=self._executor_kwargs_for_new_executor(),
        )
        self._logger.info(
            "Created executor backend=%s seq=%s workers=%s",
            self._backend.value,
            self._executor_seq,
            self._workers_per_pool,
        )
        return executor

    @staticmethod
    def _shutdown_executor(executor: Executor) -> None:
        """Shut down the given executor without waiting, cancelling pending futures.

        Args:
            executor (Executor): The executor to shut down.
        """
        executor.shutdown(wait=False, cancel_futures=True)

    def _wake_waiters(self) -> None:
        """Wake up any waiters waiting for an available executor."""
        event = self._availability_event
        loop = self._loop
        if event is None or loop is None or loop.is_closed():
            return
        # May be called from worker threads, so set the event via the loop.
        loop.call_soon_threadsafe(event.set)

    def _wake_scale_checker(self) -> None:
        """Wake up the scale checker to re-evaluate the desired executor count."""
        event = self._scale_change_event
        loop = self._loop
        if event is None or loop is None or loop.is_closed():
            return
        loop.call_soon_threadsafe(event.set)