Coverage for suppy\utils\rt_objectives.py: 0%

127 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1## helper functions for radiotherapy 

2 

3import numpy as np 

4import numpy.typing as npt 

5import scipy 

6 

7from Testing._ignore.aaaaa import grad 

8 

9try: 

10 import cupy as cp 

11 from cupyx.scipy.sparse import isspmatrix 

12 

13 NO_GPU = False 

14 

15except ImportError: 

16 NO_GPU = True 

17 cp = np 

18 

19 

20class SquaredDeviation: 

21 """Mean squared deviation of dose from a reference dose level. 

22 

23 Penalizes both over- and under-dosing symmetrically: 

24 

25 f(d) = (1/N) * sum_{i in idxs} (d_i - d_ref)^2 

26 

27 Parameters 

28 ---------- 

29 d_ref : float 

30 Reference dose value. 

31 idxs : list or npt.ArrayLike 

32 Voxel indices (boolean mask or integer array) over which the 

33 objective is evaluated. 

34 """ 

35 

36 def __init__(self, d_ref: float, idxs: list): 

37 self.d_ref = d_ref 

38 self.idxs = idxs 

39 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

40 

41 def objective_value(self, x: npt.ArrayLike) -> float: 

42 """Return the mean squared deviation at dose vector *x*.""" 

43 diff = x[self.idxs] - self.d_ref 

44 return 1 / self.length * (diff @ diff) 

45 

46 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

47 """Return the gradient of the objective with respect to *x*.""" 

48 xp = cp if isinstance(x, cp.ndarray) else np 

49 grad = xp.zeros(x.shape[0]) 

50 grad[self.idxs] = 2 * (x[self.idxs] - self.d_ref) 

51 return 1 / self.length * grad 

52 

53 

54class SquaredOverdosing: 

55 """Mean squared overdosing penalty. 

56 

57 Only penalizes voxels that receive more dose than the reference: 

58 

59 f(d) = (1/N) * sum_{i in idxs} max(d_i - d_ref, 0)^2 

60 

61 Parameters 

62 ---------- 

63 d_ref : float 

64 Reference dose value. 

65 idxs : list or npt.ArrayLike 

66 Voxel indices (boolean mask or integer array) over which the 

67 objective is evaluated. 

68 """ 

69 

70 def __init__(self, d_ref: float, idxs: list): 

71 self.d_ref = d_ref 

72 self.idxs = idxs 

73 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

74 

75 def objective_value(self, x: npt.ArrayLike) -> float: 

76 """Return the mean squared overdosing at dose vector *x*.""" 

77 d_diff = x[self.idxs] - self.d_ref 

78 d_diff[d_diff < 0] = 0 

79 return 1 / self.length * (d_diff @ d_diff) 

80 

81 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

82 """Return the gradient of the objective with respect to *x*.""" 

83 xp = cp if isinstance(x, cp.ndarray) else np 

84 grad = xp.zeros(x.shape[0]) 

85 d_diff = x[self.idxs] - self.d_ref 

86 d_diff[d_diff < 0] = 0 

87 grad[self.idxs] = 2 * d_diff 

88 return 1 / self.length * grad 

89 

90 

91class EUD: 

92 """Generalized Equivalent Uniform Dose (gEUD). 

93 

94 Computes the power-mean dose over the selected voxels: 

95 

96 f(d) = (1/N * sum_{i in idxs} d_i^a)^(1/a) 

97 

98 For a > 1 the metric is sensitive to hot spots (OAR use); for a < 1 it 

99 is sensitive to cold spots (target use). 

100 

101 Parameters 

102 ---------- 

103 exponent : float 

104 Power-law exponent *a*. 

105 idxs : list or npt.ArrayLike 

106 Voxel indices (boolean mask or integer array) over which the 

107 objective is evaluated. 

108 """ 

109 

110 def __init__(self, exponent: float, idxs: list): 

111 self.exponent = exponent 

112 self.idxs = idxs 

113 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

114 

115 def objective_value(self, x: npt.ArrayLike) -> float: 

116 """Return the gEUD at dose vector *x*.""" 

117 return (1 / self.length * ((x[self.idxs] ** self.exponent).sum())) ** (1 / self.exponent) 

118 

119 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

120 """Return the gradient of the objective with respect to *x*.""" 

