Coverage for src/driada/intense/improved_mi_testing.py: 0.00%

195 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1""" 

2Improved statistical testing approaches for MI distributions in INTENSE. 

3 

4This module implements better alternatives to the current norm/gamma distribution 

5fitting approach, addressing the fundamental issue of using poorly-fitting distributions. 

6""" 

7 

8import numpy as np 

9from scipy import stats 

10from scipy.stats import rankdata 

11from typing import Tuple, Optional, Dict, Union 

12import warnings 

13 

14 

15def empirical_p_value(observed_value: float, 

16 null_distribution: np.ndarray, 

17 method: str = 'conservative') -> float: 

18 """ 

19 Calculate empirical p-value without assuming any distribution. 

20  

21 This is the most robust approach - it makes no distributional assumptions 

22 and directly uses the empirical distribution of shuffled values. 

23  

24 Parameters 

25 ---------- 

26 observed_value : float 

27 The observed MI value to test. 

28 null_distribution : np.ndarray 

29 Array of MI values from shuffled data (null hypothesis). 

30 method : str, optional 

31 How to handle ties. Options: 

32 - 'conservative': (rank + 1) / (n + 1) - never gives p=0 

33 - 'standard': rank / n - can give p=0 

34 - 'mid-rank': Uses mid-rank for ties 

35 Default: 'conservative' 

36  

37 Returns 

38 ------- 

39 p_value : float 

40 Empirical p-value. 

41  

42 Notes 

43 ----- 

44 This is the most defensible approach statistically as it requires 

45 no assumptions about the distribution of MI values. 

46 """ 

47 n = len(null_distribution) 

48 

49 if method == 'conservative': 

50 # Add 1 to both numerator and denominator to avoid p=0 

51 # This is the (r+1)/(n+1) formula recommended by many statisticians 

52 rank = np.sum(null_distribution >= observed_value) 

53 p_value = (rank + 1) / (n + 1) 

54 elif method == 'standard': 

55 # Simple proportion of values >= observed 

56 rank = np.sum(null_distribution >= observed_value) 

57 p_value = rank / n 

58 elif method == 'mid-rank': 

59 # Handle ties by using mid-rank 

60 all_values = np.concatenate([null_distribution, [observed_value]]) 

61 ranks = rankdata(all_values, method='average') 

62 obs_rank = ranks[-1] 

63 p_value = 1 - (obs_rank - 1) / n 

64 else: 

65 raise ValueError(f"Unknown method: {method}") 

66 

67 return p_value 

68 

69 

70def adaptive_distribution_test(observed_value: float, 

71 null_distribution: np.ndarray, 

72 candidate_distributions: Optional[Dict] = None, 

73 selection_criterion: str = 'aic') -> Tuple[float, str, Dict]: 

74 """ 

75 Adaptively select the best-fitting distribution and compute p-value. 

76  

77 This approach fits multiple candidate distributions and selects the best 

78 one based on goodness-of-fit criteria, then uses it for p-value calculation. 

79  

80 Parameters 

81 ---------- 

82 observed_value : float 

83 The observed MI value to test. 

84 null_distribution : np.ndarray 

85 Array of MI values from shuffled data. 

86 candidate_distributions : dict, optional 

87 Dictionary mapping distribution names to scipy.stats distributions. 

88 If None, uses a default set of candidates. 

89 selection_criterion : str, optional 

90 Criterion for selecting best distribution: 'aic', 'bic', or 'ks'. 

91 Default: 'aic' 

92  

93 Returns 

94 ------- 

95 p_value : float 

96 P-value from the best-fitting distribution. 

97 best_dist : str 

98 Name of the selected distribution. 

99 fit_info : dict 

100 Information about the fit quality for all distributions. 

101 """ 

102 if candidate_distributions is None: 

103 candidate_distributions = { 

104 'gamma': stats.gamma, 

105 'lognorm': stats.lognorm, 

106 'weibull_min': stats.weibull_min, 

107 'exponweib': stats.exponweib, 

108 'beta': stats.beta, # After transformation to [0,1] 

109 'norm': stats.norm # Include for comparison 

110 } 

111 

112 fit_results = {} 

113 n = len(null_distribution) 

114 

115 for dist_name, dist_class in candidate_distributions.items(): 

