Coverage for partipy/optim.py: 95%

57 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-09 10:32 +0200

1""" 

2Optimize the archetypal analysis objective by block coordiante descent. 

3 

4a) Regularized Nonnegative Least Squares 

5- Paper: A. Cutler and L. Breiman, “Archetypal analysis,” Technometrics, vol. 36, no. 4, pp. 338-347, 1994, doi: 10.1080/00401706.1994.10485840. 

6 

7 

8b) Projected Gradients (PCHA) 

9- Paper: M. Mørup and L. K. Hansen, “Archetypal analysis for machine learning and data mining,” Neurocomputing, vol. 80, pp. 54-63, Mar. 2012, doi: 10.1016/j.neucom.2011.06.033. 

10 

11 

12c) Adapted Frank-Wolfe algorithm 

13- Paper: C. Bauckhage, K. Kersting, F. Hoppe, and C. Thurau, “Archetypal analysis as an autoencoder,” presented at the Workshop “New Challenges in Neural Computation” (NC2) 2015, 2015. Accessed: Feb. 10, 2025. [Online]. Available: https://publica.fraunhofer.de/handle/publica/393337 

14 

15 

16Code adapted from 

17a) https://github.com/nichohelmut/football_results/blob/master/clustering/clustering.py 

18b) https://github.com/atmguille/archetypal-analysis (by Guillermo García Cobo) 

19""" 

20 

21import numpy as np 

22from numba import float32, int32, jit, prange 

23from scipy.optimize import nnls 

24 

25from .const import LAMBDA 

26 

27 

28def _inspect_array(arr: np.ndarray) -> dict: 

29 """ 

30 Return key properties of a NumPy array. 

31 

32 Parameters 

33 ---------- 

34 arr : np.ndarray 

35 The array to inspect. 

36 

37 Returns 

38 ------- 

39 dict 

40 Dictionary containing dtype, ndim, layout, writeability, and alignment. 

41 """ 

42 layout = "C" if arr.flags["C_CONTIGUOUS"] else "F" if arr.flags["F_CONTIGUOUS"] else "A" 

43 return { 

44 "dtype": str(arr.dtype), 

45 "ndim": arr.ndim, 

46 "layout": layout, 

47 "read_only": not arr.flags["WRITEABLE"], 

48 "aligned": arr.flags["ALIGNED"], 

49 } 

50 

51 

52@jit([float32(float32[:, :], float32[:, :], float32[:, :])], nopython=True, cache=True, fastmath=True) # pragma: no cover 

53def _compute_RSS_AZ(X, A, Z): 

54 X = np.ascontiguousarray(X) 

55 Z = np.ascontiguousarray(Z) 

56 A = np.ascontiguousarray(A) 

57 diff = X - np.dot(A, Z) 

58 return np.sum(diff * diff) 

59 

60 

61@jit([float32(float32[:, :], float32[:, :], float32[:, :], float32[:, :])], nopython=True, cache=True, fastmath=True) # pragma: no cover 

62def _compute_RSS_ABX(X, A, B, WX): 

63 X = np.ascontiguousarray(X) 

64 A = np.ascontiguousarray(A) 

65 B = np.ascontiguousarray(B) 

66 WX = np.ascontiguousarray(WX) 

67 diff = WX - np.dot(A, np.dot(B, X)) 

68 return np.sum(diff * diff) 

69 

70 

71def _compute_A_regularized_nnls( 

72 X: np.ndarray, 

73 Z: np.ndarray, 

74 A: np.ndarray | None = None, 

75) -> np.ndarray: 

76 # huge_constant is added as a new column to account for w norm constraint 

77 X_padded = np.hstack([X, (LAMBDA * np.ones(X.shape[0]))[:, None]]) 

78 Zt_padded = np.vstack([Z.T, LAMBDA * np.ones(Z.shape[0])]) 

79 

80 # Use non-negative least squares to solve the optimization problem 

