Coverage for src/distopf/matrix_models/objectives.py: 22%

153 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from collections.abc import Collection 

2import pyomo.environ as pe 

3import cvxpy as cp 

4import numpy as np 

5from distopf import LinDistModel 

6from distopf.matrix_models.base import LinDistBase 

7 

8 

9def gradient_load_min(model: LinDistModel, *args, **kwargs) -> np.ndarray: 

10 """ 

11 Gradient of the objective function to minimize the load at the substation. 

12 c has a 1 for each active power flow out of the substation. 

13 Parameters 

14 ---------- 

15 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

16 

17 Returns 

18 ------- 

19 c : 1-D array 

20 The coefficients of the linear objective function to be minimized. 

21 """ 

22 c = np.zeros(model.n_x) 

23 for ph in "abc": 

24 if model.phase_exists(ph): 

25 c[model.idx("pij", model.swing_bus, ph)] = 1 

26 return c 

27 

28 

29def gradient_curtail(model: LinDistModel, *args, **kwargs) -> np.ndarray: 

30 """ 

31 Gradient of the objective function to minimize curtailment of DERs. 

32 Parameters 

33 ---------- 

34 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

35 

36 Returns 

37 ------- 

38 c : 1-D array 

39 The coefficients of the linear objective function to be minimized. 

40 

41 """ 

42 

43 all_pg_idx = np.array([]) 

44 for a in "abc": 

45 if not model.phase_exists(a): 

46 continue 

47 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()] 

48 all_pg_idx = all_pg_idx.astype(int) 

49 c = np.zeros(model.n_x) 

50 c[all_pg_idx] = -1 

51 return c 

52 

53 

54# ~~~ CVXPY Objectives ~~~ 

55def cp_obj_loss(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression: 

56 """ 

57 

58 Parameters 

59 ---------- 

60 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

61 xk : cp.Variable 

62 kwargs : 

63 

64 Returns 

65 ------- 

66 f: cp.Expression 

67 Expression to be minimized 

68 

69 """ 

70 index_list = [] 

71 r_list = np.array([]) 

72 for a in "abc": 

73 if not model.phase_exists(a): 

74 continue 

75 i = model.x_maps[a].bi 

76 j = model.x_maps[a].bj 

77 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten()) 

78 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten()) 

79 index_list = np.append(index_list, model.x_maps[a].pij.to_numpy().flatten()) 

80 index_list = np.append(index_list, model.x_maps[a].qij.to_numpy().flatten()) 

81 r = np.array(r_list) 

82 ix = np.array(index_list).astype(int) 

83 if isinstance(xk, cp.Variable): 

84 return cp.vdot(r, xk[ix] ** 2) 

85 else: 

86 return np.vdot(r, xk[ix] ** 2) 

87 

88 