116 try: 

117 # Special handling for beta distribution 

118 if dist_name == 'beta': 

119 # Transform to [0, 1] range 

120 data_min = np.min(null_distribution) 

121 data_max = np.max(null_distribution) 

122 if data_min == data_max: 

123 continue 

124 transformed_data = (null_distribution - data_min) / (data_max - data_min) 

125 params = dist_class.fit(transformed_data) 

126 

127 # Transform observed value 

128 transformed_obs = (observed_value - data_min) / (data_max - data_min) 

129 # Ensure within bounds 

130 transformed_obs = np.clip(transformed_obs, 0, 1) 

131 

132 # Calculate metrics on transformed data 

133 fitted_dist = dist_class(*params) 

134 log_likelihood = np.sum(dist_class.logpdf(transformed_data, *params)) 

135 ks_stat, ks_p = stats.kstest(transformed_data, fitted_dist.cdf) 

136 p_value = fitted_dist.sf(transformed_obs) 

137 

138 else: 

139 # Standard fitting 

140 if dist_name in ['gamma', 'lognorm', 'weibull_min']: 

141 # These require positive values 

142 if np.any(null_distribution <= 0): 

143 # Add small constant to make positive 

144 shifted_data = null_distribution + 1e-10 

145 shifted_obs = observed_value + 1e-10 

146 params = dist_class.fit(shifted_data, floc=0) 

147 fitted_dist = dist_class(*params) 

148 log_likelihood = np.sum(dist_class.logpdf(shifted_data, *params)) 

149 ks_stat, ks_p = stats.kstest(shifted_data, fitted_dist.cdf) 

150 p_value = fitted_dist.sf(shifted_obs) 

151 else: 

152 params = dist_class.fit(null_distribution, floc=0) 

153 fitted_dist = dist_class(*params) 

154 log_likelihood = np.sum(dist_class.logpdf(null_distribution, *params)) 

155 ks_stat, ks_p = stats.kstest(null_distribution, fitted_dist.cdf) 

156 p_value = fitted_dist.sf(observed_value) 

157 else: 

158 params = dist_class.fit(null_distribution) 

159 fitted_dist = dist_class(*params) 

160 log_likelihood = np.sum(dist_class.logpdf(null_distribution, *params)) 

161 ks_stat, ks_p = stats.kstest(null_distribution, fitted_dist.cdf) 

162 p_value = fitted_dist.sf(observed_value) 

163 

164 # Calculate information criteria 

165 k = len(params) # Number of parameters 

166 aic = 2 * k - 2 * log_likelihood 

167 bic = k * np.log(n) - 2 * log_likelihood 

168 

169 fit_results[dist_name] = { 

170 'params': params, 

171 'p_value': p_value, 

172 'aic': aic, 

173 'bic': bic, 

174 'ks_stat': ks_stat, 

175 'ks_p': ks_p, 

176 'log_likelihood': log_likelihood 

177 } 

178 

179 except Exception as e: 

180 warnings.warn(f"Failed to fit {dist_name}: {e}") 

181 continue 

182 

183 if not fit_results: 

184 raise ValueError("All distribution fits failed") 

185 

186 # Select best distribution 

187 if selection_criterion == 'aic': 

188 best_dist = min(fit_results.keys(), key=lambda d: fit_results[d]['aic']) 

189 elif selection_criterion == 'bic': 

190 best_dist = min(fit_results.keys(), key=lambda d: fit_results[d]['bic']) 

191 elif selection_criterion == 'ks': 

192 best_dist = min(fit_results.keys(), key=lambda d: fit_results[d]['ks_stat']) 

193 else: 

194 raise ValueError(f"Unknown selection criterion: {selection_criterion}") 

195 

196 return fit_results[best_dist]['p_value'], best_dist, fit_results 

197 

198 

199def robust_parametric_test(observed_value: float, 

200 null_distribution: np.ndarray, 

201 min_samples_for_parametric: int = 100) -> Tuple[float, str]: 