81 A = np.array([nnls(A=Zt_padded, b=X_padded[n, :], maxiter=5 * Zt_padded.shape[1])[0] for n in range(X.shape[0])]) 

82 A = A.astype(np.float32) 

83 return A 

84 

85 

86def _compute_B_regularized_nnls( 

87 X: np.ndarray, 

88 A: np.ndarray, 

89 B: np.ndarray | None = None, 

90 WX: np.ndarray = None, # type: ignore[assignment] 

91) -> np.ndarray: 

92 # works for weighted and unweighted version 

93 if WX is None: 

94 WX = X 

95 else: 

96 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1]) 

97 Z = np.linalg.lstsq(a=A, b=WX, rcond=None)[0] 

98 Z_padded = np.hstack([Z, (LAMBDA * np.ones(Z.shape[0]))[:, None]]) 

99 Xt_padded = np.vstack([X.T, LAMBDA * np.ones(X.shape[0])]) # this should actually be precomputed once 

100 B = np.array([nnls(A=Xt_padded, b=Z_padded[k, :], maxiter=5 * Xt_padded.shape[1])[0] for k in range(Z.shape[0])]) 

101 B = B.astype(np.float32) 

102 return B 

103 

104 

105def _compute_A_projected_gradients( 

106 X: np.ndarray, 

107 Z: np.ndarray, 

108 A: np.ndarray, 

109 derivative_max_iter: int | np.int32 = 40, 

110 rel_tol_ls: float | np.float32 = 1e-3, 

111 rel_tol_conv: float | np.float32 = 1e-4, 

112) -> np.ndarray: 

113 """Updates the A matrix given the data matrix X and the archetypes Z. 

114 

115 A is the matrix that provides the best convex approximation of X by Z. 

116 

117 Parameters 

118 ---------- 

119 X : numpy 2d-array 

120 Data matrix with shape (n_samples, n_features). 

121 

122 Z : numpy 2d-array 

123 Archetypes matrix with shape (n_archetypes, n_features). 

124 

125 A : numpy 2d-array 

126 A matrix with shape (n_samples, n_archetypes). 

127 

128 derivative_max_iter: int 

129 Maximum number of steps for optimization 

130 

131 Returns 

132 ------- 

133 A : numpy 2d-array 

134 Updated A matrix with shape (n_samples, n_archetypes). 

135 """ 

136 # check and other things 

137 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0) 

138 

139 # ensure correct data type for parameters 

140 derivative_max_iter = np.int32(derivative_max_iter) 

141 rel_tol_ls = np.float32(rel_tol_ls) 

142 rel_tol_conv = np.float32(rel_tol_conv) 

143 

144 # optimize A 

145 A = _compute_A_projected_gradients_jit( 

146 X=X, Z=Z, A=A, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv 

147 ) 

148 return A 

149 

150 

151@jit( 

152 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)], 

153 nopython=True, 

154 cache=True, 

155 fastmath=True, 

156) # pragma: no cover 

157def _compute_A_projected_gradients_jit( 

158 X: np.ndarray, 

159 Z: np.ndarray, 

160 A: np.ndarray, 

161 derivative_max_iter: np.int32, 

162 rel_tol_ls: np.float32, 

163 rel_tol_conv: np.float32, 

164) -> np.ndarray: 

165 # initialize learning rate 

166 mu = np.float32(1.0) 

167 min_mu = np.float32(1e-6) 

168 

169 # explicitly making sure everything is contiguous (C-roder) 

170 X = np.ascontiguousarray(X) 

171 Z = np.ascontiguousarray(Z) 

172 A = np.ascontiguousarray(A) 

173 

174 # terms that can be pre-computed 

175 XZT = np.dot(X, Z.T) 

176 ZZT = np.dot(Z, Z.T) 

177 

178 for _ in range(derivative_max_iter): 

179 # make sure to multiply things in the right order to keep matrix sizes minimal 

180 G = np.float32(2.0) * (np.dot(A, ZZT) - XZT) # G has shape N x K 

