Coverage for src/distopf/matrix_models/multiperiod/objectives.py: 22%
152 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1import cvxpy as cp
2import numpy as np
3from distopf.matrix_models.multiperiod import LinDistBaseMP
6# cost = pd.read_csv("cost_data.csv")
7def gradient_load_min(model):
8 c = np.zeros(model.n_x)
9 for ph in "abc":
10 if model.phase_exists(ph):
11 c[model.branches_out_of_j("pij", 0, ph)] = 1
12 return c
15def gradient_curtail(model):
16 c = np.zeros(model.n_x)
17 for i in range(
18 model.p_der_start_phase_idx["a"],
19 model.p_der_start_phase_idx["c"] + len(model.gen_buses["c"]),
20 ):
21 c[i] = -1
22 return c
25def gradient_battery_efficiency(model: LinDistBaseMP, xk: cp.Variable, **kwargs):
26 """
27 Parameters
28 ----------
29 model : LinDistModel, or LinDistModelP, or LinDistModelQ
30 xk : cp.Variable
31 kwargs :
33 Returns
34 -------
35 c: numpy array, gradient of objective function c'x
37 """
38 if "start_step" in model.__dict__.keys():
39 start_step = model.start_step
40 else:
41 start_step = 0
42 c = np.zeros(model.n_x)
43 for t in range(start_step, start_step + model.n_steps):
44 for a in "abc":
45 if not model.phase_exists(a):
46 continue
47 charging_efficiency = model.bat.loc[
48 model.charge_map[t][a].index, f"nc_{a}"
49 ].to_numpy()
50 discharging_efficiency = model.bat.loc[
51 model.discharge_map[t][a].index, f"nd_{a}"
52 ].to_numpy()
53 c[model.charge_map[t][a].to_numpy()] = 1 - charging_efficiency # type: ignore
54 c[model.discharge_map[t][a].to_numpy()] = (1 / discharging_efficiency) - 1 # type: ignore
55 return c
58# ~~~ Quadratic objective with linear constraints for use with solve_quad()~~~
61def cp_obj_loss(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
62 """
64 Parameters
65 ----------
66 model : LinDistModel, or LinDistModelP, or LinDistModelQ
67 xk : cp.Variable
68 kwargs :
70 Returns
71 -------
72 f: cp.Expression
73 Expression to be minimized
75 """
76 if "start_step" in model.__dict__.keys():
77 start_step = model.start_step
78 else:
79 start_step = 0
80 index_list: list[int] = []
81 r_list = np.array([])
82 for t in range(start_step, start_step + model.n_steps):
83 for a in "abc":
84 if not model.phase_exists(a):
85 continue
86 i = model.x_maps[t][a].bi
87 j = model.x_maps[t][a].bj
88 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
89 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten())
90 index_list = np.append(
91 index_list, model.x_maps[t][a].pij.to_numpy().flatten()
92 ) # type: ignore
93 index_list = np.append(
94 index_list, model.x_maps[t][a].qij.to_numpy().flatten()
95 ) # type: ignore
96 r = np.array(r_list)
97 ix = np.array(index_list).astype(int)
98 if isinstance(xk, cp.Variable):
99 return cp.vdot(r, xk[ix] ** 2)
100 else:
101 return np.vdot(r, xk[ix] ** 2)
104def cp_battery_efficiency(
105 model: LinDistBaseMP, xk: cp.Variable, **kwargs
106) -> cp.Expression:
107 """
109 Parameters
110 ----------
111 model : LinDistModel, or LinDistModelP, or LinDistModelQ
112 xk : cp.Variable
113 kwargs :
115 Returns
116 -------
117 f: cp.Expression
118 Expression to be minimized
120 """
121 if "start_step" in model.__dict__.keys():
122 start_step = model.start_step
123 else:
124 start_step = 0
125 vec1_list:list[float] = []
126 index_list:list[int] = []
127 for t in range(start_step, start_step + model.n_steps):
128 for a in "abc":
129 if not model.phase_exists(a):
130 continue
131 charging_efficiency = model.bat.loc[
132 model.charge_map[t][a].index, f"nc_{a}"
133 ].to_numpy()
134 discharging_efficiency = model.bat.loc[
135 model.discharge_map[t][a].index, f"nd_{a}"
136 ].to_numpy()
137 vec1_list.extend((1 - charging_efficiency)) # type: ignore
138 vec1_list.extend(((1 / discharging_efficiency) - 1)) # type: ignore
139 index_list.extend(model.charge_map[t][a].to_numpy())
140 index_list.extend(model.discharge_map[t][a].to_numpy())
141 vec1 = np.array(vec1_list)
142 ix = np.array(index_list)
143 if isinstance(xk, cp.Variable):
144 return 1e-3 * cp.vdot(vec1, xk[ix])
145 else:
146 return 1e-3 * np.vdot(vec1, xk[ix])
149def cp_obj_loss_batt(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
150 """
152 Parameters
153 ----------
154 model : LinDistModel, or LinDistModelP, or LinDistModelQ
155 xk : cp.Variable
156 kwargs :
158 Returns
159 -------
160 f: cp.Expression
161 Expression to be minimized
163 """
164 return cp_obj_loss(model, xk) + cp_battery_efficiency(model, xk)
167def charge_batteries(model, xk, **kwargs) -> cp.Expression:
168 f_list = []
169 for t in range(model.n_steps):
170 for a in "abc":
171 if not model.phase_exists(a):
172 continue
173 f_list.append(-cp.sum(xk[model.soc_map[t][a].to_numpy()]))
174 return cp.sum(f_list)
177def cp_obj_target_p_3ph(model, xk, **kwargs):
178 f = cp.Constant(0)
179 target = kwargs["target"]
180 loss_percent = kwargs.get("loss_percent", np.zeros(3))
181 for i, ph in enumerate("abc"):
182 if model.phase_exists(ph):
183 p = 0
184 for out_branch in model.branches_out_of_j("pij", 0, ph):
185 p = p + xk[out_branch]
186 f += (target[i] - p * (1 + loss_percent[i] / 100)) ** 2
187 return f
190def cp_obj_target_p_total(model, xk, **kwargs):
191 actual = 0
192 target = kwargs["target"]
193 loss_percent = kwargs.get("loss_percent", np.zeros(3))
194 for i, ph in enumerate("abc"):
195 if model.phase_exists(ph):
196 p = 0
197 for out_branch in model.branches_out_of_j("pij", 0, ph):
198 p = p + xk[out_branch]
199 actual += p
200 f = (target - actual * (1 + loss_percent[0] / 100)) ** 2
201 return f
204def cp_obj_target_q_3ph(model, xk, **kwargs):
205 target_q = kwargs["target"]
206 loss_percent = kwargs.get("loss_percent", np.zeros(3))
207 f = cp.Constant(0)
208 for i, ph in enumerate("abc"):
209 if model.phase_exists(ph):
210 q = 0
211 for out_branch in model.branches_out_of_j("qij", 0, ph):
212 q = q + xk[out_branch]
213 f += (target_q[i] - q * (1 + loss_percent[i] / 100)) ** 2
214 return f
217def cp_obj_target_q_total(model, xk, **kwargs):
218 actual = 0
219 target = kwargs["target"]
220 loss_percent = kwargs.get("loss_percent", np.zeros(3))
221 for i, ph in enumerate("abc"):
222 if model.phase_exists(ph):
223 q = 0
224 for out_branch in model.branches_out_of_j("qij", 0, ph):
225 q = q + xk[out_branch]
226 actual += q
227 f = (target - actual * (1 + loss_percent[0] / 100)) ** 2
228 return f
231def cp_obj_curtail(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
232 """
233 Objective function to minimize curtailment of DERs.
234 Min sum((P_der_max - P_der)^2)
235 Parameters
236 ----------
237 model : LinDistModel, or LinDistModelP, or LinDistModelQ
238 xk : cp.Variable
240 Returns
241 -------
242 f: cp.Expression
243 Expression to be minimized
244 """
246 if "start_step" in model.__dict__.keys():
247 start_step = model.start_step
248 else:
249 start_step = 0
250 all_pg_idx = np.array([])
251 for t in range(start_step, start_step + model.n_steps):
252 for a in "abc":
253 if not model.phase_exists(a):
254 continue
255 all_pg_idx = np.r_[all_pg_idx, model.pg_map[t][a].to_numpy()]
256 all_pg_idx = all_pg_idx.astype(int)
257 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]) ** 2)
260def cp_obj_curtail_lp(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
261 """
262 Objective function to minimize curtailment of DERs.
263 Min sum((P_der_max - P_der)^2)
264 Parameters
265 ----------
266 model : LinDistModel, or LinDistModelP, or LinDistModelQ
267 xk : cp.Variable
269 Returns
270 -------
271 f: cp.Expression
272 Expression to be minimized
273 """
275 if "start_step" in model.__dict__.keys():
276 start_step = model.start_step
277 else:
278 start_step = 0
279 all_pg_idx = np.array([])
280 for t in range(start_step, start_step + model.n_steps):
281 for a in "abc":
282 if not model.phase_exists(a):
283 continue
284 all_pg_idx = np.r_[all_pg_idx, model.pg_map[t][a].to_numpy()]
285 all_pg_idx = all_pg_idx.astype(int)
286 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]))
289def cp_obj_none(*args, **kwargs) -> cp.Constant:
290 """
291 For use with cvxpy_solve() to run a power flow with no optimization.
293 Returns
294 -------
295 constant 0
296 """
297 return cp.Constant(0)