202 """ 

203 Use parametric test only when data strongly supports it, otherwise empirical. 

204  

205 This hybrid approach uses parametric testing when: 

206 1. We have enough samples 

207 2. The distribution clearly fits well 

208 Otherwise, it falls back to empirical p-values. 

209  

210 Parameters 

211 ---------- 

212 observed_value : float 

213 The observed MI value to test. 

214 null_distribution : np.ndarray 

215 Array of MI values from shuffled data. 

216 min_samples_for_parametric : int, optional 

217 Minimum number of samples required to attempt parametric fitting. 

218 Default: 100 

219  

220 Returns 

221 ------- 

222 p_value : float 

223 P-value from the selected method. 

224 method_used : str 

225 Description of the method used. 

226 """ 

227 n = len(null_distribution) 

228 

229 # Check if we have enough samples 

230 if n < min_samples_for_parametric: 

231 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

232 return p_value, f"empirical (n={n} too small)" 

233 

234 # Try to fit gamma distribution (most theoretically justified) 

235 try: 

236 # Ensure positive values 

237 if np.any(null_distribution <= 0): 

238 null_dist_positive = null_distribution + 1e-10 

239 obs_positive = observed_value + 1e-10 

240 else: 

241 null_dist_positive = null_distribution 

242 obs_positive = observed_value 

243 

244 # Fit gamma 

245 params = stats.gamma.fit(null_dist_positive, floc=0) 

246 fitted_gamma = stats.gamma(*params) 

247 

248 # Check goodness of fit 

249 ks_stat, ks_p = stats.kstest(null_dist_positive, fitted_gamma.cdf) 

250 

251 # Use parametric only if fit is good 

252 if ks_p > 0.1: # Reasonable fit 

253 p_value = fitted_gamma.sf(obs_positive) 

254 return p_value, f"gamma (KS p={ks_p:.3f})" 

255 else: 

256 # Poor fit, use empirical 

257 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

258 return p_value, f"empirical (gamma KS p={ks_p:.3f} too low)" 

259 

260 except Exception: 

261 # If fitting fails, use empirical 

262 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

263 return p_value, "empirical (gamma fit failed)" 

264 

265 

266def extreme_value_correction(observed_value: float, 

267 null_distribution: np.ndarray, 

268 threshold_percentile: float = 95) -> Tuple[float, str]: 

269 """ 

270 Use extreme value theory for very high MI values. 

271  

272 When the observed MI is in the extreme tail, standard methods may be 

273 unreliable. This uses extreme value theory for better estimates. 

274  

275 Parameters 

276 ---------- 

277 observed_value : float 

278 The observed MI value to test. 

279 null_distribution : np.ndarray 

280 Array of MI values from shuffled data. 

281 threshold_percentile : float, optional 

282 Percentile above which to use extreme value theory. 

283 Default: 95 

284  

285 Returns 

286 ------- 

287 p_value : float 

288 P-value using appropriate method. 

289 method_used : str 

290 Description of the method used. 

291 """ 

292 threshold = np.percentile(null_distribution, threshold_percentile) 

293 

294 if observed_value <= threshold: 

295 # Use standard empirical method 

296 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

297 return p_value, "empirical (below extreme threshold)" 

298 

299 # Use extreme value theory 

300 # Fit Generalized Pareto Distribution to exceedances 

301 exceedances = null_distribution[null_distribution > threshold] - threshold 

302 

303 if len(exceedances) < 10: 

304 # Not enough extreme values 

305 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

306 return p_value, "empirical (too few exceedances)" 

307 

308 try: 

309 # Fit GPD 

310 params = stats.genpareto.fit(exceedances) 

311 gpd = stats.genpareto(*params) 

312 

313 # Probability of exceeding threshold 

314 p_threshold = len(exceedances) / len(null_distribution) 

315 

316 # Probability of exceeding observed value given exceedance 

317 if observed_value > threshold: 

318 p_exceed_given_threshold = gpd.sf(observed_value - threshold) 

319 p_value = p_threshold * p_exceed_given_threshold 

320 else: 

321 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

322 

323 return p_value, f"extreme value theory (threshold={threshold:.3f})" 

324 

325 except Exception: 

326 # If EVT fails, use empirical 

327 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

328 return p_value, "empirical (EVT fit failed)" 

329 

330 

331def bootstrap_p_value_confidence(observed_value: float, 

332 null_distribution: np.ndarray, 

333 n_bootstrap: int = 1000, 

334 confidence_level: float = 0.95) -> Tuple[float, Tuple[float, float]]: 