181 G = G - np.sum(A * G, axis=1)[:, None] # chain rule of projection 

182 

183 # line search for optimal step size 

184 prev_RSS = _compute_RSS_AZ(X=X, A=A, Z=Z) 

185 prev_A = A 

186 for _ in range(20): 

187 A = (prev_A - mu * G).clip(0) 

188 A = A / (np.sum(A, axis=1)[:, None] + np.finfo(np.float32).eps) # Avoid division by zero 

189 RSS = _compute_RSS_AZ(X=X, A=A, Z=Z) 

190 if RSS <= (prev_RSS * (1 + rel_tol_ls)): 

191 mu *= np.float32(1.2) 

192 break 

193 else: 

194 mu *= np.float32(0.5) 

195 

196 if mu < min_mu: 

197 # Use the current A (even if not optimal) or revert to prev_A 

198 # depending on which gives better RSS 

199 if RSS > prev_RSS: 

200 A = prev_A 

201 break 

202 

203 # check for convergence 

204 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv: 

205 break 

206 

207 A = np.ascontiguousarray(A) # TODO: check if we need this line 

208 return A 

209 

210 

211def _compute_B_projected_gradients( 

212 X: np.ndarray, 

213 A: np.ndarray, 

214 B: np.ndarray, 

215 WX: np.ndarray = None, # type: ignore[assignment] 

216 derivative_max_iter: int | np.int32 = 40, 

217 rel_tol_ls: float | np.float32 = 1e-3, 

218 rel_tol_conv: float | np.float32 = 1e-4, 

219) -> np.ndarray: 

220 """Updates the B matrix given the data matrix X and the A matrix. 

221 

222 Parameters 

223 ---------- 

224 X : numpy 2d-array 

225 Data matrix with shape (n_samples, n_features). 

226 

227 A : numpy 2d-array 

228 A matrix with shape (n_samples, n_archetypes). 

229 

230 B : numpy 2d-array 

231 B matrix with shape (n_archetypes, n_samples). 

232 

233 WX : numpy 2d-array, optional 

234 Weighted data matrix with shape (n_samples, n_features). 

235 

236 derivative_max_iter: int 

237 Maximum number of steps for optimization 

238 

239 Returns 

240 ------- 

241 B : numpy 2d-array 

242 Updated B matrix with shape (n_archetypes, n_samples). 

243 """ 

244 # checks 

245 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0) 

246 

247 # ensure correct data type for parameters 

248 derivative_max_iter = np.int32(derivative_max_iter) 

249 rel_tol_ls = np.float32(rel_tol_ls) 

250 rel_tol_conv = np.float32(rel_tol_conv) 

251 

252 # works for weighted and unweighted version 

253 if WX is None: 

254 WX = X 

255 else: 

256 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1]) 

257 

258 # optimize B 

259 B = _compute_B_projected_gradients_jit( 

260 X=X, A=A, B=B, WX=WX, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv 

261 ) 

262 return B 

263 

264 

265@jit( 

266 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)], 

267 nopython=True, 

268 cache=True, 

269 fastmath=True, 

270) # pragma: no cover 

271def _compute_B_projected_gradients_jit( 

272 X: np.ndarray, 

273 A: np.ndarray, 

274 B: np.ndarray, 

275 WX: np.ndarray, 

276 derivative_max_iter: np.int32, 

277 rel_tol_ls: np.float32, 

278 rel_tol_conv: np.float32, 

279) -> np.ndarray: 

280 # initialize learning rates 

281 mu = np.float32(1.0) 

282 min_mu = np.float32(1e-6) 

283 

284 # explicitly making sure everything is contiguous (C-order) 

285 X = np.ascontiguousarray(X) 

286 A = np.ascontiguousarray(A) 

287 B = np.ascontiguousarray(B) 

288 WX = np.ascontiguousarray(WX) 

289 

290 # terms that can be pre-computed 