121 xp = cp if isinstance(x, cp.ndarray) else np 

122 grad = xp.zeros(x.shape[0]) 

123 

124 if x @ x == 0: 

125 return grad 

126 grad[self.idxs] = ( 

127 ((x[self.idxs] ** self.exponent).sum()) ** (1 / self.exponent - 1) 

128 * (x[self.idxs] ** (self.exponent - 1)) 

129 / self.length ** (1 / self.exponent) 

130 ) 

131 return grad 

132 

133 

134class MeanDose: 

135 """Mean dose over a set of voxels. 

136 

137 f(d) = (1/N) * sum_{i in idxs} d_i 

138 

139 Parameters 

140 ---------- 

141 idxs : list or npt.ArrayLike 

142 Voxel indices (boolean mask or integer array) over which the 

143 objective is evaluated. 

144 """ 

145 

146 def __init__(self, idxs: list): 

147 self.idxs = idxs 

148 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

149 

150 def objective_value(self, x: npt.ArrayLike) -> float: 

151 """Return the mean dose at dose vector *x*.""" 

152 return 1 / self.length * ((x[self.idxs]).sum()) 

153 

154 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

155 """Return the gradient of the objective with respect to *x*.""" 

156 xp = cp if isinstance(x, cp.ndarray) else np 

157 grad = xp.zeros(x.shape[0]) 

158 grad[self.idxs] = 1 / self.length * np.ones(self.length) 

159 return grad 

160 

161 

162class MaxDVH: 

163 """Dose-volume histogram (DVH) objective penalizing overdosing. 

164 

165 Enforces that at most *max_v_percent* % of the voxel volume receives 

166 a dose exceeding *d_ref*. Voxels with dose in [d_ref, d_max] are 

167 penalized, where d_max is the (100 - max_v_percent)-th dose percentile: 

168 

169 f(d) = (1/N) * sum_{i: d_ref <= d_i <= d_max} (d_i - d_ref)^2 

170 

171 Parameters 

172 ---------- 

173 d_ref : float 

174 Reference dose value; doses above this threshold are penalized. 

175 max_v_percent : float 

176 Maximum allowable percentage of volume receiving dose above *d_ref*. 

177 idxs : list or npt.ArrayLike 

178 Voxel indices (boolean mask or integer array) over which the 

179 objective is evaluated. 

180 """ 

181 

182 def __init__(self, d_ref: float, max_v_percent: float, idxs: list): 

183 self.d_ref = d_ref 

184 self.max_v_percent = max_v_percent 

185 self.idxs = idxs 

186 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

187 

188 def objective_value(self, x: npt.ArrayLike) -> float: 

189 """Return the MaxDVH objective value at dose vector *x*.""" 

190 xp = cp if isinstance(x, cp.ndarray) else np 

191 d = x[self.idxs] 

192 d_diff = d - self.d_ref 

193 

194 # calculate the D_max_percent value 

195 d_max = xp.percentile( 

196 d, 100 - self.max_v_percent 

197 ) # , method = 'interpolated_inverted_cdf') 

198 d_diff[(d < self.d_ref) | (d > d_max)] = 0 

199 return 1 / self.length * (d_diff @ d_diff) 

200 

201 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

202 """Return the gradient of the objective with respect to *x*.""" 

203 xp = cp if isinstance(x, cp.ndarray) else np 

204 grad = xp.zeros(x.shape[0]) 

205 d = x[self.idxs] 

206 d_diff = d - self.d_ref 

207 

208 d_max = xp.percentile( 

209 d, 100 - self.max_v_percent 

210 ) # , method = 'interpolated_inverted_cdf') 

211 d_diff[(d < self.d_ref) | (d > d_max)] = 0 

212 grad[self.idxs] = (2 / self.length) * d_diff 

213 

214 return grad 

215 

216 

217class MinDVH: 

218 """Dose-volume histogram (DVH) objective penalizing underdosing. 

219 

220 Enforces that at least *min_v_percent* % of the voxel volume receives 

221 a dose of at least *d_ref*. Voxels with dose in [d_min, d_ref] are 

222 penalized, where d_min is the *min_v_percent*-th dose percentile: 

223 

224 f(d) = (1/N) * sum_{i: d_min <= d_i <= d_ref} (d_i - d_ref)^2 

225 

226 Parameters 

227 ---------- 

228 d_ref : float 

229 Reference dose value; doses below this threshold are penalized. 

230 min_v_percent : float 

231 Minimum required percentage of volume receiving dose at least *d_ref*. 

232 idxs : list or npt.ArrayLike 

233 Voxel indices (boolean mask or integer array) over which the 

234 objective is evaluated. 

235 """ 

