Coverage for src/distopf/matrix_models/multiperiod/objectives.py: 22%

152 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1import cvxpy as cp 

2import numpy as np 

3from distopf.matrix_models.multiperiod import LinDistBaseMP 

4 

5 

6# cost = pd.read_csv("cost_data.csv") 

def gradient_load_min(model):
    """
    Build the linear cost vector for the load-minimization objective.

    For every phase the model reports as existing, the entries selected by
    ``model.branches_out_of_j("pij", 0, ph)`` are set to 1; every other entry
    stays 0.

    Parameters
    ----------
    model :
        Object providing ``n_x``, ``phase_exists(ph)`` and
        ``branches_out_of_j(name, j, ph)``.

    Returns
    -------
    c: numpy array of length ``model.n_x``, gradient of objective function c'x
    """
    weights = np.zeros(model.n_x)
    for phase in "abc":
        if not model.phase_exists(phase):
            continue
        # NOTE(review): presumably the "pij" variables of the branches leaving
        # bus 0 -- inferred from the helper name, confirm against the model.
        head_branches = model.branches_out_of_j("pij", 0, phase)
        weights[head_branches] = 1
    return weights

13 

14 

def gradient_curtail(model):
    """
    Build the linear cost vector for the curtailment objective.

    Every index from ``model.p_der_start_phase_idx["a"]`` up to (but not
    including) ``model.p_der_start_phase_idx["c"] + len(model.gen_buses["c"])``
    receives a coefficient of -1; all other entries stay 0.

    Parameters
    ----------
    model :
        Object providing ``n_x``, ``p_der_start_phase_idx`` and ``gen_buses``.

    Returns
    -------
    c: numpy array of length ``model.n_x``, gradient of objective function c'x
    """
    grad = np.zeros(model.n_x)
    first = model.p_der_start_phase_idx["a"]
    stop = model.p_der_start_phase_idx["c"] + len(model.gen_buses["c"])
    # NOTE(review): this treats the span between the phase "a" start and the
    # end of phase "c" as one contiguous run of variables -- confirm layout.
    grad[list(range(first, stop))] = -1
    return grad

23 

24 

def gradient_battery_efficiency(model: LinDistBaseMP, xk: cp.Variable, **kwargs):
    """
    Build the linear cost vector that penalizes battery conversion losses.

    For each time step and each existing phase, the charging variables get
    the coefficient ``1 - nc`` and the discharging variables get
    ``1 / nd - 1``, where ``nc`` and ``nd`` are read from the
    ``nc_<phase>`` and ``nd_<phase>`` columns of ``model.bat``.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
        Not used by this function.
    kwargs :
        Not used by this function.

    Returns
    -------
    c: numpy array, gradient of objective function c'x

    """
    # Models without a ``start_step`` instance attribute start at step 0.
    first_step = model.start_step if "start_step" in model.__dict__ else 0
    grad = np.zeros(model.n_x)
    for step in range(first_step, first_step + model.n_steps):
        for phase in "abc":
            if not model.phase_exists(phase):
                continue
            charge_vars = model.charge_map[step][phase]
            eta_charge = model.bat.loc[charge_vars.index, f"nc_{phase}"].to_numpy()
            discharge_vars = model.discharge_map[step][phase]
            eta_discharge = model.bat.loc[
                discharge_vars.index, f"nd_{phase}"
            ].to_numpy()
            grad[charge_vars.to_numpy()] = 1 - eta_charge  # type: ignore
            grad[discharge_vars.to_numpy()] = (1 / eta_discharge) - 1  # type: ignore
    return grad

56 

57 

58# ~~~ Quadratic objective with linear constraints for use with solve_quad()~~~ 

59 

60 