89def cp_obj_loss_old(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression: 

90 """ 

91 

92 Parameters 

93 ---------- 

94 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

95 xk : cp.Variable 

96 kwargs : 

97 

98 Returns 

99 ------- 

100 f: cp.Expression 

101 Expression to be minimized 

102 

103 """ 

104 f_list = [] 

105 for j in range(1, model.nb): 

106 for a in "abc": 

107 if model.phase_exists(a, j): 

108 i = model.idx("bi", j, a)[0] 

109 f_list.append( 

110 model.r[a + a][i, j] * (xk[model.idx("pij", j, a)[0]] ** 2) 

111 ) 

112 f_list.append( 

113 model.r[a + a][i, j] * (xk[model.idx("qij", j, a)[0]] ** 2) 

114 ) 

115 return cp.sum(f_list) 

116 

117 

118def cp_obj_target_p_3ph( 

119 model: LinDistModel, xk: cp.Variable, **kwargs 

120) -> cp.Expression: 

121 """ 

122 

123 Parameters 

124 ---------- 

125 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

126 xk : cp.Variable 

127 kwargs : 

128 

129 Returns 

130 ------- 

131 f: cp.Expression 

132 Expression to be minimized 

133 

134 """ 

135 f = cp.Constant(0) 

136 target = kwargs["target"] 

137 if not isinstance(target, Collection): 

138 raise TypeError(f"target must be a size 3 array. Instead got {target}.") 

139 if len(target) != 3: 

140 raise TypeError(f"target must be a size 3 array. Instead got {target}.") 

141 

142 error_percent = kwargs.get("error_percent", np.zeros(3)) 

143 for i, ph in enumerate("abc"): 

144 if model.phase_exists(ph): 

145 p = 0 

146 for out_branch in model.branches_out_of_j("pij", 0, ph): 

147 p = p + xk[out_branch] 

148 f += (target[i] - p * (1 + error_percent[i] / 100)) ** 2 

149 return f 

150 

151 

152def cp_obj_target_p_total( 

153 model: LinDistModel | LinDistModel, xk: cp.Variable, **kwargs 

154) -> cp.Expression: 

155 """ 

156 

157 Parameters 

158 ---------- 

159 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

160 xk : cp.Variable 

161 kwargs : 

162 

163 Returns 

164 ------- 

165 f: cp.Expression 

166 Expression to be minimized 

167 

168 """ 

169 actual = 0 

170 target = kwargs["target"] 

171 if not isinstance(target, (int, float)): 

172 raise TypeError( 

173 f"target must be a float or integer value. Instead got {target}." 

174 ) 

175 error_percent = kwargs.get("error_percent", np.zeros(3)) 

176 for i, ph in enumerate("abc"): 

177 if model.phase_exists(ph): 

178 p = 0 

179 for out_branch in model.branches_out_of_j("pij", 0, ph): 

180 p = p + xk[out_branch] 

181 actual += p 

182 f = (target - actual * (1 + error_percent[0] / 100)) ** 2 

183 return f 

184 

185 

186def cp_obj_target_q_3ph( 

187 model: LinDistModel, xk: cp.Variable, **kwargs 

188) -> cp.Expression: 

189 """ 

190 

191 Parameters 

192 ---------- 

193 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

194 xk : cp.Variable 

195 kwargs : 

196 

197 Returns 

198 ------- 

199 f: cp.Expression 

200 Expression to be minimized 

201 

202 """ 

203 target = kwargs["target"] 

204 if not isinstance(target, Collection): 

205 raise TypeError(f"target must be a size 3 array. Instead got {target}.") 

206 if len(target) != 3: 

207 raise TypeError(f"target must be a size 3 array. Instead got {target}.") 

208 error_percent = kwargs.get("error_percent", np.zeros(3)) 

209 f = cp.Constant(0) 

210 for i, ph in enumerate("abc"): 

211 if model.phase_exists(ph): 

212 q = 0 

213 for out_branch in model.branches_out_of_j("qij", 0, ph): 

214 q = q + xk[out_branch] 

215 f += (target[i] - q * (1 + error_percent[i] / 100)) ** 2 

216 return f 

217 

218 

219def cp_obj_target_q_total( 

220 model: LinDistModel, xk: cp.Variable, **kwargs 

221) -> cp.Expression: 

222 """ 

223 Parameters 

224 ---------- 

225 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

226 xk : cp.Variable 

227 kwargs : 

228 

229 Returns 

230 ------- 

231 f: cp.Expression 

232 Expression to be minimized 

233 

234 """ 

235 actual = 0 

236 target = kwargs["target"] 

237 if not isinstance(target, (int, float)): 

238 raise TypeError( 

239 f"target must be a float or integer value. Instead got {target}." 

240 ) 

241 error_percent = kwargs.get("error_percent", np.zeros(3)) 

242 for i, ph in enumerate("abc"): 

243 if model.phase_exists(ph): 

244 q = 0 

245 for out_branch in model.branches_out_of_j("qij", 0, ph): 

246 q = q + xk[out_branch] 

247 actual += q 

248 f = (target - actual * (1 + error_percent[0] / 100)) ** 2 

249 return f 

250 

251 

252# def cp_obj_curtail(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression: 

253# """ 

254# Objective function to minimize curtailment of DERs. 

255# Min sum((P_der_max - P_der)^2) 

256# Parameters 

257# ---------- 

258# model : LinDistModel, or LinDistModelP, or LinDistModelQ 

259# xk : cp.Variable 

260# 

261# Returns 

262# ------- 

263# f: cp.Expression 

264# Expression to be minimized 

265# """ 

266# f = cp.Constant(0) 

267# for i in range(model.ctr_var_start_idx, model.n_x): 

268# f += (model.bounds[i][1] - xk[i]) ** 2 

269# return f 

270 

271 

272def cp_obj_curtail(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression: 

273 """ 

274 Objective function to minimize curtailment of DERs. 

275 Min sum((P_der_max - P_der)^2) 

276 Parameters 

277 ---------- 

278 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

279 xk : cp.Variable 

280 

281 Returns 

282 ------- 

283 f: cp.Expression 

284 Expression to be minimized 

285 """ 

286 

287 all_pg_idx = np.array([]) 

288 for a in "abc": 

289 if not model.phase_exists(a): 

290 continue 

291 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()] 

292 all_pg_idx = all_pg_idx.astype(int) 

293 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]) ** 2) 

