Coverage for suppy\superiorization\_standard_sup.py: 85%

150 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1"""Normal superiorization algorithm.""" 

2from typing import List, Callable 

3import numpy as np 

4import time 

5import numpy.typing as npt 

6from suppy.utils import ensure_float_array 

7from suppy.perturbations import Perturbation 

8from ._sup import FeasibilityPerturbation 

9from suppy.projections import Projection 

10 

11try: 

12 import cupy as cp 

13 

14 NO_GPU = False 

15except ImportError: 

16 cp = np 

17 NO_GPU = True 

18 

19 

20class Superiorization(FeasibilityPerturbation): 

21 """ 

22 Superiorization algorithm for constrained optimization problems. 

23 

24 Parameters 

25 ---------- 

26 basic : Callable 

27 The underlying feasibility seeking algorithm. 

28 perturbation_scheme : Perturbation 

29 The perturbation scheme to be used for superiorization. 

30 

31 Attributes 

32 ---------- 

33 basic : Callable 

34 The underlying feasibility seeking algorithm. 

35 perturbation_scheme : Perturbation 

36 The perturbation scheme to be used for superiorization. 

37 objective_tol : float 

38 Tolerance for the objective function value change to determine stopping criteria. 

39 prox_tol : float 

40 Tolerance for the constraint proximity value change to determine stopping criteria. 

41 _k : int 

42 The current iteration number. 

43 all_x : list | None 

44 List of all points achieved during the optimization process, only stored if requested by the user. 

45 all_function_values : list | None 

46 List of all objective function values achieved during the optimization process, only stored if requested by the user. 

47 all_x_function_reduction : list | None 

48 List of all points achieved via the function reduction step, only stored if requested by the user. 

49 all_function_values_function_reduction : list | None 

50 List of all objective function values achieved via the function reduction step, only stored if requested by the user. 

51 all_x_basic : list | None 

52 List of all points achieved via the basic feasibility seeking algorithm, only stored if requested by the user. 

53 all_function_values_basic : list | None 

54 List of all objective function values achieved via the basic feasibility seeking algorithm, only stored if requested by the user. 

55 """ 

56 

57 def __init__( 

58 self, 

59 basic: Projection, 

60 perturbation_scheme: Perturbation, 

61 ): 

62 

63 super().__init__(basic) 

64 self.perturbation_scheme = perturbation_scheme 

65 

66 # initialize some variables for the algorithms 

67 self._k = 0 

68 self.t = [] 

69 self.t_it = [] 

70 

71 # initialize some arrays for storing of results 

72 self.all_x = [] 

73 self.all_function_values = [] # array storing all objective function values 

74 self.proximities = [] # array storing all proximity function values 

75 

76 self.all_x_function_reduction = [] 

77 self.all_function_values_function_reduction = [] 

78 self.proximities_function_reduction = [] 

79 

80 self.all_x_basic = [] 

81 self.all_function_values_basic = [] 

82 self.proximities_basic = [] 

83 

84 @ensure_float_array 

85 def solve( 

86 self, 

87 x_0: npt.NDArray, 

88 max_iter: int = 10, 

89 prox_tol: float = 1e-6, 

90 del_prox_tol: float = 1e-8, 

91 del_prox_n: int = 5, 

92 del_objective_tol: float = 1e-6, 

93 del_objective_n: int = 5, 

94 proximity_measures: List | None = None, 

95 storage: bool = False, 

96 storage_iters: List[int] | int | None = None, 

97 alternative_stopping_criterion: Callable | None = None, 

98 alternative_stopping_criterion_initial_call: Callable | None = None, 

99 ) -> np.ndarray: 