335 """ 

336 Calculate p-value with bootstrap confidence interval. 

337  

338 This provides uncertainty quantification for the p-value estimate, 

339 which is especially important for small sample sizes. 

340  

341 Parameters 

342 ---------- 

343 observed_value : float 

344 The observed MI value to test. 

345 null_distribution : np.ndarray 

346 Array of MI values from shuffled data. 

347 n_bootstrap : int, optional 

348 Number of bootstrap samples. Default: 1000 

349 confidence_level : float, optional 

350 Confidence level for interval. Default: 0.95 

351  

352 Returns 

353 ------- 

354 p_value : float 

355 Point estimate of p-value. 

356 confidence_interval : tuple 

357 (lower, upper) bounds of confidence interval. 

358 """ 

359 n = len(null_distribution) 

360 bootstrap_p_values = [] 

361 

362 for _ in range(n_bootstrap): 

363 # Bootstrap sample from null distribution 

364 bootstrap_sample = np.random.choice(null_distribution, size=n, replace=True) 

365 

366 # Calculate empirical p-value 

367 p_boot = empirical_p_value(observed_value, bootstrap_sample, method='conservative') 

368 bootstrap_p_values.append(p_boot) 

369 

370 # Point estimate (original p-value) 

371 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

372 

373 # Confidence interval 

374 alpha = 1 - confidence_level 

375 lower = np.percentile(bootstrap_p_values, 100 * alpha / 2) 

376 upper = np.percentile(bootstrap_p_values, 100 * (1 - alpha / 2)) 

377 

378 return p_value, (lower, upper) 

379 

380 

381class ImprovedMITesting: 

382 """ 

383 Improved MI testing framework that addresses the distribution fitting issues. 

384  

385 This class provides multiple testing strategies and automatically selects 

386 the most appropriate one based on the data characteristics. 

387 """ 

388 

389 def __init__(self, 

390 method: str = 'auto', 

391 min_samples_for_parametric: int = 100, 

392 extreme_value_threshold: float = 95): 

393 """ 

394 Initialize improved MI testing framework. 

395  

396 Parameters 

397 ---------- 

398 method : str, optional 

399 Testing method to use. Options: 

400 - 'auto': Automatically select best method 

401 - 'empirical': Always use empirical p-values 

402 - 'adaptive': Use adaptive distribution selection 

403 - 'robust': Use robust parametric/empirical hybrid 

404 - 'extreme': Use extreme value theory when appropriate 

405 Default: 'auto' 

406 min_samples_for_parametric : int, optional 

407 Minimum samples required for parametric methods. Default: 100 

408 extreme_value_threshold : float, optional 

409 Percentile threshold for extreme value theory. Default: 95 

410 """ 

411 self.method = method 

412 self.min_samples_for_parametric = min_samples_for_parametric 

413 self.extreme_value_threshold = extreme_value_threshold 

414 

415 def compute_p_value(self, 

416 observed_value: float, 

417 null_distribution: np.ndarray, 

418 return_details: bool = False) -> Union[float, Tuple[float, Dict]]: 

419 """ 

420 Compute p-value using the configured method. 

421  

422 Parameters 

423 ---------- 

424 observed_value : float 

425 The observed MI value. 

426 null_distribution : np.ndarray 

427 Shuffled MI values. 

428 return_details : bool, optional 

429 Whether to return additional details. Default: False 

430  

431 Returns 

432 ------- 

433 p_value : float 

434 The computed p-value. 

435 details : dict (optional) 

436 Additional information about the computation. 

437 """ 

438 details = {} 

439 

440 if self.method == 'empirical': 

441 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

442 details['method'] = 'empirical' 

443 

444 elif self.method == 'adaptive': 

445 p_value, best_dist, fit_info = adaptive_distribution_test(observed_value, null_distribution) 

446 details['method'] = f'adaptive ({best_dist})' 

447 details['fit_info'] = fit_info 

448 

449 elif self.method == 'robust': 

450 p_value, method_desc = robust_parametric_test( 

451 observed_value, null_distribution, self.min_samples_for_parametric 

452 ) 

453 details['method'] = f'robust ({method_desc})' 

454 

455 elif self.method == 'extreme': 

