Coverage for suppy\utils\rt_objectives.py: 0%
127 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1## helper functions for radiotherapy
3import numpy as np
4import numpy.typing as npt
5import scipy
7from Testing._ignore.aaaaa import grad
9try:
10 import cupy as cp
11 from cupyx.scipy.sparse import isspmatrix
13 NO_GPU = False
15except ImportError:
16 NO_GPU = True
17 cp = np
20class SquaredDeviation:
21 """Mean squared deviation of dose from a reference dose level.
23 Penalizes both over- and under-dosing symmetrically:
25 f(d) = (1/N) * sum_{i in idxs} (d_i - d_ref)^2
27 Parameters
28 ----------
29 d_ref : float
30 Reference dose value.
31 idxs : list or npt.ArrayLike
32 Voxel indices (boolean mask or integer array) over which the
33 objective is evaluated.
34 """
36 def __init__(self, d_ref: float, idxs: list):
37 self.d_ref = d_ref
38 self.idxs = idxs
39 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
41 def objective_value(self, x: npt.ArrayLike) -> float:
42 """Return the mean squared deviation at dose vector *x*."""
43 diff = x[self.idxs] - self.d_ref
44 return 1 / self.length * (diff @ diff)
46 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
47 """Return the gradient of the objective with respect to *x*."""
48 xp = cp if isinstance(x, cp.ndarray) else np
49 grad = xp.zeros(x.shape[0])
50 grad[self.idxs] = 2 * (x[self.idxs] - self.d_ref)
51 return 1 / self.length * grad
54class SquaredOverdosing:
55 """Mean squared overdosing penalty.
57 Only penalizes voxels that receive more dose than the reference:
59 f(d) = (1/N) * sum_{i in idxs} max(d_i - d_ref, 0)^2
61 Parameters
62 ----------
63 d_ref : float
64 Reference dose value.
65 idxs : list or npt.ArrayLike
66 Voxel indices (boolean mask or integer array) over which the
67 objective is evaluated.
68 """
70 def __init__(self, d_ref: float, idxs: list):
71 self.d_ref = d_ref
72 self.idxs = idxs
73 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
75 def objective_value(self, x: npt.ArrayLike) -> float:
76 """Return the mean squared overdosing at dose vector *x*."""
77 d_diff = x[self.idxs] - self.d_ref
78 d_diff[d_diff < 0] = 0
79 return 1 / self.length * (d_diff @ d_diff)
81 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
82 """Return the gradient of the objective with respect to *x*."""
83 xp = cp if isinstance(x, cp.ndarray) else np
84 grad = xp.zeros(x.shape[0])
85 d_diff = x[self.idxs] - self.d_ref
86 d_diff[d_diff < 0] = 0
87 grad[self.idxs] = 2 * d_diff
88 return 1 / self.length * grad
91class EUD:
92 """Generalized Equivalent Uniform Dose (gEUD).
94 Computes the power-mean dose over the selected voxels:
96 f(d) = (1/N * sum_{i in idxs} d_i^a)^(1/a)
98 For a > 1 the metric is sensitive to hot spots (OAR use); for a < 1 it
99 is sensitive to cold spots (target use).
101 Parameters
102 ----------
103 exponent : float
104 Power-law exponent *a*.
105 idxs : list or npt.ArrayLike
106 Voxel indices (boolean mask or integer array) over which the
107 objective is evaluated.
108 """
110 def __init__(self, exponent: float, idxs: list):
111 self.exponent = exponent
112 self.idxs = idxs
113 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
115 def objective_value(self, x: npt.ArrayLike) -> float:
116 """Return the gEUD at dose vector *x*."""
117 return (1 / self.length * ((x[self.idxs] ** self.exponent).sum())) ** (1 / self.exponent)
119 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
120 """Return the gradient of the objective with respect to *x*."""
121 xp = cp if isinstance(x, cp.ndarray) else np
122 grad = xp.zeros(x.shape[0])
124 if x @ x == 0:
125 return grad
126 grad[self.idxs] = (
127 ((x[self.idxs] ** self.exponent).sum()) ** (1 / self.exponent - 1)
128 * (x[self.idxs] ** (self.exponent - 1))
129 / self.length ** (1 / self.exponent)
130 )
131 return grad
134class MeanDose:
135 """Mean dose over a set of voxels.
137 f(d) = (1/N) * sum_{i in idxs} d_i
139 Parameters
140 ----------
141 idxs : list or npt.ArrayLike
142 Voxel indices (boolean mask or integer array) over which the
143 objective is evaluated.
144 """
146 def __init__(self, idxs: list):
147 self.idxs = idxs
148 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
150 def objective_value(self, x: npt.ArrayLike) -> float:
151 """Return the mean dose at dose vector *x*."""
152 return 1 / self.length * ((x[self.idxs]).sum())
154 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
155 """Return the gradient of the objective with respect to *x*."""
156 xp = cp if isinstance(x, cp.ndarray) else np
157 grad = xp.zeros(x.shape[0])
158 grad[self.idxs] = 1 / self.length * np.ones(self.length)
159 return grad
162class MaxDVH:
163 """Dose-volume histogram (DVH) objective penalizing overdosing.
165 Enforces that at most *max_v_percent* % of the voxel volume receives
166 a dose exceeding *d_ref*. Voxels with dose in [d_ref, d_max] are
167 penalized, where d_max is the (100 - max_v_percent)-th dose percentile:
169 f(d) = (1/N) * sum_{i: d_ref <= d_i <= d_max} (d_i - d_ref)^2
171 Parameters
172 ----------
173 d_ref : float
174 Reference dose value; doses above this threshold are penalized.
175 max_v_percent : float
176 Maximum allowable percentage of volume receiving dose above *d_ref*.
177 idxs : list or npt.ArrayLike
178 Voxel indices (boolean mask or integer array) over which the
179 objective is evaluated.
180 """
182 def __init__(self, d_ref: float, max_v_percent: float, idxs: list):
183 self.d_ref = d_ref
184 self.max_v_percent = max_v_percent
185 self.idxs = idxs
186 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
188 def objective_value(self, x: npt.ArrayLike) -> float:
189 """Return the MaxDVH objective value at dose vector *x*."""
190 xp = cp if isinstance(x, cp.ndarray) else np
191 d = x[self.idxs]
192 d_diff = d - self.d_ref
194 # calculate the D_max_percent value
195 d_max = xp.percentile(
196 d, 100 - self.max_v_percent
197 ) # , method = 'interpolated_inverted_cdf')
198 d_diff[(d < self.d_ref) | (d > d_max)] = 0
199 return 1 / self.length * (d_diff @ d_diff)
201 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
202 """Return the gradient of the objective with respect to *x*."""
203 xp = cp if isinstance(x, cp.ndarray) else np
204 grad = xp.zeros(x.shape[0])
205 d = x[self.idxs]
206 d_diff = d - self.d_ref
208 d_max = xp.percentile(
209 d, 100 - self.max_v_percent
210 ) # , method = 'interpolated_inverted_cdf')
211 d_diff[(d < self.d_ref) | (d > d_max)] = 0
212 grad[self.idxs] = (2 / self.length) * d_diff
214 return grad
217class MinDVH:
218 """Dose-volume histogram (DVH) objective penalizing underdosing.
220 Enforces that at least *min_v_percent* % of the voxel volume receives
221 a dose of at least *d_ref*. Voxels with dose in [d_min, d_ref] are
222 penalized, where d_min is the *min_v_percent*-th dose percentile:
224 f(d) = (1/N) * sum_{i: d_min <= d_i <= d_ref} (d_i - d_ref)^2
226 Parameters
227 ----------
228 d_ref : float
229 Reference dose value; doses below this threshold are penalized.
230 min_v_percent : float
231 Minimum required percentage of volume receiving dose at least *d_ref*.
232 idxs : list or npt.ArrayLike
233 Voxel indices (boolean mask or integer array) over which the
234 objective is evaluated.
235 """
237 def __init__(self, d_ref: float, min_v_percent: float, idxs: list):
238 self.d_ref = d_ref
239 self.min_v_percent = min_v_percent
240 self.idxs = idxs
241 self.length = self.idxs.sum() if self.idxs.dtype == bool else len(self.idxs)
243 def objective_value(self, x: npt.ArrayLike) -> float:
244 """Return the MinDVH objective value at dose vector *x*."""
245 xp = cp if isinstance(x, cp.ndarray) else np
246 d = x[self.idxs]
247 d_diff = d - self.d_ref
249 # calculate the D_min_percent value
250 d_min = xp.percentile(d, self.min_v_percent) # , method = 'interpolated_inverted_cdf')
251 d_diff[(d > self.d_ref) | (d < d_min)] = 0
252 return 1 / self.length * (d_diff @ d_diff)
254 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
255 """Return the gradient of the objective with respect to *x*."""
256 xp = cp if isinstance(x, cp.ndarray) else np
257 grad = xp.zeros(x.shape[0])
258 d = x[self.idxs]
259 d_diff = d - self.d_ref
261 d_min = xp.percentile(d, self.min_v_percent) # , method = 'interpolated_inverted_cdf')
262 d_diff[(d > self.d_ref) | (d < d_min)] = 0
263 grad[self.idxs] = (2 / self.length) * d_diff
265 return grad
268class objectives:
269 """Weighted sum of radiotherapy dose objectives.
271 Combines multiple individual objective functions with penalty weights and
272 applies the dose-influence matrix *dij* to map fluence *x* to dose *d*:
274 F(x) = sum_k penalty_k * f_k(dij @ x)
276 Parameters
277 ----------
278 objectives : list
279 List of objective instances (e.g. :class:`SquaredDeviation`,
280 :class:`EUD`), each exposing ``objective_value`` and
281 ``objective_gradient`` methods.
282 penalties : list
283 Scalar penalty weights, one per objective. Must have the same
284 length as *objectives*.
285 dij : npt.ArrayLike
286 Dose-influence matrix mapping fluence vector to dose vector.
287 store_csc_copy : bool, optional
288 If ``True`` and a GPU sparse matrix is detected, stores a CSC copy
289 of *dij* to accelerate the gradient back-projection. By default
290 ``False``.
291 """
293 def __init__(
294 self, objectives: list, penalties: list, dij: npt.ArrayLike, store_csc_copy: bool = False
295 ):
296 self.objectives = objectives
297 self.penalties = penalties
298 if len(self.objectives) != len(self.penalties):
299 raise ValueError(
300 f"Number of objectives is {len(self.objectives)}, but number of penalties is {len(self.penalties)}."
301 )
302 self.dij = dij
303 self.store_csc_copy = store_csc_copy and not NO_GPU and isspmatrix(self.dij)
304 if self.store_csc_copy:
305 self.dij_csc = cp.sparse.csc_matrix(dij).copy()
307 def objective_value(self, x: npt.ArrayLike) -> float:
308 """Return the weighted sum of objective values at fluence vector
309 *x*.
310 """
311 d = self.dij @ x
312 return sum([f.objective_value(d) * p for f, p in zip(self.objectives, self.penalties)])
314 def objective_gradient(self, x: npt.ArrayLike) -> npt.ArrayLike:
315 """Return the gradient of the weighted sum with respect to *x*."""
316 d = self.dij @ x
318 if self.store_csc_copy:
319 return (
320 sum([f.objective_gradient(d) * p for f, p in zip(self.objectives, self.penalties)])
321 @ self.dij_csc
322 )
324 return (
325 sum([f.objective_gradient(d) * p for f, p in zip(self.objectives, self.penalties)])
326 @ self.dij
327 )