100 """ 

101 Solve the optimization problem using the superiorization method. 

102 

103 Parameters 

104 ---------- 

105 x_0 : npt.NDArray 

106 Initial guess for the solution. 

107 max_iter : int, optional 

108 Maximum number of iterations to perform (default is 10). 

109 prox_tol : float, optional 

110 Tolerance for the constraint function value to determine stopping criteria, by default 1e-6. 

111 del_prox_tol : float, optional 

112 Tolerance for the proximity function value change to determine stopping criteria, by default 1e-8. 

113 del_prox_n : int, optional 

114 Number of iterations with proximity changes below the threshold to determine stopping criteria, by default 5. 

115 proximity_measures : List, optional 

116 The proximity measures to calculate, by default None. Right now only the first in the list is used to check the feasibility. 

117 del_objective_tol : float, optional 

118 Tolernace for the objective function value to determine stopping criteria, by default 1e-6. 

119 del_objective_n : int, optional 

120 Number of iterations with objective function changes below the threshold to determine stopping criteria, by default 5. 

121 storage : bool, optional 

122 If True, store intermediate results (default is False). 

123 storage_iters : List[int] or int, optional 

124 Controls which iterations are stored (when storage=True). If None, all iterations are stored. 

125 If a list of ints, only those iteration indices are stored (0 = initial point). 

126 If an int, storage occurs every that many iterations. 

127 alternative_stopping_criterion : callable, optional 

128 Alternative stopping criterion 

129 alternative_stopping_criterion_initial_call : callable, optional 

130 Initial call for an alternative stopping criterion 

131 

132 Returns 

133 ------- 

134 npt.NDArray 

135 The superiorized solution. 

136 """ 

137 

138 def _should_store(idx): 

139 if storage_iters is None: 

140 return True 

141 if isinstance(storage_iters, int): 

142 return idx % storage_iters == 0 

143 return idx in storage_iters 

144 

145 if proximity_measures is None: 

146 proximity_measures = [("p_norm", 2)] 

147 else: 

148 # TODO: check that proximity measures are valid 

149 _ = None 

150 # initialization of variables 

151 

152 self.perturbation_scheme.reset() # reset the perturbation scheme 

153 

154 self._n_tol_objective = 0 

155 self._n_tol_prox = 0 

156 

157 x = x_0 

158 

159 # initial function and proximity values 

160 

161 self._initial_storage( 

162 x, 

163 storage and _should_store(0), 

164 self.perturbation_scheme.func(x_0), 

165 self.basic.proximity(x_0, proximity_measures), 

166 ) 

167 

168 self._k = 0 # reset counter if necessary 

169 

170 if alternative_stopping_criterion_initial_call is not None: 

171 stop = alternative_stopping_criterion_initial_call(x, self) 

172 else: 

173 stop = False # criterion for stopping the algorithm 

174 

175 self.t.append(0) 

176 t_current = time.time() 

177 t_init = t_current 

178 

179 while self._k < max_iter and not stop: 

180 self.perturbation_scheme.pre_step( 

181 x, 

182 last_proximity=self.proximities[-1], 

183 last_proximity_basic=self.proximities_basic[-1], 

184 last_proximity_function_reduction=self.proximities_function_reduction[-1], 

185 last_function_value=self.all_function_values[-1], 

186 last_function_value_basic=self.all_function_values_basic[-1], 

187 ) # perform any pre-step actions of the perturbation scheme 

188 

189 # perform the perturbation schemes update step 

190 x = self.perturbation_scheme.perturbation_step(x) 

191 self.storage( 

192 x, 

193 kind="function_reduction", 

194 storage=storage and _should_store(self._k + 1), 

195 f=self.perturbation_scheme.func(x), 

196 p=self.basic.proximity(x, proximity_measures), 

197 ) 

198 

199 self.perturbation_scheme.post_step( 

200 x, 

201 last_proximity=self.proximities[-1], 

202 last_proximity_basic=self.proximities_basic[-1], 

203 last_proximity_function_reduction=self.proximities_function_reduction[-1], 

204 last_function_value=self.all_function_values[-1], 

205 last_function_value_basic=self.all_function_values_basic[-1], 

206 ) # perform any post-step actions of the perturbation scheme 

207 

208 # perform basic step 

209 x = self.basic.step(x) 

210 

211 # check current function and proximity values 

212 

213 self.storage( 

214 x, 

215 kind="basic", 

216 storage=storage and _should_store(self._k + 1), 

217 f=self.perturbation_scheme.func(x), 

218 p=self.basic.proximity(x, proximity_measures), 

219 ) 

220 

