Source code for scitex_parallel._run

#!/usr/bin/env python3
# File: src/scitex_parallel/_run.py

"""
Parallel execution utilities using ThreadPoolExecutor.

1. Functionality:
   - Runs functions in parallel using ThreadPoolExecutor
   - Handles both single and multiple return values
   - Supports automatic CPU core detection
2. Input:
   - Function to run
   - List of items to process
   - Optional parameters for execution control
3. Output:
   - List of results or concatenated tuple
4. Prerequisites:
   - concurrent.futures
   - tqdm
"""

import multiprocessing
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, List

from tqdm import tqdm


[docs] def run( func: Callable, args_list: List[tuple], n_jobs: int = -1, desc: str = "Processing", ) -> List[Any]: """Runs function in parallel using ThreadPoolExecutor with tuple arguments. Parameters ---------- func : Callable Function to run in parallel args_list : List[tuple] List of argument tuples, each tuple contains arguments for one function call n_jobs : int, optional Number of jobs to run in parallel. -1 means using all processors desc : str, optional Description for progress bar Returns ------- List[Any] Results of parallel execution Examples -------- >>> def add(x, y): ... return x + y >>> args_list = [(1, 4), (2, 5), (3, 6)] >>> run(add, args_list) [5, 7, 9] """ if not args_list: raise ValueError("Args list cannot be empty") if not callable(func): raise ValueError("Func must be callable") # Validate n_jobs before conversion: must be -1 or >= 1 if n_jobs < -1 or n_jobs == 0: raise ValueError("n_jobs must be >= 1 or -1") cpu_count = multiprocessing.cpu_count() n_jobs = cpu_count if n_jobs == -1 else n_jobs if n_jobs > cpu_count: warnings.warn(f"n_jobs ({n_jobs}) is greater than CPU count ({cpu_count})") results = [None] * len(args_list) # Pre-allocate list with ThreadPoolExecutor(max_workers=n_jobs) as executor: futures = { executor.submit(func, *args): idx for idx, args in enumerate(args_list) } for future in tqdm(as_completed(futures), total=len(args_list), desc=desc): idx = futures[future] results[idx] = future.result() # If results contain multiple values (tuples), transpose them if results and isinstance(results[0], tuple): n_vars = len(results[0]) return tuple([result[i] for result in results] for i in range(n_vars)) return results
# EOF