291 AT_WX_XT = np.dot(np.dot(A.T, WX), X.T) 

292 AT_A = np.dot(A.T, A) 

293 

294 for _ in range(derivative_max_iter): 

295 # make sure to multiply things in the right order to keep matrix sizes minimal 

296 G = np.float32(2.0) * (np.dot(np.dot(AT_A, np.dot(B, X)), X.T) - AT_WX_XT) # G has shape K x N 

297 G = G - np.sum(B * G, axis=1)[:, None] # chain rule of projection 

298 

299 # line search for optimal step size 

300 prev_RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX) 

301 prev_B = B 

302 for _ in range(20): 

303 B = (prev_B - mu * G).clip(0) 

304 B = B / (np.sum(B, axis=1)[:, None] + np.finfo(np.float32).eps) # avoid division by zero 

305 RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX) 

306 if RSS <= (prev_RSS * (1 + rel_tol_ls)): 

307 mu *= np.float32(1.2) 

308 break 

309 else: 

310 mu *= np.float32(0.5) 

311 

312 if mu < min_mu: 

313 # Use the current B (even if not optimal) or revert to prev_B 

314 # depending on which gives better RSS 

315 if RSS > prev_RSS: 

316 B = prev_B 

317 break 

318 

319 # check for convergence 

320 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv: 

321 break 

322 

323 B = np.ascontiguousarray(B) # TODO: check if we need this line 

324 return B 

325 

326 

327@jit(nopython=True, cache=True, fastmath=True) # pragma: no cover 

328def _add_argmins_per_row(mtx, argmins, mu): 

329 for idx in range(len(argmins)): 

330 mtx[idx, argmins[idx]] += mu 

331 return mtx 

332 

333 

334# NOTE: can lead to kernel crashed in Jupyter notebooks... 

335@jit(nopython=True, cache=True, parallel=True) # pragma: no cover 

336def _add_argmins_per_row_p(mtx, argmins, mu): 

337 for idx in prange(len(argmins)): 

338 mtx[idx, argmins[idx]] += mu 

339 return mtx 

340 

341 

342def _compute_A_frank_wolfe( 

343 X: np.ndarray, 

344 Z: np.ndarray, 

345 A: np.ndarray, 

346 derivative_max_iter: int | np.int32 = 80, 

347 rel_tol_ls: float | np.float32 = 1e-3, 

348 rel_tol_conv: float | np.float32 = 1e-4, 

349) -> np.ndarray: 

350 # check and other things 

351 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0) 

352 

353 # ensure correct data type for parameters 

354 derivative_max_iter = np.int32(derivative_max_iter) 

355 rel_tol_ls = np.float32(rel_tol_ls) 

356 rel_tol_conv = np.float32(rel_tol_conv) 

357 

358 # optimize A 

359 A = _compute_A_frank_wolfe_jit( 

360 X=X, Z=Z, A=A, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv 

361 ) 

362 return A 

363 

364 

365@jit( 

366 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)], 

367 nopython=True, 

368 cache=True, 

369 fastmath=True, 

370) # pragma: no cover 

371def _compute_A_frank_wolfe_jit( 

372 X: np.ndarray, 

373 Z: np.ndarray, 

374 A: np.ndarray, 

375 derivative_max_iter: np.int32, 

376 rel_tol_ls: np.float32, 

377 rel_tol_conv: np.float32, 

378) -> np.ndarray: 

379 # explicitly making sure everything is contiguous (C-roder) 

380 X = np.ascontiguousarray(X) 

381 Z = np.ascontiguousarray(Z) 

382 A = np.ascontiguousarray(A) 

383 

384 # terms that can be pre-computed 

385 XZT = np.dot(X, Z.T) 

386 ZZT = np.dot(Z, Z.T) 

387 

388 for iter in range(derivative_max_iter): 

389 # frank wolfe step size 

390 mu = np.float32(2 / (iter + 2)) 

391 

392 # compute the gradient 