221 self._k += 1 

222 

223 # enable different stopping criteria for different superiorization algorithms 

224 if alternative_stopping_criterion is not None: 

225 stop = alternative_stopping_criterion(x, self) 

226 else: 

227 stop = self._stopping_criterion( 

228 del_objective_tol, del_objective_n, prox_tol, del_prox_tol, del_prox_n 

229 ) 

230 

231 self._additional_action(x) 

232 self.t.append(time.time() - t_init) 

233 self.t_it.append(time.time() - t_current) 

234 t_current = time.time() 

235 self._post_step(x) 

236 

237 return x 

238 

239 def _stopping_criterion( 

240 self, 

241 del_objective_tol: float, 

242 del_objective_n: int, 

243 prox_tol: float, 

244 del_prox_tol: float, 

245 del_prox_n: int, 

246 ) -> bool: 

247 """ 

248 Determine if the stopping criterion for the optimization process are 

249 met. 

250 

251 Parameters 

252 ---------- 

253 del_objective_tol : float, optional 

254 Tolernace for the objective function value to determine stopping criteria, by default 1e-6. 

255 del_objective_n : int, optional 

256 Number of iterations with objective function changes below the threshold to determine stopping criteria, by default 5. 

257 prox_tol : float, optional 

258 Tolerance for the constraint function value to determine stopping criteria, by default 1e-6. 

259 del_prox_tol : float, optional 

260 Tolerance for the proximity function value change to determine stopping criteria, by default 1e-8. 

261 del_prox_n : int, optional 

262 Number of iterations with proximity changes below the threshold to determine stopping criteria, by default 5. 

263 

264 Returns 

265 ------- 

266 bool 

267 True if the stopping criteria are met, False otherwise. 

268 """ 

269 

270 stop_objective = False # variable to check if the objective function criteria is met 

271 # check objective function criteria f_(k+1)-f_k<= (delta f)* is met in last n_tol_objective iterations 

272 if ( 

273 abs(self.all_function_values[-3] - self.all_function_values[-1]) 

274 / max(1, self.all_function_values[-3]) 

275 < del_objective_tol 

276 ): 

277 self._n_tol_objective += 1 

278 else: 

279 self._n_tol_objective = 0 

280 if ( 

281 self._n_tol_objective >= del_objective_n 

282 ): # n objective function changes below threshold 

283 stop_objective = True 

284 

285 stop_prox = False # variable to check if the proximity criteria is met 

286 # check if proximity values are below the threshold 

287 if self.proximities[-1][0] < prox_tol: # proximity below goal/tolerance 

288 stop_prox = True 

289 

290 # check if the proximity changes are below tolerance level 

291 if ( 

292 abs(self.proximities[-3][0] - self.proximities[-1][0]) 

293 / max(1, self.proximities[-3][0]) 

294 < del_prox_tol 

295 ): 

296 self._n_tol_prox += 1 

297 else: 

298 self._n_tol_prox = 0 

299 if self._n_tol_prox >= del_prox_n: # n proximity changes below threshold 

300 stop_prox = True 

301 

302 # check if both criteria are met 

303 return stop_objective and stop_prox 

304 

305 def _additional_action(self, x: npt.NDArray): 

306 """ 

307 Perform an additional action on the input, in case it is needed. 

308 

309 Parameters 

310 ---------- 

311 x : npt.NDArray 

312 The current iterate 

313 

314 Returns 

315 ------- 

316 None 

317 """ 

318 

319 def _initial_storage(self, x, storage, f, p): 

320 """ 

321 Initializes storage for objective values and appends initial values. 

322 

323 Parameters 

324 ---------- 

325 x : array-like 

326 Initial values of the variables. 

327 f : array-like 

328 Initial values of the objective function. 

329 p : array-like 

330 Proximity function value 

331 """ 

332 # reset objective values 

333 self.all_x = [] 

334 self.all_function_values = [] # array storing all objective function values 

335 self.proximities = [] # array storing all proximity function values 

336 

337 self.all_x_function_reduction = [] 

338 self.all_function_values_function_reduction = [] 

339 self.proximities_function_reduction = [] 

