Coverage for src/distopf/matrix_models/objectives.py: 22%
153 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from collections.abc import Collection
2import pyomo.environ as pe
3import cvxpy as cp
4import numpy as np
5from distopf import LinDistModel
6from distopf.matrix_models.base import LinDistBase
9def gradient_load_min(model: LinDistModel, *args, **kwargs) -> np.ndarray:
10 """
11 Gradient of the objective function to minimize the load at the substation.
12 c has a 1 for each active power flow out of the substation.
13 Parameters
14 ----------
15 model : LinDistModel, or LinDistModelP, or LinDistModelQ
17 Returns
18 -------
19 c : 1-D array
20 The coefficients of the linear objective function to be minimized.
21 """
22 c = np.zeros(model.n_x)
23 for ph in "abc":
24 if model.phase_exists(ph):
25 c[model.idx("pij", model.swing_bus, ph)] = 1
26 return c
29def gradient_curtail(model: LinDistModel, *args, **kwargs) -> np.ndarray:
30 """
31 Gradient of the objective function to minimize curtailment of DERs.
32 Parameters
33 ----------
34 model : LinDistModel, or LinDistModelP, or LinDistModelQ
36 Returns
37 -------
38 c : 1-D array
39 The coefficients of the linear objective function to be minimized.
41 """
43 all_pg_idx = np.array([])
44 for a in "abc":
45 if not model.phase_exists(a):
46 continue
47 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()]
48 all_pg_idx = all_pg_idx.astype(int)
49 c = np.zeros(model.n_x)
50 c[all_pg_idx] = -1
51 return c
54# ~~~ CVXPY Objectives ~~~
55def cp_obj_loss(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression:
56 """
58 Parameters
59 ----------
60 model : LinDistModel, or LinDistModelP, or LinDistModelQ
61 xk : cp.Variable
62 kwargs :
64 Returns
65 -------
66 f: cp.Expression
67 Expression to be minimized
69 """
70 index_list = []
71 r_list = np.array([])
72 for a in "abc":
73 if not model.phase_exists(a):
74 continue
75 i = model.x_maps[a].bi
76 j = model.x_maps[a].bj
77 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
78 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
79 index_list = np.append(index_list, model.x_maps[a].pij.to_numpy().flatten())
80 index_list = np.append(index_list, model.x_maps[a].qij.to_numpy().flatten())
81 r = np.array(r_list)
82 ix = np.array(index_list).astype(int)
83 if isinstance(xk, cp.Variable):
84 return cp.vdot(r, xk[ix] ** 2)
85 else:
86 return np.vdot(r, xk[ix] ** 2)
89def cp_obj_loss_old(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression:
90 """
92 Parameters
93 ----------
94 model : LinDistModel, or LinDistModelP, or LinDistModelQ
95 xk : cp.Variable
96 kwargs :
98 Returns
99 -------
100 f: cp.Expression
101 Expression to be minimized
103 """
104 f_list = []
105 for j in range(1, model.nb):
106 for a in "abc":
107 if model.phase_exists(a, j):
108 i = model.idx("bi", j, a)[0]
109 f_list.append(
110 model.r[a + a][i, j] * (xk[model.idx("pij", j, a)[0]] ** 2)
111 )
112 f_list.append(
113 model.r[a + a][i, j] * (xk[model.idx("qij", j, a)[0]] ** 2)
114 )
115 return cp.sum(f_list)
118def cp_obj_target_p_3ph(
119 model: LinDistModel, xk: cp.Variable, **kwargs
120) -> cp.Expression:
121 """
123 Parameters
124 ----------
125 model : LinDistModel, or LinDistModelP, or LinDistModelQ
126 xk : cp.Variable
127 kwargs :
129 Returns
130 -------
131 f: cp.Expression
132 Expression to be minimized
134 """
135 f = cp.Constant(0)
136 target = kwargs["target"]
137 if not isinstance(target, Collection):
138 raise TypeError(f"target must be a size 3 array. Instead got {target}.")
139 if len(target) != 3:
140 raise TypeError(f"target must be a size 3 array. Instead got {target}.")
142 error_percent = kwargs.get("error_percent", np.zeros(3))
143 for i, ph in enumerate("abc"):
144 if model.phase_exists(ph):
145 p = 0
146 for out_branch in model.branches_out_of_j("pij", 0, ph):
147 p = p + xk[out_branch]
148 f += (target[i] - p * (1 + error_percent[i] / 100)) ** 2
149 return f
152def cp_obj_target_p_total(
153 model: LinDistModel | LinDistModel, xk: cp.Variable, **kwargs
154) -> cp.Expression:
155 """
157 Parameters
158 ----------
159 model : LinDistModel, or LinDistModelP, or LinDistModelQ
160 xk : cp.Variable
161 kwargs :
163 Returns
164 -------
165 f: cp.Expression
166 Expression to be minimized
168 """
169 actual = 0
170 target = kwargs["target"]
171 if not isinstance(target, (int, float)):
172 raise TypeError(
173 f"target must be a float or integer value. Instead got {target}."
174 )
175 error_percent = kwargs.get("error_percent", np.zeros(3))
176 for i, ph in enumerate("abc"):
177 if model.phase_exists(ph):
178 p = 0
179 for out_branch in model.branches_out_of_j("pij", 0, ph):
180 p = p + xk[out_branch]
181 actual += p
182 f = (target - actual * (1 + error_percent[0] / 100)) ** 2
183 return f
186def cp_obj_target_q_3ph(
187 model: LinDistModel, xk: cp.Variable, **kwargs
188) -> cp.Expression:
189 """
191 Parameters
192 ----------
193 model : LinDistModel, or LinDistModelP, or LinDistModelQ
194 xk : cp.Variable
195 kwargs :
197 Returns
198 -------
199 f: cp.Expression
200 Expression to be minimized
202 """
203 target = kwargs["target"]
204 if not isinstance(target, Collection):
205 raise TypeError(f"target must be a size 3 array. Instead got {target}.")
206 if len(target) != 3:
207 raise TypeError(f"target must be a size 3 array. Instead got {target}.")
208 error_percent = kwargs.get("error_percent", np.zeros(3))
209 f = cp.Constant(0)
210 for i, ph in enumerate("abc"):
211 if model.phase_exists(ph):
212 q = 0
213 for out_branch in model.branches_out_of_j("qij", 0, ph):
214 q = q + xk[out_branch]
215 f += (target[i] - q * (1 + error_percent[i] / 100)) ** 2
216 return f
219def cp_obj_target_q_total(
220 model: LinDistModel, xk: cp.Variable, **kwargs
221) -> cp.Expression:
222 """
223 Parameters
224 ----------
225 model : LinDistModel, or LinDistModelP, or LinDistModelQ
226 xk : cp.Variable
227 kwargs :
229 Returns
230 -------
231 f: cp.Expression
232 Expression to be minimized
234 """
235 actual = 0
236 target = kwargs["target"]
237 if not isinstance(target, (int, float)):
238 raise TypeError(
239 f"target must be a float or integer value. Instead got {target}."
240 )
241 error_percent = kwargs.get("error_percent", np.zeros(3))
242 for i, ph in enumerate("abc"):
243 if model.phase_exists(ph):
244 q = 0
245 for out_branch in model.branches_out_of_j("qij", 0, ph):
246 q = q + xk[out_branch]
247 actual += q
248 f = (target - actual * (1 + error_percent[0] / 100)) ** 2
249 return f
252# def cp_obj_curtail(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression:
253# """
254# Objective function to minimize curtailment of DERs.
255# Min sum((P_der_max - P_der)^2)
256# Parameters
257# ----------
258# model : LinDistModel, or LinDistModelP, or LinDistModelQ
259# xk : cp.Variable
260#
261# Returns
262# -------
263# f: cp.Expression
264# Expression to be minimized
265# """
266# f = cp.Constant(0)
267# for i in range(model.ctr_var_start_idx, model.n_x):
268# f += (model.bounds[i][1] - xk[i]) ** 2
269# return f
272def cp_obj_curtail(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression:
273 """
274 Objective function to minimize curtailment of DERs.
275 Min sum((P_der_max - P_der)^2)
276 Parameters
277 ----------
278 model : LinDistModel, or LinDistModelP, or LinDistModelQ
279 xk : cp.Variable
281 Returns
282 -------
283 f: cp.Expression
284 Expression to be minimized
285 """
287 all_pg_idx = np.array([])
288 for a in "abc":
289 if not model.phase_exists(a):
290 continue
291 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()]
292 all_pg_idx = all_pg_idx.astype(int)
293 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]) ** 2)
296def cp_obj_curtail_lp(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression:
297 """
298 Objective function to minimize curtailment of DERs.
299 Min sum((P_der_max - P_der)^2)
300 Parameters
301 ----------
302 model : LinDistModel, or LinDistModelP, or LinDistModelQ
303 xk : cp.Variable
305 Returns
306 -------
307 f: cp.Expression
308 Expression to be minimized
309 """
311 all_pg_idx = np.array([])
312 for a in "abc":
313 if not model.phase_exists(a):
314 continue
315 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()]
316 all_pg_idx = all_pg_idx.astype(int)
317 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]))
320def cp_obj_none(*args, **kwargs) -> cp.Constant:
321 """
322 For use with cvxpy_solve() to run a power flow with no optimization.
324 Returns
325 -------
326 constant 0
327 """
328 return cp.Constant(0)
331# ~~~ PYOMO Objectives ~~~
334def pyo_obj_loss(model: LinDistBase, x: pe.Var, **kwargs) -> float:
335 """
337 Parameters
338 ----------
339 model : LinDistModel
340 x : pe.Var
341 kwargs :
343 Returns
344 -------
345 cost : float
347 """
348 index_list = []
349 r_list = np.array([])
350 for a in "abc":
351 if not model.phase_exists(a):
352 continue
353 i = model.x_maps[a].bi
354 j = model.x_maps[a].bj
355 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
356 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
357 index_list = np.append(index_list, model.x_maps[a].pij.to_numpy().flatten())
358 index_list = np.append(index_list, model.x_maps[a].qij.to_numpy().flatten())
359 r = np.array(r_list)
360 ix = np.array(index_list).astype(int)
361 terms = []
362 for i in range(len(ix)):
363 terms.append(r * x[ix[i] ** 2])
364 return sum(terms)
367def pyo_obj_curtail(model: LinDistBase, x: pe.Var, **kwargs) -> float:
368 """
369 Objective function to minimize curtailment of DERs.
370 Min sum((P_der_max - P_der)^2)
371 Parameters
372 ----------
373 model : LinDistBase
374 x : pe.Var
376 Returns
377 -------
378 cost : float
379 """
381 all_pg_idx = np.array([])
382 for a in "abc":
383 if not model.phase_exists(a):
384 continue
385 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()]
386 all_pg_idx = all_pg_idx.astype(int)
387 terms = []
388 for i in range(len(all_pg_idx)):
389 terms.append((model.x_max[all_pg_idx[i]] - x[all_pg_idx[i]]) ** 2)
390 return sum(terms)