393 G = np.float32(2.0) * (np.dot(A, ZZT) - XZT) # G has shape N x K 

394 

395 # for each sample, get the archetype column with the most negative gradient 

396 argmins = np.argmin(G, axis=1) 

397 

398 # line search 

399 prev_RSS = _compute_RSS_AZ(X=X, A=A, Z=Z) 

400 prev_A = A 

401 for _ in range(20): 

402 A = (np.float32(1.0) - mu) * prev_A 

403 A = _add_argmins_per_row(mtx=A, argmins=argmins, mu=mu) 

404 RSS = _compute_RSS_AZ(X=X, A=A, Z=Z) 

405 if RSS <= (prev_RSS * (1 + rel_tol_ls)): 

406 mu *= np.float32(1.2) 

407 break 

408 else: 

409 mu *= np.float32(0.5) 

410 

411 # check for convergence 

412 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv: 

413 break 

414 

415 return A 

416 

417 

418def _compute_B_frank_wolfe( 

419 X: np.ndarray, 

420 A: np.ndarray, 

421 B: np.ndarray, 

422 WX: np.ndarray = None, # type: ignore[assignment] 

423 derivative_max_iter: int | np.int32 = 80, 

424 rel_tol_ls: float | np.float32 = 1e-3, 

425 rel_tol_conv: float | np.float32 = 1e-4, 

426) -> np.ndarray: 

427 # checks 

428 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0) 

429 

430 # ensure correct data type for parameters 

431 derivative_max_iter = np.int32(derivative_max_iter) 

432 rel_tol_ls = np.float32(rel_tol_ls) 

433 rel_tol_conv = np.float32(rel_tol_conv) 

434 

435 # works for weighted and unweighted version 

436 if WX is None: 

437 WX = X 

438 else: 

439 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1]) 

440 

441 # optimize B 

442 B = _compute_B_frank_wolfe_jit( 

443 X=X, A=A, B=B, WX=WX, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv 

444 ) 

445 return B 

446 

447 

448@jit( 

449 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)], 

450 nopython=True, 

451 cache=True, 

452 fastmath=True, 

453) # pragma: no cover 

454def _compute_B_frank_wolfe_jit( 

455 X: np.ndarray, 

456 A: np.ndarray, 

457 B: np.ndarray, 

458 WX: np.ndarray, 

459 derivative_max_iter: np.int32, 

460 rel_tol_ls: np.float32, 

461 rel_tol_conv: np.float32, 

462) -> np.ndarray: 

463 # explicitly making sure everything is contiguous (C-order) 

464 X = np.ascontiguousarray(X) 

465 A = np.ascontiguousarray(A) 

466 B = np.ascontiguousarray(B) 

467 WX = np.ascontiguousarray(WX) 

468 

469 # terms that can be pre-computed 

470 AT_WX_XT = np.dot(np.dot(A.T, WX), X.T) 

471 AT_A = np.dot(A.T, A) 

472 

473 for iter in range(derivative_max_iter): 

474 # frank wolfe step size 

475 mu = np.float32(2 / (iter + 2)) 

476 

477 # compute the gradient 

478 G = np.float32(2.0) * (np.dot(np.dot(AT_A, np.dot(B, X)), X.T) - AT_WX_XT) # G has shape K x N 

479 

480 # for each archetype, get the sample column with the most negative gradient 

481 argmins = np.argmin(G, axis=1) 

482 

483 # line search for optimal step size 

484 prev_RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX) 

485 prev_B = B 

486 for _ in range(20): 

487 B = (np.float32(1.0) - mu) * prev_B 

488 B = _add_argmins_per_row(mtx=B, argmins=argmins, mu=mu) 

489 RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX) 

490 if RSS <= (prev_RSS * (1 + rel_tol_ls)): 

491 mu *= np.float32(1.2) 

492 break 

493 else: 

494 mu *= np.float32(0.5) 

495 

496 # check for convergence 

497 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv: 

498 break 

499 

500 return B