340 

341 self.all_x_basic = [] 

342 self.all_function_values_basic = [] 

343 self.proximities_basic = [] 

344 

345 # append initial values 

346 self.all_function_values.append(f) 

347 self.all_function_values_basic.append(f) 

348 self.all_function_values_function_reduction.append(f) 

349 

350 self.proximities.append(p) 

351 self.proximities_basic.append(p) 

352 self.proximities_function_reduction.append(p) 

353 

354 if storage and isinstance(x, np.ndarray): 

355 self.all_x.append(x.copy()) 

356 self.all_x_basic.append(x.copy()) 

357 self.all_x_function_reduction.append(x.copy()) 

358 

359 elif storage: 

360 self.all_x.append(x.get()) 

361 self.all_x_basic.append(x.get()) 

362 self.all_x_function_reduction.append(x.get()) 

363 

364 def storage( 

365 self, 

366 x: npt.NDArray, 

367 kind: str, 

368 storage: bool = True, 

369 f: float | None = None, 

370 p: list | None = None, 

371 ): 

372 """ 

373 Stores the given values of x and f into the corresponding lists. 

374 

375 Parameters 

376 ---------- 

377 x : npt.NDArray 

378 The current value of the variable x to be stored. 

379 kind : str 

380 The type of storage to be used, either "function_reduction" or "basic". 

381 storage : bool, optional 

382 If True, store the values of x 

383 """ 

384 

385 # always store all function and proximity values 

386 self.all_function_values.append(f) 

387 self.proximities.append(p) 

388 if storage and isinstance(x, np.ndarray): 

389 self.all_x.append(x.copy()) 

390 elif storage: 

391 self.all_x.append((x.get())) 

392 

393 if kind == "function_reduction": 

394 self.all_function_values_function_reduction.append(f) 

395 self.proximities_function_reduction.append(p) 

396 

397 if storage and isinstance(x, np.ndarray): 

398 self.all_x_function_reduction.append(x.copy()) 

399 elif storage: 

400 self.all_x_function_reduction.append((x.get())) 

401 

402 elif kind == "basic": 

403 self.all_function_values_basic.append(f) 

404 self.proximities_basic.append(p) 

405 

406 if storage and isinstance(x, np.ndarray): 

407 self.all_x_basic.append(x.copy()) 

408 elif storage: 

409 self.all_x_basic.append((x.get())) 

410 

411 else: 

412 raise ValueError("Invalid storage type. Use 'function_reduction' or 'basic'.") 

413 

414 def _post_step(self, x: npt.NDArray): 

415 """ 

416 Perform an action after the optimization process has finished. 

417 

418 Parameters 

419 ---------- 

420 x : array-like 

421 The current value of the variable x. 

422 

423 Returns 

424 ------- 

425 None 

426 """ 

427 

428 self.all_x = np.array(self.all_x) 

429 self.all_x_function_reduction = np.array(self.all_x_function_reduction) 

430 self.all_x_basic = np.array(self.all_x_basic) 

431 

432 if isinstance(x, np.ndarray): 

433 self.all_function_values = np.array(self.all_function_values) 

434 self.all_function_values_function_reduction = np.array( 

435 self.all_function_values_function_reduction 

436 ) 

437 self.all_function_values_basic = np.array(self.all_function_values_basic) 

438 self.proximities = np.array(self.proximities) 

439 self.proximities_function_reduction = np.array(self.proximities_function_reduction) 

440 self.proximities_basic = np.array(self.proximities_basic) 

441 else: 

442 self.all_function_values = np.array([el.get() for el in self.all_function_values]) 

443 self.all_function_values_function_reduction = np.array( 

444 [el.get() for el in self.all_function_values_function_reduction] 

445 ) 

446 self.all_function_values_basic = np.array( 

447 [el.get() for el in self.all_function_values_basic] 

448 ) 

449 self.proximities = np.array([el.get() for el in self.proximities]) 

450 self.proximities_function_reduction = np.array( 

451 [el.get() for el in self.proximities_function_reduction] 

452 ) 

453 self.proximities_basic = np.array([el.get() for el in self.proximities_basic])