294 

295 

296def cp_obj_curtail_lp(model: LinDistModel, xk: cp.Variable, **kwargs) -> cp.Expression: 

297 """ 

298 Objective function to minimize curtailment of DERs. 

299 Min sum((P_der_max - P_der)^2) 

300 Parameters 

301 ---------- 

302 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

303 xk : cp.Variable 

304 

305 Returns 

306 ------- 

307 f: cp.Expression 

308 Expression to be minimized 

309 """ 

310 

311 all_pg_idx = np.array([]) 

312 for a in "abc": 

313 if not model.phase_exists(a): 

314 continue 

315 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()] 

316 all_pg_idx = all_pg_idx.astype(int) 

317 return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx])) 

318 

319 

320def cp_obj_none(*args, **kwargs) -> cp.Constant: 

321 """ 

322 For use with cvxpy_solve() to run a power flow with no optimization. 

323 

324 Returns 

325 ------- 

326 constant 0 

327 """ 

328 return cp.Constant(0) 

329 

330 

331# ~~~ PYOMO Objectives ~~~ 

332 

333 

334def pyo_obj_loss(model: LinDistBase, x: pe.Var, **kwargs) -> float: 

335 """ 

336 

337 Parameters 

338 ---------- 

339 model : LinDistModel 

340 x : pe.Var 

341 kwargs : 

342 

343 Returns 

344 ------- 

345 cost : float 

346 

347 """ 

348 index_list = [] 

349 r_list = np.array([]) 

350 for a in "abc": 

351 if not model.phase_exists(a): 

352 continue 

353 i = model.x_maps[a].bi 

354 j = model.x_maps[a].bj 

355 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten()) 

356 r_list = np.append(r_list, np.array(model.r[a + a][i, j]).flatten()) 

357 index_list = np.append(index_list, model.x_maps[a].pij.to_numpy().flatten()) 

358 index_list = np.append(index_list, model.x_maps[a].qij.to_numpy().flatten()) 

359 r = np.array(r_list) 

360 ix = np.array(index_list).astype(int) 

361 terms = [] 

362 for i in range(len(ix)): 

363 terms.append(r * x[ix[i] ** 2]) 

364 return sum(terms) 

365 

366 

367def pyo_obj_curtail(model: LinDistBase, x: pe.Var, **kwargs) -> float: 

368 """ 

369 Objective function to minimize curtailment of DERs. 

370 Min sum((P_der_max - P_der)^2) 

371 Parameters 

372 ---------- 

373 model : LinDistBase 

374 x : pe.Var 

375 

376 Returns 

377 ------- 

378 cost : float 

379 """ 

380 

381 all_pg_idx = np.array([]) 

382 for a in "abc": 

383 if not model.phase_exists(a): 

384 continue 

385 all_pg_idx = np.r_[all_pg_idx, model.pg_map[a].to_numpy()] 

386 all_pg_idx = all_pg_idx.astype(int) 

387 terms = [] 

388 for i in range(len(all_pg_idx)): 

389 terms.append((model.x_max[all_pg_idx[i]] - x[all_pg_idx[i]]) ** 2) 

390 return sum(terms)