Coverage for src/tracekit/batch/analyze.py: 94%

47 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Multi-file batch analysis with parallel execution support. 

2 

3 

4This module provides parallel batch processing of signal files using 

5concurrent.futures for efficient multi-core utilization. 

6""" 

7 

8from collections.abc import Callable 

9from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed 

10from pathlib import Path 

11from typing import Any 

12 

13import pandas as pd 

14 

15 

def _run_analysis(
    filepath: str | Path,
    analysis_fn: Callable[..., dict[str, Any]],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Apply analysis_fn to a single file, normalizing the result to a dict.

    Defined at module level (rather than as a closure inside batch_analyze)
    so that ProcessPoolExecutor can pickle it for worker processes.
    """
    try:
        result = analysis_fn(filepath, **config)
        # Ensure result is a dict
        if not isinstance(result, dict):
            result = {"result": result}  # type: ignore[unreachable]
        result["file"] = str(filepath)
        result["error"] = None
        return result
    except Exception as e:
        # Return error info on failure
        return {
            "file": str(filepath),
            "error": str(e),
        }


def batch_analyze(
    files: list[str | Path],
    analysis_fn: Callable[[str | Path], dict[str, Any]],
    *,
    parallel: bool = False,
    workers: int | None = None,
    progress_callback: Callable[[int, int, str], None] | None = None,
    use_threads: bool = False,
    **config: Any,
) -> pd.DataFrame:
    """Analyze multiple files with the same analysis configuration.

    Applies the same analysis to every file, with optional parallel
    execution via concurrent.futures, and returns the aggregated results
    as a DataFrame for easy statistical analysis and export.

    Args:
        files: List of file paths to analyze
        analysis_fn: Analysis function to apply to each file.
            Must accept a file path and return a dict of results.
        parallel: Enable parallel processing (default: False)
        workers: Number of parallel workers (default: CPU count)
        progress_callback: Optional callback for progress updates.
            Called with (current, total, filename) after each file.
        use_threads: Use ThreadPoolExecutor instead of ProcessPoolExecutor
            (useful for I/O-bound tasks, default: False)
        **config: Additional keyword arguments passed to analysis_fn

    Returns:
        DataFrame with one row per file, columns from analysis results.
        Always includes a 'file' column with the input filename.

    Examples:
        >>> import tracekit as tk
        >>> import glob
        >>> files = glob.glob('captures/*.wfm')
        >>> results = tk.batch_analyze(
        ...     files,
        ...     analysis_fn=tk.characterize_buffer,
        ...     parallel=True,
        ...     workers=4,
        ... )
        >>> print(results[['file', 'rise_time', 'fall_time', 'status']])
        >>> results.to_csv('batch_results.csv')

    Notes:
        - Use parallel=True for CPU-bound analysis functions
        - Use use_threads=True for I/O-bound operations (file loading)
        - With the default process pool, analysis_fn must be picklable
          (e.g. a module-level function)
        - The progress callback runs in the calling process as each result
          completes; in parallel mode, completion order may differ from
          input order
        - All exceptions during analysis are caught and stored in the
          'error' column

    References:
        BATCH-001: Multi-File Analysis
    """

    if not files:
        return pd.DataFrame()

    results: list[dict[str, Any]] = []
    total = len(files)

    if parallel:
        # Use concurrent.futures for parallel execution
        executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
        with executor_class(max_workers=workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(_run_analysis, f, analysis_fn, config): f
                for f in files
            }

            # Process results as they complete
            for i, future in enumerate(as_completed(future_to_file), 1):
                filepath = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)

                    if progress_callback:
                        progress_callback(i, total, str(filepath))
                except Exception as e:
                    # Catch execution errors (e.g. a broken worker process)
                    results.append(
                        {
                            "file": str(filepath),
                            "error": f"Execution error: {e}",
                        }
                    )

    else:
        # Sequential processing
        for i, filepath in enumerate(files, 1):
            result = _run_analysis(filepath, analysis_fn, config)
            results.append(result)

            if progress_callback:
                progress_callback(i, total, str(filepath))

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Reorder columns: file first, error last
    cols = df.columns.tolist()
    if "file" in cols:  # coverage: condition was always true; else-branch untested
        cols.remove("file")
        cols = ["file", *cols]
    if "error" in cols:  # coverage: condition was always true; else-branch untested
        cols.remove("error")
        cols = [*cols, "error"]

    return df[cols]
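
A minimal end-to-end sketch of how the function above is meant to be called (not part of the module). peak_stats is a hypothetical stand-in for a real analysis function such as tk.characterize_buffer, show_progress matches the (current, total, filename) callback signature, and the captures/*.txt files are assumed to exist:

from pathlib import Path
from typing import Any

import numpy as np

from tracekit.batch.analyze import batch_analyze


def peak_stats(filepath: str | Path, **config: Any) -> dict[str, Any]:
    # Hypothetical analysis: one sample per line; report simple statistics.
    data = np.loadtxt(filepath)
    return {"peak": float(data.max()), "mean": float(data.mean())}


def show_progress(current: int, total: int, filename: str) -> None:
    print(f"[{current}/{total}] {filename}")


files = sorted(Path("captures").glob("*.txt"))
results = batch_analyze(files, analysis_fn=peak_stats, progress_callback=show_progress)
print(results[["file", "peak", "mean", "error"]])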
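
Because every exception raised inside the analysis is converted into a row rather than propagated, a batch over mixed-quality captures still yields a complete DataFrame; failed files can be split out afterwards. A sketch, reusing the hypothetical peak_stats and files from above:

# peak_stats is defined at module level, so it is picklable and safe to
# use with the default ProcessPoolExecutor.
results = batch_analyze(files, analysis_fn=peak_stats, parallel=True, workers=4)

ok = results[results["error"].isna()]        # analyses that succeeded
failed = results[results["error"].notna()]   # analyses that raised
print(f"{len(ok)} succeeded, {len(failed)} failed")
if not failed.empty:
    print(failed[["file", "error"]])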
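
When per-file work is dominated by disk or network I/O rather than computation, the Notes recommend threads; a sketch of that variant, under the same assumptions:

# Threads share the interpreter, so use_threads=True also lifts the
# picklability requirement: closures and lambdas work as analysis_fn here.
io_results = batch_analyze(
    files,
    analysis_fn=lambda f: {"size_bytes": Path(f).stat().st_size},
    parallel=True,
    use_threads=True,
    workers=8,
)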