456 p_value, method_desc = extreme_value_correction( 

457 observed_value, null_distribution, self.extreme_value_threshold 

458 ) 

459 details['method'] = f'extreme ({method_desc})' 

460 

461 elif self.method == 'auto': 

462 # Automatic method selection based on data characteristics 

463 n = len(null_distribution) 

464 

465 if n < 50: 

466 # Too few samples - use empirical 

467 p_value = empirical_p_value(observed_value, null_distribution, method='conservative') 

468 details['method'] = f'empirical (n={n} too small)' 

469 

470 elif observed_value > np.percentile(null_distribution, 99): 

471 # Extreme value - use EVT 

472 p_value, method_desc = extreme_value_correction( 

473 observed_value, null_distribution, self.extreme_value_threshold 

474 ) 

475 details['method'] = f'extreme/auto ({method_desc})' 

476 

477 else: 

478 # Try robust parametric approach 

479 p_value, method_desc = robust_parametric_test( 

480 observed_value, null_distribution, self.min_samples_for_parametric 

481 ) 

482 details['method'] = f'robust/auto ({method_desc})' 

483 

484 else: 

485 raise ValueError(f"Unknown method: {self.method}") 

486 

487 details['p_value'] = p_value 

488 details['n_shuffles'] = len(null_distribution) 

489 details['observed_value'] = observed_value 

490 

491 if return_details: 

492 return p_value, details 

493 else: 

494 return p_value 

495 

496 

497def compare_testing_methods(observed_value: float, 

498 null_distribution: np.ndarray) -> Dict[str, Dict]: 

499 """ 

500 Compare all available testing methods for diagnostic purposes. 

501  

502 Parameters 

503 ---------- 

504 observed_value : float 

505 The observed MI value. 

506 null_distribution : np.ndarray 

507 Shuffled MI values. 

508  

509 Returns 

510 ------- 

511 comparison : dict 

512 Results from all testing methods. 

513 """ 

514 results = {} 

515 

516 # Current INTENSE approach 

517 from .stats import get_mi_distr_pvalue 

518 results['current_norm'] = { 

519 'p_value': get_mi_distr_pvalue(null_distribution, observed_value, 'norm'), 

520 'method': 'normal distribution (current)' 

521 } 

522 results['current_gamma'] = { 

523 'p_value': get_mi_distr_pvalue(null_distribution, observed_value, 'gamma'), 

524 'method': 'gamma distribution (current)' 

525 } 

526 

527 # Empirical 

528 results['empirical'] = { 

529 'p_value': empirical_p_value(observed_value, null_distribution), 

530 'method': 'empirical (conservative)' 

531 } 

532 

533 # Adaptive 

534 try: 

535 p_val, best_dist, _ = adaptive_distribution_test(observed_value, null_distribution) 

536 results['adaptive'] = { 

537 'p_value': p_val, 

538 'method': f'adaptive ({best_dist})' 

539 } 

540 except: 

541 results['adaptive'] = {'p_value': np.nan, 'method': 'adaptive (failed)'} 

542 

543 # Robust 

544 try: 

545 p_val, method_desc = robust_parametric_test(observed_value, null_distribution) 

546 results['robust'] = { 

547 'p_value': p_val, 

548 'method': f'robust ({method_desc})' 

549 } 

550 except: 

551 results['robust'] = {'p_value': np.nan, 'method': 'robust (failed)'} 

552 

553 # Extreme value 

554 try: 

555 p_val, method_desc = extreme_value_correction(observed_value, null_distribution) 

556 results['extreme'] = { 

557 'p_value': p_val, 

558 'method': f'extreme ({method_desc})' 

559 } 

560 except: 

561 results['extreme'] = {'p_value': np.nan, 'method': 'extreme (failed)'} 

562 

563 # Bootstrap CI 

564 try: 

565 p_val, (ci_low, ci_high) = bootstrap_p_value_confidence(observed_value, null_distribution, n_bootstrap=200) 

566 results['bootstrap'] = { 

567 'p_value': p_val, 

568 'method': f'empirical with 95% CI: [{ci_low:.4f}, {ci_high:.4f}]' 

569 } 

570 except: 

571 results['bootstrap'] = {'p_value': np.nan, 'method': 'bootstrap (failed)'} 

572 

573 return results