236 

237 def __init__(self, d_ref: float, min_v_percent: float, idxs: list): 

238 self.d_ref = d_ref 

239 self.min_v_percent = min_v_percent 

240 self.idxs = idxs 

241 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs) 

242 

243 def objective_value(self, x: npt.ArrayLike) -> float: 

244 """Return the MinDVH objective value at dose vector *x*.""" 

245 xp = cp if isinstance(x, cp.ndarray) else np 

246 d = x[self.idxs] 

247 d_diff = d - self.d_ref 

248 

249 # calculate the D_min_percent value 

250 d_min = xp.percentile(d, self.min_v_percent) # , method = 'interpolated_inverted_cdf') 

251 d_diff[(d > self.d_ref) | (d < d_min)] = 0 

252 return 1 / self.length * (d_diff @ d_diff) 

253 

254 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

255 """Return the gradient of the objective with respect to *x*.""" 

256 xp = cp if isinstance(x, cp.ndarray) else np 

257 grad = xp.zeros(x.shape[0]) 

258 d = x[self.idxs] 

259 d_diff = d - self.d_ref 

260 

261 d_min = xp.percentile(d, self.min_v_percent) # , method = 'interpolated_inverted_cdf') 

262 d_diff[(d > self.d_ref) | (d < d_min)] = 0 

263 grad[self.idxs] = (2 / self.length) * d_diff 

264 

265 return grad 

266 

267 

268class objectives: 

269 """Weighted sum of radiotherapy dose objectives. 

270 

271 Combines multiple individual objective functions with penalty weights and 

272 applies the dose-influence matrix *dij* to map fluence *x* to dose *d*: 

273 

274 F(x) = sum_k penalty_k * f_k(dij @ x) 

275 

276 Parameters 

277 ---------- 

278 objectives : list 

279 List of objective instances (e.g. :class:`SquaredDeviation`, 

280 :class:`EUD`), each exposing ``objective_value`` and 

281 ``objective_gradient`` methods. 

282 penalties : list 

283 Scalar penalty weights, one per objective. Must have the same 

284 length as *objectives*. 

285 dij : npt.ArrayLike 

286 Dose-influence matrix mapping fluence vector to dose vector. 

287 store_csc_copy : bool, optional 

288 If ``True`` and a GPU sparse matrix is detected, stores a CSC copy 

289 of *dij* to accelerate the gradient back-projection. By default 

290 ``False``. 

291 """ 

292 

293 def __init__( 

294 self, objectives: list, penalties: list, dij: npt.ArrayLike, store_csc_copy: bool = False 

295 ): 

296 self.objectives = objectives 

297 self.penalties = penalties 

298 if len(self.objectives) != len(self.penalties): 

299 raise ValueError( 

300 f"Number of objectives is {len(self.objectives)}, but number of penalties is {len(self.penalties)}." 

301 ) 

302 self.dij = dij 

303 self.store_csc_copy = store_csc_copy and not NO_GPU and isspmatrix(self.dij) 

304 if self.store_csc_copy: 

305 self.dij_csc = cp.sparse.csc_matrix(dij).copy() 

306 

307 def objective_value(self, x: npt.ArrayLike) -> float: 

308 """Return the weighted sum of objective values at fluence vector 

309 *x*. 

310 """ 

311 d = self.dij @ x 

312 return sum([f.objective_value(d) * p for f, p in zip(self.objectives, self.penalties)]) 

313 

314 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike: 

315 """Return the gradient of the weighted sum with respect to *x*.""" 

316 d = self.dij @ x 

317 

318 if self.store_csc_copy: 

319 return ( 

320 sum([f.objective_gradient(d) * p for f, p in zip(self.objectives, self.penalties)]) 

321 @ self.dij_csc 

322 ) 

323 

324 return ( 

325 sum([f.objective_gradient(d) * p for f, p in zip(self.objectives, self.penalties)]) 

326 @ self.dij 

327 )