def cp_obj_loss(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
    """
    Resistance-weighted sum of squared branch-flow variables.

    For every time step and existing phase, the self-phase resistance
    ``model.r[aa][bi, bj]`` weights the square of both the ``pij`` and the
    ``qij`` variable of each row of ``model.x_maps[t][a]``.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
        Decision vector. Anything that is not a ``cp.Variable`` is evaluated
        with NumPy instead, so a solved numeric vector can be passed too.
    kwargs :
        Not used by this function.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized. A NumPy scalar is returned instead when
        ``xk`` is not a ``cp.Variable``.

    """
    # Models without a ``start_step`` instance attribute start at step 0.
    start_step = model.start_step if "start_step" in model.__dict__ else 0
    # Collect per-phase chunks and concatenate once at the end instead of
    # growing arrays with np.append on every pass. The empty float seed keeps
    # the result well defined (and float64 before the int cast) when no phase
    # contributes anything.
    r_chunks = [np.array([])]
    ix_chunks = [np.array([])]
    for t in range(start_step, start_step + model.n_steps):
        for a in "abc":
            if not model.phase_exists(a):
                continue
            x_map = model.x_maps[t][a]
            r_aa = np.array(model.r[a + a][x_map.bi, x_map.bj]).flatten()
            # The same resistance weights the pij entry and the qij entry.
            r_chunks.extend((r_aa, r_aa))
            ix_chunks.extend(
                (
                    x_map.pij.to_numpy().flatten(),
                    x_map.qij.to_numpy().flatten(),
                )
            )
    r = np.concatenate(r_chunks)
    ix = np.concatenate(ix_chunks).astype(int)
    if isinstance(xk, cp.Variable):
        return cp.vdot(r, xk[ix] ** 2)
    else:
        return np.vdot(r, xk[ix] ** 2)

102 

103 

def cp_battery_efficiency(
    model: LinDistBaseMP, xk: cp.Variable, **kwargs
) -> cp.Expression:
    """
    Linear penalty on battery charging and discharging losses.

    Charging variables are weighted by ``1 - nc`` and discharging variables
    by ``1 / nd - 1``, where ``nc`` and ``nd`` are read from the
    ``nc_<phase>`` and ``nd_<phase>`` columns of ``model.bat``. The weighted
    sum is scaled by a hard-coded factor of ``1e-3``.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
        Decision vector. Anything that is not a ``cp.Variable`` is evaluated
        with NumPy instead, so a solved numeric vector can be passed too.
    kwargs :
        Not used by this function.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized. A NumPy scalar is returned instead when
        ``xk`` is not a ``cp.Variable``.

    """
    # Models without a ``start_step`` instance attribute start at step 0.
    start_step = model.start_step if "start_step" in model.__dict__ else 0
    vec1_list: list[float] = []
    index_list: list[int] = []
    for t in range(start_step, start_step + model.n_steps):
        for a in "abc":
            if not model.phase_exists(a):
                continue
            charge_vars = model.charge_map[t][a]
            charging_efficiency = model.bat.loc[
                charge_vars.index, f"nc_{a}"
            ].to_numpy()
            discharge_vars = model.discharge_map[t][a]
            discharging_efficiency = model.bat.loc[
                discharge_vars.index, f"nd_{a}"
            ].to_numpy()
            # Weights and indices are appended in the same order (charge,
            # then discharge) so they stay aligned element by element.
            vec1_list.extend((1 - charging_efficiency))  # type: ignore
            vec1_list.extend(((1 / discharging_efficiency) - 1))  # type: ignore
            index_list.extend(charge_vars.to_numpy())
            index_list.extend(discharge_vars.to_numpy())
    vec1 = np.array(vec1_list)
    # Force an integer dtype: an empty list would otherwise become a float64
    # array, which NumPy rejects as an index.
    ix = np.array(index_list, dtype=int)
    if isinstance(xk, cp.Variable):
        return 1e-3 * cp.vdot(vec1, xk[ix])
    else:
        return 1e-3 * np.vdot(vec1, xk[ix])

147 

148 

def cp_obj_loss_batt(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
    """
    Sum of the ``cp_obj_loss`` and ``cp_battery_efficiency`` objectives.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
    kwargs :
        Accepted but not forwarded to either term.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized

    """
    loss_term = cp_obj_loss(model, xk)
    battery_term = cp_battery_efficiency(model, xk)
    return loss_term + battery_term

165 

166 

def charge_batteries(model, xk, **kwargs) -> cp.Expression:
    """
    Objective that rewards charge held in the batteries.

    Returns the negated sum of every variable indexed by
    ``model.soc_map[t][a]`` over the model's time steps and existing phases,
    so minimizing it maximizes that sum.

    Parameters
    ----------
    model :
        Object providing ``n_steps``, ``phase_exists(ph)`` and ``soc_map``,
        and optionally a ``start_step`` instance attribute.
    xk : cp.Variable
    kwargs :
        Not used by this function.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    # Use the same step window as the other multi-period objectives in this
    # module; with no ``start_step`` (or 0) this is range(model.n_steps).
    # NOTE(review): assumes soc_map is keyed by the same absolute step indices
    # as charge_map / discharge_map / pg_map -- confirm against the model.
    start_step = model.start_step if "start_step" in model.__dict__ else 0
    f_list = [
        -cp.sum(xk[model.soc_map[t][a].to_numpy()])
        for t in range(start_step, start_step + model.n_steps)
        for a in "abc"
        if model.phase_exists(a)
    ]
    return cp.sum(f_list)

175 

176 

def cp_obj_target_p_3ph(model, xk, **kwargs):
    """
    Per-phase squared deviation from a target, using the "pij" variables.

    For each existing phase, the variables selected by
    ``model.branches_out_of_j("pij", 0, ph)`` are summed, scaled by
    ``1 + loss_percent[i] / 100`` and compared with ``target[i]``; the squared
    differences are accumulated.

    Parameters
    ----------
    model :
        Object providing ``phase_exists(ph)`` and ``branches_out_of_j``.
    xk : cp.Variable
    kwargs :
        ``target`` (required): indexable with one entry per phase a, b, c.
        ``loss_percent`` (optional): per-phase percentages, default zeros.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    objective = cp.Constant(0)
    goal = kwargs["target"]
    loss_pct = kwargs.get("loss_percent", np.zeros(3))
    for k, phase in enumerate("abc"):
        if not model.phase_exists(phase):
            continue
        head_flow = sum(xk[b] for b in model.branches_out_of_j("pij", 0, phase))
        objective += (goal[k] - head_flow * (1 + loss_pct[k] / 100)) ** 2
    return objective

188 

189 

def cp_obj_target_p_total(model, xk, **kwargs):
    """
    Squared deviation of the summed "pij" variables from a single target.

    The variables selected by ``model.branches_out_of_j("pij", 0, ph)`` are
    summed over every existing phase, scaled by ``1 + loss_percent[0] / 100``
    and compared with the scalar ``target``.

    Parameters
    ----------
    model :
        Object providing ``phase_exists(ph)`` and ``branches_out_of_j``.
    xk : cp.Variable
    kwargs :
        ``target`` (required): the total to track.
        ``loss_percent`` (optional): only element 0 is used, default zeros.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    total = 0
    goal = kwargs["target"]
    loss_pct = kwargs.get("loss_percent", np.zeros(3))
    for phase in "abc":
        if not model.phase_exists(phase):
            continue
        total += sum(xk[b] for b in model.branches_out_of_j("pij", 0, phase))
    scaled_total = total * (1 + loss_pct[0] / 100)
    return (goal - scaled_total) ** 2

202 

203 

def cp_obj_target_q_3ph(model, xk, **kwargs):
    """
    Per-phase squared deviation from a target, using the "qij" variables.

    For each existing phase, the variables selected by
    ``model.branches_out_of_j("qij", 0, ph)`` are summed, scaled by
    ``1 + loss_percent[i] / 100`` and compared with ``target[i]``; the squared
    differences are accumulated.

    Parameters
    ----------
    model :
        Object providing ``phase_exists(ph)`` and ``branches_out_of_j``.
    xk : cp.Variable
    kwargs :
        ``target`` (required): indexable with one entry per phase a, b, c.
        ``loss_percent`` (optional): per-phase percentages, default zeros.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    goal_q = kwargs["target"]
    loss_pct = kwargs.get("loss_percent", np.zeros(3))
    objective = cp.Constant(0)
    for k, phase in enumerate("abc"):
        if not model.phase_exists(phase):
            continue
        head_flow = sum(xk[b] for b in model.branches_out_of_j("qij", 0, phase))
        objective += (goal_q[k] - head_flow * (1 + loss_pct[k] / 100)) ** 2
    return objective

215 

216 

def cp_obj_target_q_total(model, xk, **kwargs):
    """
    Squared deviation of the summed "qij" variables from a single target.

    The variables selected by ``model.branches_out_of_j("qij", 0, ph)`` are
    summed over every existing phase, scaled by ``1 + loss_percent[0] / 100``
    and compared with the scalar ``target``.

    Parameters
    ----------
    model :
        Object providing ``phase_exists(ph)`` and ``branches_out_of_j``.
    xk : cp.Variable
    kwargs :
        ``target`` (required): the total to track.
        ``loss_percent`` (optional): only element 0 is used, default zeros.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    total = 0
    goal = kwargs["target"]
    loss_pct = kwargs.get("loss_percent", np.zeros(3))
    for phase in "abc":
        if not model.phase_exists(phase):
            continue
        total += sum(xk[b] for b in model.branches_out_of_j("qij", 0, phase))
    scaled_total = total * (1 + loss_pct[0] / 100)
    return (goal - scaled_total) ** 2

229 

230 

def cp_obj_curtail(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
    """
    Quadratic curtailment objective.

    Min sum((x_max - x)^2) over every variable indexed by ``model.pg_map``
    for the model's time steps and existing phases.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
    kwargs :
        Not used by this function.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    # Models without a ``start_step`` instance attribute start at step 0.
    first_step = model.start_step if "start_step" in model.__dict__ else 0
    # The empty float seed keeps the stacked result defined when nothing is
    # collected; the indices are cast to int once after stacking.
    index_parts = [np.array([])]
    for step in range(first_step, first_step + model.n_steps):
        index_parts.extend(
            model.pg_map[step][phase].to_numpy()
            for phase in "abc"
            if model.phase_exists(phase)
        )
    pg_idx = np.r_[tuple(index_parts)].astype(int)
    shortfall = model.x_max[pg_idx] - xk[pg_idx]
    return cp.sum(shortfall ** 2)

258 

259 

def cp_obj_curtail_lp(model: LinDistBaseMP, xk: cp.Variable, **kwargs) -> cp.Expression:
    """
    Linear curtailment objective.

    Min sum(x_max - x) over every variable indexed by ``model.pg_map`` for
    the model's time steps and existing phases. The difference is not
    squared here, unlike in ``cp_obj_curtail``.

    Parameters
    ----------
    model : LinDistBaseMP
    xk : cp.Variable
    kwargs :
        Not used by this function.

    Returns
    -------
    f: cp.Expression
        Expression to be minimized
    """
    # Models without a ``start_step`` instance attribute start at step 0.
    start_step = model.start_step if "start_step" in model.__dict__ else 0
    # The empty float seed keeps the stacked result defined when nothing is
    # collected; the indices are cast to int once after stacking.
    idx_parts = [np.array([])]
    for t in range(start_step, start_step + model.n_steps):
        for a in "abc":
            if not model.phase_exists(a):
                continue
            idx_parts.append(model.pg_map[t][a].to_numpy())
    all_pg_idx = np.r_[tuple(idx_parts)].astype(int)
    return cp.sum((model.x_max[all_pg_idx] - xk[all_pg_idx]))

287 

288 

def cp_obj_none(*args, **kwargs) -> cp.Constant:
    """
    Placeholder objective for running a power flow with no optimization,
    for use with cvxpy_solve().

    All positional and keyword arguments are accepted and ignored.

    Returns
    -------
    constant 0
    """
    zero_objective = cp.Constant(0)
    return zero_objective

297 return cp.Constant(0)