Coverage for partipy/optim.py: 95%
57 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 10:32 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 10:32 +0200
1"""
2Optimize the archetypal analysis objective by block coordiante descent.
4a) Regularized Nonnegative Least Squares
5- Paper: A. Cutler and L. Breiman, “Archetypal analysis,” Technometrics, vol. 36, no. 4, pp. 338-347, 1994, doi: 10.1080/00401706.1994.10485840.
8b) Projected Gradients (PCHA)
9- Paper: M. Mørup and L. K. Hansen, “Archetypal analysis for machine learning and data mining,” Neurocomputing, vol. 80, pp. 54-63, Mar. 2012, doi: 10.1016/j.neucom.2011.06.033.
12c) Adapted Frank-Wolfe algorithm
13- Paper: C. Bauckhage, K. Kersting, F. Hoppe, and C. Thurau, “Archetypal analysis as an autoencoder,” presented at the Workshop “New Challenges in Neural Computation” (NC2) 2015, 2015. Accessed: Feb. 10, 2025. [Online]. Available: https://publica.fraunhofer.de/handle/publica/393337
16Code adapted from
17a) https://github.com/nichohelmut/football_results/blob/master/clustering/clustering.py
18b) https://github.com/atmguille/archetypal-analysis (by Guillermo García Cobo)
19"""
21import numpy as np
22from numba import float32, int32, jit, prange
23from scipy.optimize import nnls
25from .const import LAMBDA
28def _inspect_array(arr: np.ndarray) -> dict:
29 """
30 Return key properties of a NumPy array.
32 Parameters
33 ----------
34 arr : np.ndarray
35 The array to inspect.
37 Returns
38 -------
39 dict
40 Dictionary containing dtype, ndim, layout, writeability, and alignment.
41 """
42 layout = "C" if arr.flags["C_CONTIGUOUS"] else "F" if arr.flags["F_CONTIGUOUS"] else "A"
43 return {
44 "dtype": str(arr.dtype),
45 "ndim": arr.ndim,
46 "layout": layout,
47 "read_only": not arr.flags["WRITEABLE"],
48 "aligned": arr.flags["ALIGNED"],
49 }
52@jit([float32(float32[:, :], float32[:, :], float32[:, :])], nopython=True, cache=True, fastmath=True) # pragma: no cover
53def _compute_RSS_AZ(X, A, Z):
54 X = np.ascontiguousarray(X)
55 Z = np.ascontiguousarray(Z)
56 A = np.ascontiguousarray(A)
57 diff = X - np.dot(A, Z)
58 return np.sum(diff * diff)
61@jit([float32(float32[:, :], float32[:, :], float32[:, :], float32[:, :])], nopython=True, cache=True, fastmath=True) # pragma: no cover
62def _compute_RSS_ABX(X, A, B, WX):
63 X = np.ascontiguousarray(X)
64 A = np.ascontiguousarray(A)
65 B = np.ascontiguousarray(B)
66 WX = np.ascontiguousarray(WX)
67 diff = WX - np.dot(A, np.dot(B, X))
68 return np.sum(diff * diff)
71def _compute_A_regularized_nnls(
72 X: np.ndarray,
73 Z: np.ndarray,
74 A: np.ndarray | None = None,
75) -> np.ndarray:
76 # huge_constant is added as a new column to account for w norm constraint
77 X_padded = np.hstack([X, (LAMBDA * np.ones(X.shape[0]))[:, None]])
78 Zt_padded = np.vstack([Z.T, LAMBDA * np.ones(Z.shape[0])])
80 # Use non-negative least squares to solve the optimization problem
81 A = np.array([nnls(A=Zt_padded, b=X_padded[n, :], maxiter=5 * Zt_padded.shape[1])[0] for n in range(X.shape[0])])
82 A = A.astype(np.float32)
83 return A
86def _compute_B_regularized_nnls(
87 X: np.ndarray,
88 A: np.ndarray,
89 B: np.ndarray | None = None,
90 WX: np.ndarray = None, # type: ignore[assignment]
91) -> np.ndarray:
92 # works for weighted and unweighted version
93 if WX is None:
94 WX = X
95 else:
96 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1])
97 Z = np.linalg.lstsq(a=A, b=WX, rcond=None)[0]
98 Z_padded = np.hstack([Z, (LAMBDA * np.ones(Z.shape[0]))[:, None]])
99 Xt_padded = np.vstack([X.T, LAMBDA * np.ones(X.shape[0])]) # this should actually be precomputed once
100 B = np.array([nnls(A=Xt_padded, b=Z_padded[k, :], maxiter=5 * Xt_padded.shape[1])[0] for k in range(Z.shape[0])])
101 B = B.astype(np.float32)
102 return B
105def _compute_A_projected_gradients(
106 X: np.ndarray,
107 Z: np.ndarray,
108 A: np.ndarray,
109 derivative_max_iter: int | np.int32 = 40,
110 rel_tol_ls: float | np.float32 = 1e-3,
111 rel_tol_conv: float | np.float32 = 1e-4,
112) -> np.ndarray:
113 """Updates the A matrix given the data matrix X and the archetypes Z.
115 A is the matrix that provides the best convex approximation of X by Z.
117 Parameters
118 ----------
119 X : numpy 2d-array
120 Data matrix with shape (n_samples, n_features).
122 Z : numpy 2d-array
123 Archetypes matrix with shape (n_archetypes, n_features).
125 A : numpy 2d-array
126 A matrix with shape (n_samples, n_archetypes).
128 derivative_max_iter: int
129 Maximum number of steps for optimization
131 Returns
132 -------
133 A : numpy 2d-array
134 Updated A matrix with shape (n_samples, n_archetypes).
135 """
136 # check and other things
137 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0)
139 # ensure correct data type for parameters
140 derivative_max_iter = np.int32(derivative_max_iter)
141 rel_tol_ls = np.float32(rel_tol_ls)
142 rel_tol_conv = np.float32(rel_tol_conv)
144 # optimize A
145 A = _compute_A_projected_gradients_jit(
146 X=X, Z=Z, A=A, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv
147 )
148 return A
151@jit(
152 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)],
153 nopython=True,
154 cache=True,
155 fastmath=True,
156) # pragma: no cover
157def _compute_A_projected_gradients_jit(
158 X: np.ndarray,
159 Z: np.ndarray,
160 A: np.ndarray,
161 derivative_max_iter: np.int32,
162 rel_tol_ls: np.float32,
163 rel_tol_conv: np.float32,
164) -> np.ndarray:
165 # initialize learning rate
166 mu = np.float32(1.0)
167 min_mu = np.float32(1e-6)
169 # explicitly making sure everything is contiguous (C-roder)
170 X = np.ascontiguousarray(X)
171 Z = np.ascontiguousarray(Z)
172 A = np.ascontiguousarray(A)
174 # terms that can be pre-computed
175 XZT = np.dot(X, Z.T)
176 ZZT = np.dot(Z, Z.T)
178 for _ in range(derivative_max_iter):
179 # make sure to multiply things in the right order to keep matrix sizes minimal
180 G = np.float32(2.0) * (np.dot(A, ZZT) - XZT) # G has shape N x K
181 G = G - np.sum(A * G, axis=1)[:, None] # chain rule of projection
183 # line search for optimal step size
184 prev_RSS = _compute_RSS_AZ(X=X, A=A, Z=Z)
185 prev_A = A
186 for _ in range(20):
187 A = (prev_A - mu * G).clip(0)
188 A = A / (np.sum(A, axis=1)[:, None] + np.finfo(np.float32).eps) # Avoid division by zero
189 RSS = _compute_RSS_AZ(X=X, A=A, Z=Z)
190 if RSS <= (prev_RSS * (1 + rel_tol_ls)):
191 mu *= np.float32(1.2)
192 break
193 else:
194 mu *= np.float32(0.5)
196 if mu < min_mu:
197 # Use the current A (even if not optimal) or revert to prev_A
198 # depending on which gives better RSS
199 if RSS > prev_RSS:
200 A = prev_A
201 break
203 # check for convergence
204 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv:
205 break
207 A = np.ascontiguousarray(A) # TODO: check if we need this line
208 return A
211def _compute_B_projected_gradients(
212 X: np.ndarray,
213 A: np.ndarray,
214 B: np.ndarray,
215 WX: np.ndarray = None, # type: ignore[assignment]
216 derivative_max_iter: int | np.int32 = 40,
217 rel_tol_ls: float | np.float32 = 1e-3,
218 rel_tol_conv: float | np.float32 = 1e-4,
219) -> np.ndarray:
220 """Updates the B matrix given the data matrix X and the A matrix.
222 Parameters
223 ----------
224 X : numpy 2d-array
225 Data matrix with shape (n_samples, n_features).
227 A : numpy 2d-array
228 A matrix with shape (n_samples, n_archetypes).
230 B : numpy 2d-array
231 B matrix with shape (n_archetypes, n_samples).
233 WX : numpy 2d-array, optional
234 Weighted data matrix with shape (n_samples, n_features).
236 derivative_max_iter: int
237 Maximum number of steps for optimization
239 Returns
240 -------
241 B : numpy 2d-array
242 Updated B matrix with shape (n_archetypes, n_samples).
243 """
244 # checks
245 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0)
247 # ensure correct data type for parameters
248 derivative_max_iter = np.int32(derivative_max_iter)
249 rel_tol_ls = np.float32(rel_tol_ls)
250 rel_tol_conv = np.float32(rel_tol_conv)
252 # works for weighted and unweighted version
253 if WX is None:
254 WX = X
255 else:
256 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1])
258 # optimize B
259 B = _compute_B_projected_gradients_jit(
260 X=X, A=A, B=B, WX=WX, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv
261 )
262 return B
265@jit(
266 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)],
267 nopython=True,
268 cache=True,
269 fastmath=True,
270) # pragma: no cover
271def _compute_B_projected_gradients_jit(
272 X: np.ndarray,
273 A: np.ndarray,
274 B: np.ndarray,
275 WX: np.ndarray,
276 derivative_max_iter: np.int32,
277 rel_tol_ls: np.float32,
278 rel_tol_conv: np.float32,
279) -> np.ndarray:
280 # initialize learning rates
281 mu = np.float32(1.0)
282 min_mu = np.float32(1e-6)
284 # explicitly making sure everything is contiguous (C-order)
285 X = np.ascontiguousarray(X)
286 A = np.ascontiguousarray(A)
287 B = np.ascontiguousarray(B)
288 WX = np.ascontiguousarray(WX)
290 # terms that can be pre-computed
291 AT_WX_XT = np.dot(np.dot(A.T, WX), X.T)
292 AT_A = np.dot(A.T, A)
294 for _ in range(derivative_max_iter):
295 # make sure to multiply things in the right order to keep matrix sizes minimal
296 G = np.float32(2.0) * (np.dot(np.dot(AT_A, np.dot(B, X)), X.T) - AT_WX_XT) # G has shape K x N
297 G = G - np.sum(B * G, axis=1)[:, None] # chain rule of projection
299 # line search for optimal step size
300 prev_RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX)
301 prev_B = B
302 for _ in range(20):
303 B = (prev_B - mu * G).clip(0)
304 B = B / (np.sum(B, axis=1)[:, None] + np.finfo(np.float32).eps) # avoid division by zero
305 RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX)
306 if RSS <= (prev_RSS * (1 + rel_tol_ls)):
307 mu *= np.float32(1.2)
308 break
309 else:
310 mu *= np.float32(0.5)
312 if mu < min_mu:
313 # Use the current B (even if not optimal) or revert to prev_B
314 # depending on which gives better RSS
315 if RSS > prev_RSS:
316 B = prev_B
317 break
319 # check for convergence
320 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv:
321 break
323 B = np.ascontiguousarray(B) # TODO: check if we need this line
324 return B
327@jit(nopython=True, cache=True, fastmath=True) # pragma: no cover
328def _add_argmins_per_row(mtx, argmins, mu):
329 for idx in range(len(argmins)):
330 mtx[idx, argmins[idx]] += mu
331 return mtx
334# NOTE: can lead to kernel crashed in Jupyter notebooks...
335@jit(nopython=True, cache=True, parallel=True) # pragma: no cover
336def _add_argmins_per_row_p(mtx, argmins, mu):
337 for idx in prange(len(argmins)):
338 mtx[idx, argmins[idx]] += mu
339 return mtx
342def _compute_A_frank_wolfe(
343 X: np.ndarray,
344 Z: np.ndarray,
345 A: np.ndarray,
346 derivative_max_iter: int | np.int32 = 80,
347 rel_tol_ls: float | np.float32 = 1e-3,
348 rel_tol_conv: float | np.float32 = 1e-4,
349) -> np.ndarray:
350 # check and other things
351 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0)
353 # ensure correct data type for parameters
354 derivative_max_iter = np.int32(derivative_max_iter)
355 rel_tol_ls = np.float32(rel_tol_ls)
356 rel_tol_conv = np.float32(rel_tol_conv)
358 # optimize A
359 A = _compute_A_frank_wolfe_jit(
360 X=X, Z=Z, A=A, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv
361 )
362 return A
365@jit(
366 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)],
367 nopython=True,
368 cache=True,
369 fastmath=True,
370) # pragma: no cover
371def _compute_A_frank_wolfe_jit(
372 X: np.ndarray,
373 Z: np.ndarray,
374 A: np.ndarray,
375 derivative_max_iter: np.int32,
376 rel_tol_ls: np.float32,
377 rel_tol_conv: np.float32,
378) -> np.ndarray:
379 # explicitly making sure everything is contiguous (C-roder)
380 X = np.ascontiguousarray(X)
381 Z = np.ascontiguousarray(Z)
382 A = np.ascontiguousarray(A)
384 # terms that can be pre-computed
385 XZT = np.dot(X, Z.T)
386 ZZT = np.dot(Z, Z.T)
388 for iter in range(derivative_max_iter):
389 # frank wolfe step size
390 mu = np.float32(2 / (iter + 2))
392 # compute the gradient
393 G = np.float32(2.0) * (np.dot(A, ZZT) - XZT) # G has shape N x K
395 # for each sample, get the archetype column with the most negative gradient
396 argmins = np.argmin(G, axis=1)
398 # line search
399 prev_RSS = _compute_RSS_AZ(X=X, A=A, Z=Z)
400 prev_A = A
401 for _ in range(20):
402 A = (np.float32(1.0) - mu) * prev_A
403 A = _add_argmins_per_row(mtx=A, argmins=argmins, mu=mu)
404 RSS = _compute_RSS_AZ(X=X, A=A, Z=Z)
405 if RSS <= (prev_RSS * (1 + rel_tol_ls)):
406 mu *= np.float32(1.2)
407 break
408 else:
409 mu *= np.float32(0.5)
411 # check for convergence
412 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv:
413 break
415 return A
418def _compute_B_frank_wolfe(
419 X: np.ndarray,
420 A: np.ndarray,
421 B: np.ndarray,
422 WX: np.ndarray = None, # type: ignore[assignment]
423 derivative_max_iter: int | np.int32 = 80,
424 rel_tol_ls: float | np.float32 = 1e-3,
425 rel_tol_conv: float | np.float32 = 1e-4,
426) -> np.ndarray:
427 # checks
428 assert (rel_tol_ls >= 0) and (rel_tol_conv >= 0)
430 # ensure correct data type for parameters
431 derivative_max_iter = np.int32(derivative_max_iter)
432 rel_tol_ls = np.float32(rel_tol_ls)
433 rel_tol_conv = np.float32(rel_tol_conv)
435 # works for weighted and unweighted version
436 if WX is None:
437 WX = X
438 else:
439 assert (WX.shape[0] == X.shape[0]) and (WX.shape[1] == X.shape[1])
441 # optimize B
442 B = _compute_B_frank_wolfe_jit(
443 X=X, A=A, B=B, WX=WX, derivative_max_iter=derivative_max_iter, rel_tol_ls=rel_tol_ls, rel_tol_conv=rel_tol_conv
444 )
445 return B
448@jit(
449 [float32[:, :](float32[:, :], float32[:, :], float32[:, :], float32[:, :], int32, float32, float32)],
450 nopython=True,
451 cache=True,
452 fastmath=True,
453) # pragma: no cover
454def _compute_B_frank_wolfe_jit(
455 X: np.ndarray,
456 A: np.ndarray,
457 B: np.ndarray,
458 WX: np.ndarray,
459 derivative_max_iter: np.int32,
460 rel_tol_ls: np.float32,
461 rel_tol_conv: np.float32,
462) -> np.ndarray:
463 # explicitly making sure everything is contiguous (C-order)
464 X = np.ascontiguousarray(X)
465 A = np.ascontiguousarray(A)
466 B = np.ascontiguousarray(B)
467 WX = np.ascontiguousarray(WX)
469 # terms that can be pre-computed
470 AT_WX_XT = np.dot(np.dot(A.T, WX), X.T)
471 AT_A = np.dot(A.T, A)
473 for iter in range(derivative_max_iter):
474 # frank wolfe step size
475 mu = np.float32(2 / (iter + 2))
477 # compute the gradient
478 G = np.float32(2.0) * (np.dot(np.dot(AT_A, np.dot(B, X)), X.T) - AT_WX_XT) # G has shape K x N
480 # for each archetype, get the sample column with the most negative gradient
481 argmins = np.argmin(G, axis=1)
483 # line search for optimal step size
484 prev_RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX)
485 prev_B = B
486 for _ in range(20):
487 B = (np.float32(1.0) - mu) * prev_B
488 B = _add_argmins_per_row(mtx=B, argmins=argmins, mu=mu)
489 RSS = _compute_RSS_ABX(X=X, A=A, B=B, WX=WX)
490 if RSS <= (prev_RSS * (1 + rel_tol_ls)):
491 mu *= np.float32(1.2)
492 break
493 else:
494 mu *= np.float32(0.5)
496 # check for convergence
497 if (np.abs(prev_RSS - RSS) / prev_RSS) < rel_tol_conv:
498 break
500 return B