Coverage for src/distopf/matrix_models/multiperiod/base_mp.py: 58%

897 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from functools import cache 

2import warnings 

3from typing import Optional 

4import networkx as nx 

5import numpy as np 

6from numpy.typing import NDArray 

7import pandas as pd 

8from numpy import sqrt, zeros 

9from scipy.sparse import csr_array, lil_array, vstack # type: ignore 

10import distopf as opf 

11from distopf.utils import ( 

12 handle_branch_input, 

13 handle_bus_input, 

14 handle_gen_input, 

15 handle_cap_input, 

16 handle_reg_input, 

17 handle_bat_input, 

18 handle_schedules_input, 

19 get, 

20) 

21from distopf.importer import Case 

22 

23 

24class BaseModelMP: 

25 """ 

26 LinDistFlow Model base class. 

27 

28 Parameters 

29 ---------- 

30 branch_data : pd.DataFrame 

31 DataFrame containing branch data (r and x values, limits) 

32 bus_data : pd.DataFrame 

33 DataFrame containing bus data (loads, voltages, limits) 

34 gen_data : pd.DataFrame 

35 DataFrame containing generator/DER data 

36 cap_data : pd.DataFrame 

37 DataFrame containing capacitor data 

38 reg_data : pd.DataFrame 

39 DataFrame containing regulator data 

40 bat_data : pd DataFrame 

41 DataFrame containing battery data 

42 loadshape_data : pd.DataFrame 

43 DataFrame containing loadshape multipliers for P values 

44 pv_loadshape_data : pd.DataFrame 

45 DataFrame containing PV profile of 1h interval for 24h 

46 n_steps : int, 

47 Number of time intervals for multi period optimization. Default is 24. 

48 case : Case, 

49 Case object containing all of the parameters. Alternative to listing seperately. 

50 

51 """ 

52 

53 def __init__( 

54 self, 

55 branch_data: Optional[pd.DataFrame] = None, 

56 bus_data: Optional[pd.DataFrame] = None, 

57 gen_data: Optional[pd.DataFrame] = None, 

58 cap_data: Optional[pd.DataFrame] = None, 

59 reg_data: Optional[pd.DataFrame] = None, 

60 bat_data: Optional[pd.DataFrame] = None, 

61 schedules: Optional[pd.DataFrame] = None, 

62 start_step: int = 0, 

63 n_steps: int = 24, 

64 delta_t: float = 1, # hours per step 

65 case: Optional[Case] = None, 

66 ): 

67 # ~~~~~~~~~~~~~~~~~~~~ Load Data Frames ~~~~~~~~~~~~~~~~~~~~ 

68 if case: 

69 self.branch = case.branch_data 

70 self.bus = case.bus_data 

71 self.gen = case.gen_data 

72 self.cap = case.cap_data 

73 self.reg = case.reg_data 

74 self.bat = case.bat_data 

75 self.schedules = case.schedules 

76 self.start_step = case.start_step 

77 self.n_steps = case.n_steps 

78 self.delta_t = case.delta_t # hours per step 

79 else: 

80 self.branch = handle_branch_input(branch_data) 

81 self.bus = handle_bus_input(bus_data) 

82 self.gen = handle_gen_input(gen_data) 

83 self.cap = handle_cap_input(cap_data) 

84 self.reg = handle_reg_input(reg_data) 

85 self.bat = handle_bat_input(bat_data) 

86 self.schedules = handle_schedules_input(schedules) 

87 self.start_step = start_step 

88 self.n_steps = n_steps 

89 self.delta_t = delta_t # hours per step 

90 

91 # ~~~~~~~~~~~~~~~~~~~~ prepare data ~~~~~~~~~~~~~~~~~~~~ 

92 self.nb = len(self.bus.id) 

93 self.r, self.x = self._init_rx(self.branch) 

94 self.swing_bus = self.bus.loc[self.bus.bus_type == "SWING"].index[0] 

95 self.all_buses = { 

96 "a": self.bus.loc[self.bus.phases.str.contains("a")].index.to_numpy(), 

97 "b": self.bus.loc[self.bus.phases.str.contains("b")].index.to_numpy(), 

98 "c": self.bus.loc[self.bus.phases.str.contains("c")].index.to_numpy(), 

99 } 

100 self.load_buses = { 

101 "a": self.all_buses["a"][np.where(self.all_buses["a"] != self.swing_bus)], 

102 "b": self.all_buses["b"][np.where(self.all_buses["b"] != self.swing_bus)], 

103 "c": self.all_buses["c"][np.where(self.all_buses["c"] != self.swing_bus)], 

104 } 

105 self.gen_buses = dict(a=np.array([]), b=np.array([]), c=np.array([])) 

106 if self.gen.shape[0] > 0: 

107 self.gen_buses = { 

108 "a": self.gen.loc[self.gen.phases.str.contains("a")].index.to_numpy(), 

109 "b": self.gen.loc[self.gen.phases.str.contains("b")].index.to_numpy(), 

110 "c": self.gen.loc[self.gen.phases.str.contains("c")].index.to_numpy(), 

111 } 

112 self.n_gens = ( 

113 len(self.gen_buses["a"]) 

114 + len(self.gen_buses["b"]) 

115 + len(self.gen_buses["c"]) 

116 ) 

117 self.cap_buses = dict(a=np.array([]), b=np.array([]), c=np.array([])) 

118 if self.cap.shape[0] > 0: 

119 self.cap_buses = { 

120 "a": self.cap.loc[self.cap.phases.str.contains("a")].index.to_numpy(), 

121 "b": self.cap.loc[self.cap.phases.str.contains("b")].index.to_numpy(), 

122 "c": self.cap.loc[self.cap.phases.str.contains("c")].index.to_numpy(), 

123 } 

124 self.n_caps = ( 

125 len(self.cap_buses["a"]) 

126 + len(self.cap_buses["b"]) 

127 + len(self.cap_buses["c"]) 

128 ) 

129 self.reg_buses = dict(a=np.array([]), b=np.array([]), c=np.array([])) 

130 if self.reg.shape[0] > 0: 

131 self.reg_buses = { 

132 "a": self.reg.loc[self.reg.phases.str.contains("a")].index.to_numpy(), 

133 "b": self.reg.loc[self.reg.phases.str.contains("b")].index.to_numpy(), 

134 "c": self.reg.loc[self.reg.phases.str.contains("c")].index.to_numpy(), 

135 } 

136 self.n_regs = ( 

137 len(self.reg_buses["a"]) 

138 + len(self.reg_buses["b"]) 

139 + len(self.reg_buses["c"]) 

140 ) 

141 self.bat_buses = dict(a=np.array([]), b=np.array([]), c=np.array([])) 

142 if self.bat.shape[0] > 0: 

143 self.bat_buses = { 

144 "a": self.bat.loc[self.bat.phases.str.contains("a")].index.to_numpy(), 

145 "b": self.bat.loc[self.bat.phases.str.contains("b")].index.to_numpy(), 

146 "c": self.bat.loc[self.bat.phases.str.contains("c")].index.to_numpy(), 

147 } 

148 self.n_bats = ( 

149 len(self.bat_buses["a"]) 

150 + len(self.bat_buses["b"]) 

151 + len(self.bat_buses["c"]) 

152 ) 

153 self.controlled_load_buses = { 

154 "a": self.bus.loc[ 

155 self.bus.bus_type.str.contains(opf.PQ_FREE) 

156 ].index.to_numpy(), 

157 "b": self.bus.loc[ 

158 self.bus.bus_type.str.contains(opf.PQ_FREE) 

159 ].index.to_numpy(), 

160 "c": self.bus.loc[ 

161 self.bus.bus_type.str.contains(opf.PQ_FREE) 

162 ].index.to_numpy(), 

163 } 

164 

165 # ~~ initialize index pointers ~~ 

166 self.n_x = 0 

167 self.x_maps: dict[int, dict[str, pd.DataFrame]] = {} 

168 self.v_map: dict[int, dict[str, pd.Series]] = {} 

169 self.pg_map: dict[int, dict[str, pd.Series]] = {} 

170 self.qg_map: dict[int, dict[str, pd.Series]] = {} 

171 self.qc_map: dict[int, dict[str, pd.Series]] = {} 

172 self.pl_map: dict[int, dict[str, pd.Series]] = {} 

173 self.ql_map: dict[int, dict[str, pd.Series]] = {} 

174 self.charge_map: dict[int, dict[str, pd.Series]] = {} 

175 self.discharge_map: dict[int, dict[str, pd.Series]] = {} 

176 self.pb_map: dict[int, dict[str, pd.Series]] = {} 

177 self.qb_map: dict[int, dict[str, pd.Series]] = {} 

178 self.soc_map: dict[int, dict[str, pd.Series]] = {} 

179 self.vx_map: dict[int, dict[str, pd.Series]] = {} 

180 # ~~~~~~~~~~~~~~~~~~~~ initialize Aeq and beq ~~~~~~~~~~~~~~~~~~~~ 

181 # self.a_ineq = csr_array(([0], ([0], [0])), shape=(1, 1)) 

182 # self.b_ineq = zeros(0) 

183 self.a_ub = csr_array([[0]]) 

184 self.b_ub = zeros(0) 

185 self.bounds = zeros(0) 

186 self.bounds_tuple: list[tuple] = [] 

187 self.x_min = zeros(0) 

188 self.x_max = zeros(0) 

189 self.is_built = False 

190 

191 @staticmethod 

192 def _init_rx(branch): 

193 """ 

194 Initializes resistance (`r`) and reactance (`x`) data for network branches 

195 using sparse matrix representations. 

196 

197 Parameters 

198 ---------- 

199 branch : pd.DataFrame 

200 DataFrame containing branch information. 

201 

202 Returns 

203 ------- 

204 r, x : dict 

205 Dictionaries of csr_arrays for resistance and reactance matrices indexed 

206 by phase pairs (e.g., 'aa', 'ab', ...). 

207 """ 

208 row = np.array(np.r_[branch.fb, branch.tb], dtype=int) - 1 

209 col = np.array(np.r_[branch.tb, branch.fb], dtype=int) - 1 

210 r = { 

211 "aa": csr_array((np.r_[branch.raa, branch.raa], (row, col))), 

212 "ab": csr_array((np.r_[branch.rab, branch.rab], (row, col))), 

213 "ac": csr_array((np.r_[branch.rac, branch.rac], (row, col))), 

214 "bb": csr_array((np.r_[branch.rbb, branch.rbb], (row, col))), 

215 "bc": csr_array((np.r_[branch.rbc, branch.rbc], (row, col))), 

216 "cc": csr_array((np.r_[branch.rcc, branch.rcc], (row, col))), 

217 } 

218 x = { 

219 "aa": csr_array((np.r_[branch.xaa, branch.xaa], (row, col))), 

220 "ab": csr_array((np.r_[branch.xab, branch.xab], (row, col))), 

221 "ac": csr_array((np.r_[branch.xac, branch.xac], (row, col))), 

222 "bb": csr_array((np.r_[branch.xbb, branch.xbb], (row, col))), 

223 "bc": csr_array((np.r_[branch.xbc, branch.xbc], (row, col))), 

224 "cc": csr_array((np.r_[branch.xcc, branch.xcc], (row, col))), 

225 } 

226 return r, x 

227 

228 @property 

229 def branch_data(self): 

230 return self.branch 

231 

232 @property 

233 def bus_data(self): 

234 return self.bus 

235 

236 @property 

237 def gen_data(self): 

238 return self.gen 

239 

240 @property 

241 def cap_data(self): 

242 return self.cap 

243 

244 @property 

245 def reg_data(self): 

246 return self.reg 

247 

248 @property 

249 def bat_data(self): 

250 return self.bat 

251 

252 @property 

253 def a_ineq(self): 

254 return self.a_ub 

255 

256 @property 

257 def b_ineq(self): 

258 return self.b_ub 

259 

260 

261class LinDistBaseMP(BaseModelMP): 

262 def initialize_variable_index_pointers(self): 

263 # ~~ initialize index pointers ~~ 

264 self.x_maps = {} 

265 self.v_map = {} 

266 self.pg_map = {} 

267 self.qg_map = {} 

268 self.qc_map = {} 

269 self.charge_map = {} 

270 self.discharge_map = {} 

271 self.pb_map = {} 

272 self.qb_map = {} 

273 self.soc_map = {} 

274 self.vx_map = {} 

275 self.n_x = 0 

276 for t in range(self.start_step, self.start_step + self.n_steps): 

277 self.x_maps[t], self.n_x = self._variable_tables(self.branch, n_x=self.n_x) 

278 self.v_map[t], self.n_x = self._add_device_variables( 

279 self.n_x, self.all_buses 

280 ) 

281 self.pg_map[t], self.n_x = self._add_device_variables( 

282 self.n_x, self.gen_buses 

283 ) 

284 self.qg_map[t], self.n_x = self._add_device_variables( 

285 self.n_x, self.gen_buses 

286 ) 

287 self.qc_map[t], self.n_x = self._add_device_variables( 

288 self.n_x, self.cap_buses 

289 ) 

290 self.charge_map[t], self.n_x = self._add_device_variables( 

291 self.n_x, self.bat_buses 

292 ) 

293 self.discharge_map[t], self.n_x = self._add_device_variables( 

294 self.n_x, self.bat_buses 

295 ) 

296 self.pb_map[t], self.n_x = self._add_device_variables( 

297 self.n_x, self.bat_buses 

298 ) 

299 self.qb_map[t], self.n_x = self._add_device_variables( 

300 self.n_x, self.bat_buses 

301 ) 

302 self.soc_map[t], self.n_x = self._add_device_variables_no_phases( 

303 self.n_x, self.bat_buses 

304 ) 

305 self.vx_map[t], self.n_x = self._add_device_variables( 

306 self.n_x, self.reg_buses 

307 ) 

308 

309 def build(self): 

310 self.initialize_variable_index_pointers() 

311 self.a_eq, self.b_eq = self.create_model() 

312 self.a_ub, self.b_ub = self.create_inequality_constraints() 

313 self.bounds = self.init_bounds() 

314 self.bounds_tuple = list(map(tuple, self.bounds)) 

315 self.x_min = self.bounds[:, 0] 

316 self.x_max = self.bounds[:, 1] 

317 self.is_built = True 

318 

319 @staticmethod 

320 def _variable_tables( 

321 branch: pd.DataFrame, n_x: int = 0 

322 ) -> tuple[dict[str, pd.DataFrame], int]: 

323 """ 

324 Constructs tables to map branch power variables to indices within the 

325 optimization variable vector. 

326 

327 Parameters 

328 ---------- 

329 branch : pd.DataFrame 

330 DataFrame containing branch information. 

331 n_x : int 

332 Initial variable index; used to offset additional device variables. 

333 

334 Returns 

335 ------- 

336 x_maps : dict 

337 Dictionary of DataFrames mapping branch indices to variable indices. 

338 n_x : int 

339 Updated variable index after accounting for new variables. 

340 """ 

341 x_maps = {} 

342 for a in "abc": 

343 indices = branch.phases.str.contains(a) 

344 lines = branch.loc[indices, ["fb", "tb"]].values.astype(int) - 1 

345 n_lines = len(lines) 

346 df = pd.DataFrame(columns=["bi", "bj", "pij", "qij"], index=range(n_lines)) 

347 if n_lines == 0: 

348 x_maps[a] = df.astype(int) 

349 continue 

350 g: nx.Graph = nx.Graph() 

351 g.add_edges_from(lines) 

352 i_root = list(set(lines[:, 0]) - set(lines[:, 1]))[ 

353 0 

354 ] # root node is only node with no from-bus 

355 edges = np.array(list(nx.dfs_edges(g, source=i_root))) 

356 df["bi"] = edges[:, 0] 

357 df["bj"] = edges[:, 1] 

358 df["pij"] = np.array([i for i in range(n_x, n_x + n_lines)]) 

359 n_x = n_x + n_lines 

360 df["qij"] = np.array([i for i in range(n_x, n_x + n_lines)]) 

361 n_x = n_x + n_lines 

362 x_maps[a] = df.astype(int) 

363 return x_maps, n_x 

364 

365 @staticmethod 

366 def _add_device_variables( 

367 n_x: int, device_buses: dict 

368 ) -> tuple[dict[str, pd.Series], int]: 

369 """ 

370 Adds device-related variables (e.g., voltage, power load) to the 

371 optimization problem, indexed by the phase. 

372 

373 Parameters 

374 ---------- 

375 n_x : int 

376 Starting offset for variable indices. 

377 device_buses : dict 

378 Dictionary containing bus indices for devices, categorized by phase. 

379 

380 Returns 

381 ------- 

382 device_maps : dict 

383 Mapping of variable indices to bus indices for each device. 

384 n_x : int 

385 Updated offset after adding device variables. 

386 """ 

387 n_a = len(device_buses["a"]) 

388 n_b = len(device_buses["b"]) 

389 n_c = len(device_buses["c"]) 

390 device_maps = { 

391 "a": pd.Series(range(n_x, n_x + n_a), index=device_buses["a"]), 

392 "b": pd.Series(range(n_x + n_a, n_x + n_a + n_b), index=device_buses["b"]), 

393 "c": pd.Series( 

394 range(n_x + n_a + n_b, n_x + n_a + n_b + n_c), index=device_buses["c"] 

395 ), 

396 } 

397 n_x = n_x + n_a + n_b + n_c 

398 return device_maps, n_x 

399 

400 @staticmethod 

401 def _add_device_variables_no_phases(n_x: int, device_buses: dict): 

402 buses = np.unique( 

403 np.r_[device_buses["a"], device_buses["b"], device_buses["c"]] 

404 ) 

405 n = len(buses) 

406 all_map = pd.Series(range(n_x, n_x + n), index=buses) 

407 device_maps = { 

408 "a": all_map.loc[device_buses["a"]], 

409 "b": all_map.loc[device_buses["b"]], 

410 "c": all_map.loc[device_buses["c"]], 

411 } 

412 n_x = n_x + n 

413 return device_maps, n_x 

414 

415 def init_bounds(self) -> NDArray: 

416 """ 

417 Initializes the variable bounds for the optimization problem. 

418 

419 Returns 

420 ------- 

421 bounds : np.ndarray 

422 Array of upper and lower bounds for each variable. 

423 """ 

424 default = 100e3 # Default for unbounded variables. 

425 # ~~~~~~~~~~ x limits ~~~~~~~~~~ 

426 x_lim_lower = np.ones(self.n_x) * -default 

427 x_lim_upper = np.ones(self.n_x) * default 

428 for t in range(self.start_step, self.start_step + self.n_steps): 

429 x_lim_lower, x_lim_upper = self.add_voltage_limits( 

430 x_lim_lower, x_lim_upper, t=t 

431 ) 

432 x_lim_lower, x_lim_upper = self.add_generator_limits( 

433 x_lim_lower, x_lim_upper, t=t 

434 ) 

435 x_lim_lower, x_lim_upper = self.add_battery_discharging_limits( 

436 x_lim_lower, x_lim_upper, t=t 

437 ) 

438 x_lim_lower, x_lim_upper = self.add_battery_charging_limits( 

439 x_lim_lower, x_lim_upper, t=t 

440 ) 

441 x_lim_lower, x_lim_upper = self.add_battery_soc_limits( 

442 x_lim_lower, x_lim_upper, t=t 

443 ) 

444 x_lim_lower, x_lim_upper = self.additional_limits( 

445 x_lim_lower, x_lim_upper, t=t 

446 ) 

447 bounds = np.c_[x_lim_lower, x_lim_upper] 

448 return bounds 

449 

450 def additional_limits( 

451 self, x_lim_lower: NDArray, x_lim_upper: NDArray, t: int = 0 

452 ) -> tuple[NDArray, NDArray]: 

453 """ 

454 User added limits function. Override this function to add custom variable limits. 

455 Parameters 

456 ---------- 

457 x_lim_lower : 

458 x_lim_upper : 

459 

460 Returns 

461 ------- 

462 x_lim_lower : lower limits for x-vector 

463 x_lim_upper : upper limits for x-vector 

464 

465 Examples 

466 -------- 

467 ```python 

468 p_lim = 10 

469 q_lim = 10 

470 for a in "abc": 

471 if not self.phase_exists(a): 

472 continue 

473 x_lim_lower[self.x_maps[a].pij] = -p_lim 

474 x_lim_upper[self.x_maps[a].pij] = p_lim 

475 x_lim_lower[self.x_maps[a].qij] = -q_lim 

476 x_lim_upper[self.x_maps[a].qij] = q_lim 

477 ``` 

478 """ 

479 if t < self.start_step: 

480 t = self.start_step 

481 return x_lim_lower, x_lim_upper 

482 

483 def add_voltage_limits( 

484 self, x_lim_lower: np.ndarray, x_lim_upper: np.ndarray, t: int = 0 

485 ) -> tuple[np.ndarray, np.ndarray]: 

486 if t < self.start_step: 

487 t = self.start_step 

488 for a in "abc": 

489 if not self.phase_exists(a): 

490 continue 

491 # ~~ v limits ~~: 

492 x_lim_upper[self.v_map[t][a]] = ( 

493 self.bus.loc[self.v_map[t][a].index, "v_max"] ** 2 

494 ) 

495 x_lim_lower[self.v_map[t][a]] = ( 

496 self.bus.loc[self.v_map[t][a].index, "v_min"] ** 2 

497 ) 

498 return x_lim_lower, x_lim_upper 

499 

500 def add_generator_limits( 

501 self, x_lim_lower: np.ndarray, x_lim_upper: np.ndarray, t: int = 0 

502 ) -> tuple[np.ndarray, np.ndarray]: 

503 if t < self.start_step: 

504 t = self.start_step 

505 gen_mult = 1 

506 if "PV" in self.schedules.columns: 

507 gen_mult = self.schedules.PV[t] 

508 for a in "abc": 

509 if not self.phase_exists(a): 

510 continue 

511 s_rated = self.gen[f"s{a}_max"] 

512 p_out = self.gen[f"p{a}"] * gen_mult 

513 q_max = ((s_rated**2) - ((p_out) ** 2)) ** (1 / 2) 

514 q_min = -q_max 

515 q_max_manual = self.gen.get(f"q{a}_max", np.ones_like(q_min) * 100e3) 

516 q_min_manual = self.gen.get(f"q{a}_min", np.ones_like(q_min) * -100e3) 

517 for j in self.gen_buses[a]: 

518 mode = self.gen.loc[j, "control_variable"] 

519 pg = self.idx("pg", j, a, t) 

520 qg = self.idx("qg", j, a, t) 

521 # active power bounds 

522 x_lim_lower[pg] = 0 

523 x_lim_upper[pg] = p_out[j] 

524 # reactive power bounds 

525 if mode == opf.CONSTANT_P: 

526 x_lim_lower[qg] = max(q_min[j], q_min_manual[j]) 

527 x_lim_upper[qg] = min(q_max[j], q_max_manual[j]) 

528 if mode != opf.CONSTANT_P: 

529 # reactive power bounds 

530 x_lim_lower[qg] = max(-s_rated[j], q_min_manual[j]) 

531 x_lim_upper[qg] = min(s_rated[j], q_max_manual[j]) 

532 return x_lim_lower, x_lim_upper 

533 

534 def add_battery_discharging_limits( 

535 self, x_lim_lower: np.ndarray, x_lim_upper: np.ndarray, t: int = 0 

536 ) -> tuple[np.ndarray, np.ndarray]: 

537 if t < self.start_step: 

538 t = self.start_step 

539 phases = self.bat.loc[:, "phases"] 

540 n_phases = np.array([len(ph) for ph in phases]) 

541 for a in "abc": 

542 if not self.phase_exists(a): 

543 continue 

544 x_lim_lower[self.discharge_map[t][a]] = 0 

545 x_lim_upper[self.discharge_map[t][a]] = ( 

546 self.bat.loc[self.discharge_map[t][a].index, "s_max"] / n_phases 

547 ) 

548 return x_lim_lower, x_lim_upper 

549 

550 def add_battery_charging_limits( 

551 self, x_lim_lower: np.ndarray, x_lim_upper: np.ndarray, t: int = 0 

552 ) -> tuple[np.ndarray, np.ndarray]: 

553 if t < self.start_step: 

554 t = self.start_step 

555 phases = self.bat.loc[:, "phases"] 

556 n_phases = np.array([len(ph) for ph in phases]) 

557 for a in "abc": 

558 if not self.phase_exists(a): 

559 continue 

560 x_lim_lower[self.charge_map[t][a]] = 0 

561 x_lim_upper[self.charge_map[t][a]] = ( 

562 self.bat.loc[self.charge_map[t][a].index, "s_max"] / n_phases 

563 ) 

564 return x_lim_lower, x_lim_upper 

565 

566 def add_battery_soc_limits( 

567 self, x_lim_lower: np.ndarray, x_lim_upper: np.ndarray, t: int = 0 

568 ) -> tuple[np.ndarray, np.ndarray]: 

569 if t < self.start_step: 

570 t = self.start_step 

571 for a in "abc": 

572 if not self.phase_exists(a): 

573 continue 

574 max_soc = self.bat.loc[self.soc_map[t][a].index, "max_soc"] 

575 min_soc = self.bat.loc[self.soc_map[t][a].index, "min_soc"] 

576 max_soc_energy = ( 

577 self.bat.loc[self.soc_map[t][a].index, "energy_capacity"] * max_soc 

578 ) 

579 min_soc_energy = ( 

580 self.bat.loc[self.soc_map[t][a].index, "energy_capacity"] * min_soc 

581 ) 

582 x_lim_upper[self.soc_map[t][a]] = max_soc_energy 

583 x_lim_lower[self.soc_map[t][a]] = min_soc_energy 

584 return x_lim_lower, x_lim_upper 

585 

586 @cache 

587 def branch_into_j(self, var: str, j: int, phase: str, t: int = 0) -> list[int]: 

588 if t < self.start_step: 

589 t = self.start_step 

590 idx = self.x_maps[t][phase].loc[self.x_maps[t][phase].bj == j, var].to_numpy() 

591 return idx[~np.isnan(idx)].astype(int).tolist() 

592 

593 @cache 

594 def branches_out_of_j(self, var: str, j: int, phase: str, t: int = 0) -> list[int]: 

595 if t < self.start_step: 

596 t = self.start_step 

597 idx = self.x_maps[t][phase].loc[self.x_maps[t][phase].bi == j, var].to_numpy() 

598 return idx[~np.isnan(idx)].astype(int).tolist() 

599 

600 @cache 

601 def idx(self, var: str, node_j: int, phase: str, t: int = 0) -> list[int] | int: 

602 if t < self.start_step: 

603 t = self.start_step 

604 if t in self.x_maps.keys() and var in self.x_maps[t][phase].columns: 

605 return self.branch_into_j(var, node_j, phase, t=t) 

606 if var in ["pjk"]: # indexes of all branch active power out of node j 

607 return self.branches_out_of_j("pij", node_j, phase, t=t) 

608 if var in ["qjk"]: # indexes of all branch reactive power out of node j 

609 return self.branches_out_of_j("qij", node_j, phase, t=t) 

610 if var in ["v"]: # active power generation at node 

611 return self.v_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

612 if var in ["pg", "p_gen"]: # active power generation at node 

613 return self.pg_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

614 if var in ["qg", "q_gen"]: # reactive power generation at node 

615 return self.qg_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

616 if var in ["qc", "q_cap"]: # reactive power injection by capacitor 

617 return self.qc_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

618 if var in ["ch", "charge"]: 

619 return self.charge_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

620 if var in ["dis", "discharge"]: 

621 return self.discharge_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

622 if var in ["pb"]: # active power injection by battery 

623 return self.pb_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

624 if var in ["qb"]: # reactive power injection by battery 

625 return self.qb_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

626 if var in ["soc"]: 

627 return self.soc_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

628 if var in ["vx"]: 

629 return self.vx_map.get(t, {}).get(phase, pd.Series()).get(node_j, []) 

630 ix = self.additional_variable_idx(var, node_j, phase, t=t) 

631 if ix is not None: 

632 return ix 

633 raise ValueError(f"Variable name, '{var}', not found.") 

634 

635 def additional_variable_idx(self, var: str, node_j: int, phase: str, t: int = 0): 

636 """ 

637 User added index function. Override this function to add custom variables. Return None if `var` is not found. 

638 Parameters 

639 ---------- 

640 var : name of variable 

641 node_j : node index (0 based; bus.id - 1) 

642 phase : "a", "b", or "c" 

643 t : integer time step >=0 (default: 0) 

644 

645 Returns 

646 ------- 

647 ix : index or list of indices of variable within x-vector or None if `var` is not found. 

648 """ 

649 if t < self.start_step: 

650 t = self.start_step 

651 return None 

652 

653 @cache 

654 def phase_exists(self, phase: str, index: Optional[int] = None, t: int = 0) -> bool: 

655 if t < self.start_step: 

656 t = self.start_step 

657 if index is None: 

658 return self.x_maps[t][phase].shape[0] > 0 

659 return len(self.idx("bj", index, phase, t=t)) > 0 

660 

661 def create_model(self) -> tuple[csr_array, np.ndarray]: 

662 """ 

663 Constructs the equality constraint matrices for the linear optimization 

664 problem based on power flow equations. 

665 a_eq*x == b_eq 

666 

667 Returns 

668 ------- 

669 a_eq : csr_array 

670 Sparse array representing the equality constraint matrix. 

671 b_eq : np.ndarray 

672 Array representing the equality constraint vector. 

673 """ 

674 # ########## Aeq and Beq Formation ########### 

675 n_rows = self.n_x 

676 n_cols = self.n_x 

677 # Aeq has the same number of rows as equations with a column for each x 

678 a_eq = lil_array((n_rows, n_cols)) 

679 b_eq = zeros(n_rows) 

680 for t in range(self.start_step, self.start_step + self.n_steps): 

681 for j in range(1, self.nb): 

682 for ph in ["abc", "bca", "cab"]: 

683 a, b, c = ph[0], ph[1], ph[2] 

684 if not self.phase_exists(a, j): 

685 continue 

686 a_eq, b_eq = self.add_power_flow_model(a_eq, b_eq, j, a, t=t) 

687 a_eq, b_eq = self.add_voltage_drop_model( 

688 a_eq, b_eq, j, a, b, c, t=t 

689 ) 

690 a_eq, b_eq = self.add_swing_voltage_model(a_eq, b_eq, j, a, t=t) 

691 a_eq, b_eq = self.add_regulator_model(a_eq, b_eq, j, a, t=t) 

692 a_eq, b_eq = self.add_load_model(a_eq, b_eq, j, a, t=t) 

693 a_eq, b_eq = self.add_generator_model(a_eq, b_eq, j, a, t=t) 

694 a_eq, b_eq = self.add_capacitor_model(a_eq, b_eq, j, a, t=t) 

695 a_eq, b_eq = self.add_battery_model(a_eq, b_eq, j, a, b, c, t=t) 

696 return csr_array(a_eq), b_eq 

697 

698 def add_power_flow_model( 

699 self, a_eq: lil_array, b_eq, j, phase, t=0 

700 ) -> tuple[lil_array, np.ndarray]: 

701 if t < self.start_step: 

702 t = self.start_step 

703 pij = self.idx("pij", j, phase, t=t) 

704 qij = self.idx("qij", j, phase, t=t) 

705 pjk = self.idx("pjk", j, phase, t=t) 

706 qjk = self.idx("qjk", j, phase, t=t) 

707 # Set P equation variable coefficients in a_eq 

708 a_eq[pij, pij] = 1 

709 a_eq[pij, pjk] = -1 

710 # Set Q equation variable coefficients in a_eq 

711 a_eq[qij, qij] = 1 

712 a_eq[qij, qjk] = -1 

713 # Other power flow variables added to this equation in other methods 

714 return a_eq, b_eq 

715 

716 def add_voltage_drop_model( 

717 self, a_eq: lil_array, b_eq, j, a, b, c, t=0 

718 ) -> tuple[lil_array, np.ndarray]: 

719 if t < self.start_step: 

720 t = self.start_step 

721 if self.reg is not None: 

722 if j in self.reg.tb: 

723 return a_eq, b_eq 

724 r, x = self.r, self.x 

725 aa = "".join(sorted(a + a)) 

726 # if ph=='cab', then a+b=='ca'. Sort so ab=='ac' 

727 ab = "".join(sorted(a + b)) 

728 ac = "".join(sorted(a + c)) 

729 i = self.idx("bi", j, a, t=t)[0] # get the upstream node, i 

730 pij = self.idx("pij", j, a, t=t) 

731 qij = self.idx("qij", j, a, t=t) 

732 pijb = self.idx("pij", j, b, t=t) 

733 qijb = self.idx("qij", j, b, t=t) 

734 pijc = self.idx("pij", j, c, t=t) 

735 qijc = self.idx("qij", j, c, t=t) 

736 vi = self.idx("v", i, a, t=t) 

737 vj = self.idx("v", j, a, t=t) 

738 # Set V equation variable coefficients in a_eq and constants in b_eq 

739 a_eq[vj, vj] = 1 

740 a_eq[vj, vi] = -1 

741 a_eq[vj, pij] = 2 * r[aa][i, j] 

742 a_eq[vj, qij] = 2 * x[aa][i, j] 

743 if self.phase_exists(b, j): 

744 a_eq[vj, pijb] = -r[ab][i, j] + sqrt(3) * x[ab][i, j] 

745 a_eq[vj, qijb] = -x[ab][i, j] - sqrt(3) * r[ab][i, j] 

746 if self.phase_exists(c, j): 

747 a_eq[vj, pijc] = -r[ac][i, j] - sqrt(3) * x[ac][i, j] 

748 a_eq[vj, qijc] = -x[ac][i, j] + sqrt(3) * r[ac][i, j] 

749 return a_eq, b_eq 

750 

751 def add_regulator_model( 

752 self, a_eq: lil_array, b_eq, j, a, t=0 

753 ) -> tuple[lil_array, np.ndarray]: 

754 if t < self.start_step: 

755 t = self.start_step 

756 if self.reg is None: 

757 return a_eq, b_eq 

758 if j not in self.reg.tb: 

759 return a_eq, b_eq 

760 i = self.idx("bi", j, a, t=t)[0] # get the upstream node, i 

761 pij = self.idx("pij", j, a, t=t) 

762 qij = self.idx("qij", j, a, t=t) 

763 vi = self.idx("v", i, a, t=t) 

764 vj = self.idx("v", j, a, t=t) 

765 vx = self.idx("vx", j, a) 

766 r, x = self.r, self.x 

767 aa = "".join(sorted(a + a)) 

768 

769 a_eq[vj, vj] = 1 

770 a_eq[vj, vx] = -1 

771 a_eq[vj, pij] = 2 * r[aa][i, j] 

772 a_eq[vj, qij] = 2 * x[aa][i, j] 

773 

774 reg_ratio = get(self.reg[f"ratio_{a}"], j, 1) 

775 a_eq[vx, vx] = 1 

776 a_eq[vx, vi] = -1 * reg_ratio**2 

777 return a_eq, b_eq 

778 

779 def add_swing_voltage_model( 

780 self, a_eq: lil_array, b_eq: np.ndarray, j: int, a: str, t: int = 0 

781 ) -> tuple[lil_array, np.ndarray]: 

782 if t < self.start_step: 

783 t = self.start_step 

784 i = self.idx("bi", j, a, t=t)[0] # get the upstream node 

785 vi = self.idx("v", i, a, t=t) 

786 # Set V equation variable coefficients in a_eq and constants in b_eq 

787 if self.bus.bus_type[i] == opf.SWING_BUS: # Swing bus 

788 v_source = float(self.bus.at[i, f"v_{a}"]) 

789 if f"v_{a}" in self.schedules: 

790 v_source = float(self.schedules.at[t, f"v_{a}"]) 

791 a_eq[vi, vi] = 1 

792 b_eq[vi] = v_source**2 

793 return a_eq, b_eq 

794 

795 def add_generator_model( 

796 self, a_eq: lil_array, b_eq, j, a, t=0 

797 ) -> tuple[lil_array, np.ndarray]: 

798 if j not in self.gen.index: 

799 return a_eq, b_eq 

800 if t < self.start_step: 

801 t = self.start_step 

802 p_gen_nom, q_gen_nom = 0, 0 

803 pv_mult = 1 

804 if "PV" in self.schedules.columns: 

805 pv_mult = self.schedules.PV[t] 

806 if self.gen is not None: 

807 p_gen_nom = get(self.gen[f"p{a}"], j, 0) 

808 q_gen_nom = get(self.gen[f"q{a}"], j, 0) 

809 # equation indexes 

810 pij = self.idx("pij", j, a, t=t) 

811 qij = self.idx("qij", j, a, t=t) 

812 pg = self.idx("pg", j, a, t=t) 

813 qg = self.idx("qg", j, a, t=t) 

814 # Set Generator equation variable coefficients in a_eq 

815 a_eq[pij, pg] = 1 

816 a_eq[qij, qg] = 1 

817 if get(self.gen["control_variable"], j, 0) in [opf.CONSTANT_PQ, opf.CONSTANT_P]: 

818 a_eq[pg, pg] = 1 

819 b_eq[pg] = p_gen_nom * pv_mult 

820 if get(self.gen["control_variable"], j, 0) in [opf.CONSTANT_PQ, opf.CONSTANT_Q]: 

821 a_eq[qg, qg] = 1 

822 b_eq[qg] = q_gen_nom 

823 return a_eq, b_eq 

824 

825 def add_load_model( 

826 self, a_eq: lil_array, b_eq, j, a, t=0 

827 ) -> tuple[lil_array, np.ndarray]: 

828 if t < self.start_step: 

829 t = self.start_step 

830 pij = self.idx("pij", j, a, t=t) 

831 qij = self.idx("qij", j, a, t=t) 

832 vj = self.idx("v", j, a, t=t) 

833 p_load_nom, q_load_nom = 0, 0 

834 load_mult_p = load_mult_q = 1 

835 load_shape = self.bus.load_shape[j] 

836 if load_shape in self.schedules.columns: 

837 load_mult_p = load_mult_q = self.schedules.at[t, load_shape] 

838 elif f"{load_shape}.{a}.p" in self.schedules.columns: 

839 load_mult_p = self.schedules.at[t, f"{load_shape}.{a}.p"] 

840 load_mult_q = self.schedules.at[t, f"{load_shape}.{a}.q"] 

841 if self.bus.bus_type[j] == opf.PQ_BUS: 

842 p_load_nom = self.bus[f"pl_{a}"][j] * load_mult_p 

843 q_load_nom = self.bus[f"ql_{a}"][j] * load_mult_q 

844 # boundary p and q 

845 if self.bus.bus_type[j] != opf.PQ_FREE: 

846 # Set Load equation variable coefficients in a_eq 

847 a_eq[pij, vj] = -(self.bus.cvr_p[j] / 2) * p_load_nom 

848 b_eq[pij] = (1 - (self.bus.cvr_p[j] / 2)) * p_load_nom 

849 a_eq[qij, vj] = -(self.bus.cvr_q[j] / 2) * q_load_nom 

850 b_eq[qij] = (1 - (self.bus.cvr_q[j] / 2)) * q_load_nom 

851 return a_eq, b_eq 

852 

853 def add_capacitor_model( 

854 self, a_eq: lil_array, b_eq, j, a, t=0 

855 ) -> tuple[lil_array, np.ndarray]: 

856 if t < self.start_step: 

857 t = self.start_step 

858 q_cap_nom = 0 

859 if self.cap is not None: 

860 q_cap_nom = get(self.cap[f"q{a}"], j, 0) 

861 # equation indexes 

862 qij = self.idx("qij", j, a, t=t) 

863 vj = self.idx("v", j, a, t=t) 

864 qc = self.idx("q_cap", j, a, t=t) 

865 a_eq[qij, qc] = 1 # add capacitor q variable to power flow equation 

866 a_eq[qc, qc] = 1 

867 a_eq[qc, vj] = -q_cap_nom 

868 return a_eq, b_eq 

869 

870 def add_battery_model( 

871 self, 

872 a_eq: lil_array, 

873 b_eq: np.ndarray, 

874 j: int, 

875 a: str, 

876 b: str, 

877 c: str, 

878 t: int = 0, 

879 ) -> tuple[lil_array, np.ndarray]: 

880 if j not in self.bat.index: 

881 return a_eq, b_eq 

882 if t < self.start_step: 

883 t = self.start_step 

884 pij = self.idx("pij", j, a, t=t) 

885 qij = self.idx("qij", j, a, t=t) 

886 energy = self.idx("soc", j, a, t=t) 

887 p_dis_a = self.idx("discharge", j, a, t=t) 

888 p_cha_a = self.idx("charge", j, a, t=t) 

889 pb = self.idx("pb", j, a, t=t) 

890 qb = self.idx("qb", j, a, t=t) 

891 

892 eta_c = self.bat["charge_efficiency"].get(j, 1) 

893 eta_d = self.bat["discharge_efficiency"].get(j, 1) 

894 control_variable = self.bat["control_variable"].get(j, "P") 

895 energy_capacity = self.bat["energy_capacity"].get(j, 0) 

896 start_soc = self.bat["start_soc"].get(j, 0.5) 

897 energy0 = energy_capacity * start_soc 

898 

899 a_eq[pij, pb] = 1 

900 a_eq[qij, qb] = 1 

901 if "Q" not in control_variable.upper(): 

902 a_eq[qb, qb] = 1 

903 b_eq[qb] = 0 

904 a_eq[energy, p_dis_a] = 1 / eta_d * self.delta_t 

905 a_eq[energy, p_cha_a] = -eta_c * self.delta_t 

906 a_eq[energy, energy] = 1 

907 # pb = p_discharge - p_charge 

908 a_eq[p_dis_a, pb] = 1 

909 a_eq[p_dis_a, p_dis_a] = -1 

910 a_eq[p_dis_a, p_cha_a] = 1 

911 # force each phase equal 

912 if self.phase_exists(b, j) and self.phase_exists(c, j): 

913 # soc_b = self.idx("soc", j, b, t=t) 

914 qb_b = self.idx("qb", j, b, t=t) 

915 pb_b = self.idx("pb", j, b, t=t) 

916 # a_eq[p_dis_a, soc_a] = 1 

917 # a_eq[p_dis_a, soc_b] = -1 

918 a_eq[pb, pb] = 1 

919 a_eq[pb, pb_b] = -1 

920 

921 if "Q" in control_variable.upper(): 

922 a_eq[qb, qb] = 1 

923 a_eq[qb, qb_b] = -1 

924 

925 if t == self.start_step: 

926 b_eq[energy] = energy0 

927 else: 

928 energy_prev = self.idx("soc", j, a, t=t - 1) 

929 a_eq[energy, energy_prev] = -1 

930 return a_eq, b_eq 

931 

932 def create_battery_cycle_limit_constraints(self) -> tuple[csr_array, np.ndarray]: 

933 # ########## Aineq and Bineq Formation ########### 

934 n_inequalities = 1 

935 n_rows_ineq = n_inequalities * self.n_bats 

936 n_rows_ineq = max(n_rows_ineq, 1) 

937 a_ineq = lil_array((n_rows_ineq, self.n_x)) 

938 b_ineq = zeros(n_rows_ineq) 

939 eq_idx = 0 

940 for j in self.bat.index: 

941 for a in "abc": 

942 if not self.phase_exists(a, j): 

943 continue 

944 annual_cycle_limit = self.bat["annual_cycle_limit"].get(j, 365) 

945 daily_cycle_limit = annual_cycle_limit / 365 

946 total_hours = self.n_steps * self.delta_t 

947 c_max = total_hours / 24 * daily_cycle_limit 

948 max_soc_energy = self.bat["energy_capacity"].get(j, 0) 

949 b_ineq[eq_idx] = c_max * max_soc_energy 

950 for t in range(self.start_step, self.start_step + self.n_steps): 

951 eta_d = self.bat["discharge_efficiency"].get(j, 1) 

952 p_dis = self.idx("discharge", j, a, t=t) 

953 a_ineq[eq_idx, p_dis] = self.delta_t / eta_d 

954 eq_idx += 1 

955 return csr_array(a_ineq), b_ineq 

956 

957 def create_inequality_constraints(self) -> tuple[csr_array, np.ndarray]: 

958 """ 

959 Constructs the inequality constraint matrices. 

960 a_ub*x <= b_ub 

961 

962 Returns 

963 ------- 

964 a_ub, : csr_array 

965 Sparse array representing the inequality constraint matrix. 

966 b_ub : np.ndarray 

967 Array representing the inequality constraint vector. 

968 """ 

969 # a_bat, b_bat = self.create_battery_cycle_limit_constraints() 

970 a_inv, b_inv = self.create_inverter_octagon_constraints() 

971 # a_therm, b_therm = self.create_octagon_thermal_constraints() 

972 a_bat8, b_bat8 = self.create_octagon_battery_constraints() 

973 a_ub = vstack([a_inv, a_bat8]) # a_therm, a_bat 

974 b_ub = np.r_[b_inv, b_bat8] # b_therm, b_bat 

975 return csr_array(a_ub), b_ub 

976 

977 def create_hexagon_constraints(self) -> tuple[csr_array, np.ndarray]: 

978 """ 

979 Use a hexagon to approximate the circular inequality constraint of an inverter. 

980 """ 

981 n_inequalities = 5 

982 n_rows_ineq = ( 

983 n_inequalities 

984 * (len(np.where(self.gen.control_variable == opf.CONTROL_PQ)[0]) * 3) 

985 * self.n_steps 

986 ) 

987 n_rows_ineq = max(n_rows_ineq, 1) 

988 a_ineq = lil_array((n_rows_ineq, self.n_x)) 

989 b_ineq = zeros(n_rows_ineq) 

990 ineq = list(range(n_inequalities)) # initialize equation indices 

991 for t in range(self.start_step, self.start_step + self.n_steps): 

992 for j in self.gen.index: 

993 for a in "abc": 

994 if not self.phase_exists(a, j): 

995 continue 

996 if self.gen.loc[j, "control_variable"] != opf.CONTROL_PQ: 

997 continue 

998 pg = self.idx("pg", j, a, t=t) 

999 qg = self.idx("qg", j, a, t=t) 

1000 s_rated = float(self.gen.at[j, f"s{a}_max"]) 

1001 coef = sqrt(3) / 3 # ~=0.5774 

1002 # Right half plane. Positive P 

1003 # limit for small +P and large +Q 

1004 a_ineq[ineq[0], pg] = 0 

1005 a_ineq[ineq[0], qg] = 2 * coef 

1006 b_ineq[ineq[0]] = s_rated 

1007 # limit for large +P and small +Q 

1008 a_ineq[ineq[1], pg] = 1 

1009 a_ineq[ineq[1], qg] = coef 

1010 b_ineq[ineq[1]] = s_rated 

1011 # limit for large +P and small -Q 

1012 a_ineq[ineq[2], pg] = 1 

1013 a_ineq[ineq[2], qg] = -coef 

1014 b_ineq[ineq[2]] = s_rated 

1015 # limit for small +P and large -Q 

1016 a_ineq[ineq[3], pg] = 0 

1017 a_ineq[ineq[3], qg] = -2 * coef 

1018 b_ineq[ineq[3]] = s_rated 

1019 # limit to right half plane 

1020 a_ineq[ineq[4], pg] = -1 

1021 b_ineq[ineq[4]] = 0 

1022 # increment equation indices 

1023 for n_ineq in range(len(ineq)): 

1024 ineq[n_ineq] += len(ineq) 

1025 return csr_array(a_ineq), b_ineq 

1026 

1027 def create_inverter_octagon_constraints(self) -> tuple[csr_array, np.ndarray]: 

1028 """ 

1029 Use an octagon to approximate the circular inequality constraint of an inverter. 

1030 """ 

1031 n_inequalities = 5 

1032 n_rows_ineq = ( 

1033 n_inequalities 

1034 * (len(np.where(self.gen.control_variable == opf.CONTROL_PQ)[0]) * 3) 

1035 * self.n_steps 

1036 ) 

1037 n_rows_ineq = max(n_rows_ineq, 1) 

1038 a_ineq = lil_array((n_rows_ineq, self.n_x)) 

1039 b_ineq = zeros(n_rows_ineq) 

1040 ineq = list(range(n_inequalities)) # initialize equation indices 

1041 for t in range(self.start_step, self.start_step + self.n_steps): 

1042 for j in self.gen.index: 

1043 for a in "abc": 

1044 if not self.phase_exists(a, j): 

1045 continue 

1046 if self.gen.loc[j, "control_variable"] != opf.CONTROL_PQ: 

1047 continue 

1048 pg = self.idx("pg", j, a, t=t) 

1049 qg = self.idx("qg", j, a, t=t) 

1050 s_rated = float(self.gen.at[j, f"s{a}_max"]) 

1051 coef = sqrt(2) - 1 # ~=0.4142 

1052 # Right half plane. Positive P 

1053 # limit for small +P and large +Q 

1054 a_ineq[ineq[0], pg] = coef 

1055 a_ineq[ineq[0], qg] = 1 

1056 b_ineq[ineq[0]] = s_rated 

1057 # limit for large +P and small +Q 

1058 a_ineq[ineq[1], pg] = 1 

1059 a_ineq[ineq[1], qg] = coef 

1060 b_ineq[ineq[1]] = s_rated 

1061 # limit for large +P and small -Q 

1062 a_ineq[ineq[2], pg] = 1 

1063 a_ineq[ineq[2], qg] = -coef 

1064 b_ineq[ineq[2]] = s_rated 

1065 # limit for small +P and large -Q 

1066 a_ineq[ineq[3], pg] = coef 

1067 a_ineq[ineq[3], qg] = -1 

1068 b_ineq[ineq[3]] = s_rated 

1069 # limit to right half plane 

1070 a_ineq[ineq[4], pg] = -1 

1071 b_ineq[ineq[4]] = 0 

1072 # increment equation indices 

1073 for n_ineq in range(len(ineq)): 

1074 ineq[n_ineq] += len(ineq) 

1075 return csr_array(a_ineq), b_ineq 

1076 

1077 def create_octagon_battery_constraints(self): 

1078 """ 

1079 Create inequality constraints for the optimization problem. 

1080 """ 

1081 

1082 # ########## Aineq and Bineq Formation ########### 

1083 n_inequalities = 8 

1084 

1085 n_rows_ineq = n_inequalities * (len(self.bat)) * self.n_steps 

1086 a_ineq = lil_array((n_rows_ineq, self.n_x)) 

1087 b_ineq = zeros(n_rows_ineq) 

1088 ineq = list(range(n_inequalities)) 

1089 for t in range(self.start_step, self.start_step + self.n_steps): 

1090 for j in self.bat.index: 

1091 for a in "abc": 

1092 if not self.phase_exists(a, j): 

1093 continue 

1094 if self.bat.loc[j, "control_variable"] != opf.CONTROL_PQ: 

1095 continue 

1096 pb = self.idx("pb", j, a, t=t) 

1097 qb = self.idx("qb", j, a, t=t) 

1098 s_rated = self.bat.at[j, "s_max"] / len(self.bat.at[j, "phases"]) 

1099 coef = sqrt(2) - 1 # ~=0.4142 

1100 # equation indexes 

1101 # Right half plane. Positive P 

1102 # limit for small +P and large +Q 

1103 a_ineq[ineq[0], pb] = coef 

1104 a_ineq[ineq[0], qb] = 1 

1105 b_ineq[ineq[0]] = s_rated 

1106 # limit for large +P and small +Q 

1107 a_ineq[ineq[1], pb] = 1 

1108 a_ineq[ineq[1], qb] = coef 

1109 b_ineq[ineq[1]] = s_rated 

1110 # limit for large +P and small -Q 

1111 a_ineq[ineq[2], pb] = 1 

1112 a_ineq[ineq[2], qb] = -coef 

1113 b_ineq[ineq[2]] = s_rated 

1114 # limit for small +P and large -Q 

1115 a_ineq[ineq[3], pb] = coef 

1116 a_ineq[ineq[3], qb] = -1 

1117 b_ineq[ineq[3]] = s_rated 

1118 # Left half plane. Negative P 

1119 # limit for small -P and large -Q 

1120 a_ineq[ineq[4], pb] = -coef 

1121 a_ineq[ineq[4], qb] = -1 

1122 b_ineq[ineq[4]] = s_rated 

1123 # limit for large -P and small -Q 

1124 a_ineq[ineq[5], pb] = -1 

1125 a_ineq[ineq[5], qb] = -coef 

1126 b_ineq[ineq[5]] = s_rated 

1127 # limit for large -P and small +Q 

1128 a_ineq[ineq[6], pb] = -1 

1129 a_ineq[ineq[6], qb] = coef 

1130 b_ineq[ineq[6]] = s_rated 

1131 # limit for small -P and large +Q 

1132 a_ineq[ineq[7], pb] = -coef 

1133 a_ineq[ineq[7], qb] = 1 

1134 b_ineq[ineq[7]] = s_rated 

1135 for n_ineq in range(len(ineq)): 

1136 ineq[n_ineq] += len(ineq) 

1137 break 

1138 return csr_array(a_ineq), b_ineq 

1139 

1140 def create_octagon_thermal_constraints(self): 

1141 """ 

1142 Create inequality constraints for the optimization problem. 

1143 """ 

1144 

1145 # ########## Aineq and Bineq Formation ########### 

1146 if ( 

1147 "sa_max" not in self.branch.columns 

1148 or "sb_max" not in self.branch.columns 

1149 or "sc_max" not in self.branch.columns 

1150 ): 

1151 return lil_array((0, self.n_x)), zeros(0) 

1152 n_inequalities = 8 

1153 

1154 n_rows_ineq = ( 

1155 n_inequalities 

1156 * ( 

1157 len(np.where(~np.isnan(self.branch.sa_max))[0]) 

1158 + len(np.where(~np.isnan(self.branch.sb_max))[0]) 

1159 + len(np.where(~np.isnan(self.branch.sc_max))[0]) 

1160 ) 

1161 * self.n_steps 

1162 ) 

1163 a_ineq = lil_array((n_rows_ineq, self.n_x)) 

1164 b_ineq = zeros(n_rows_ineq) 

1165 ineq = list(range(n_inequalities)) 

1166 for t in range(self.start_step, self.start_step + self.n_steps): 

1167 for tb in self.branch.tb: 

1168 for a in "abc": 

1169 j = tb - 1 

1170 if not self.phase_exists(a, j): 

1171 continue 

1172 s_rated = self.branch.loc[ 

1173 self.branch.tb == tb, f"s{a}_max" 

1174 ].to_numpy()[0] 

1175 if np.isnan(s_rated): 

1176 continue 

1177 pij = self.idx("pij", j, a, t=t) 

1178 qij = self.idx("qij", j, a, t=t) 

1179 coef = sqrt(2) - 1 # ~=0.4142 

1180 # equation indexes 

1181 # Right half plane. Positive Pij 

1182 # limit for small +Pij and large +Qij 

1183 a_ineq[ineq[0], pij] = coef 

1184 a_ineq[ineq[0], qij] = 1 

1185 b_ineq[ineq[0]] = s_rated 

1186 # limit for large +Pij and small +Qij 

1187 a_ineq[ineq[1], pij] = 1 

1188 a_ineq[ineq[1], qij] = coef 

1189 b_ineq[ineq[1]] = s_rated 

1190 # limit for large +Pij and small -Qij 

1191 a_ineq[ineq[2], pij] = 1 

1192 a_ineq[ineq[2], qij] = -coef 

1193 b_ineq[ineq[2]] = s_rated 

1194 # limit for small +Pij and large -Qij 

1195 a_ineq[ineq[3], pij] = coef 

1196 a_ineq[ineq[3], qij] = -1 

1197 b_ineq[ineq[3]] = s_rated 

1198 # Left half plane. Negative Pij 

1199 # limit for small -Pij and large -Qij 

1200 a_ineq[ineq[4], pij] = -coef 

1201 a_ineq[ineq[4], qij] = -1 

1202 b_ineq[ineq[4]] = s_rated 

1203 # limit for large -Pij and small -Qij 

1204 a_ineq[ineq[5], pij] = -1 

1205 a_ineq[ineq[5], qij] = -coef 

1206 b_ineq[ineq[5]] = s_rated 

1207 # limit for large -Pij and small +Qij 

1208 a_ineq[ineq[6], pij] = -1 

1209 a_ineq[ineq[6], qij] = coef 

1210 b_ineq[ineq[6]] = s_rated 

1211 # limit for small -Pij and large +Qij 

1212 a_ineq[ineq[7], pij] = -coef 

1213 a_ineq[ineq[7], qij] = 1 

1214 b_ineq[ineq[7]] = s_rated 

1215 

1216 for n_ineq in range(len(ineq)): 

1217 ineq[n_ineq] += len(ineq) 

1218 return csr_array(a_ineq), b_ineq 

1219 

1220 def get_device_variables(self, x, variable_map): 

1221 df_list = [] 

1222 if len(variable_map.keys()) == 0: 

1223 return pd.DataFrame(columns=["id", "name", "t", "a", "b", "c"]) 

1224 for t in range(self.start_step, self.start_step + self.n_steps): 

1225 index = np.unique( 

1226 np.r_[ 

1227 variable_map[t]["a"].index, 

1228 variable_map[t]["b"].index, 

1229 variable_map[t]["c"].index, 

1230 ] 

1231 ) 

1232 bus_id = index + 1 

1233 df = pd.DataFrame(columns=["id", "name", "t", "a", "b", "c"], index=bus_id) 

1234 df.id = bus_id 

1235 df.t = t 

1236 df.loc[bus_id, "name"] = self.bus.loc[index, "name"].to_numpy() 

1237 for a in "abc": 

1238 df.loc[variable_map[t][a].index + 1, a] = x[variable_map[t][a]] 

1239 df_list.append(df) 

1240 df = pd.concat(df_list, axis=0).reset_index(drop=True) 

1241 df.a = df.a.astype(float) 

1242 df.b = df.b.astype(float) 

1243 df.c = df.c.astype(float) 

1244 return df 

1245 

1246 def get_device_variables_no_phases(self, x, variable_map): 

1247 df_list = [] 

1248 if len(variable_map.keys()) == 0: 

1249 return pd.DataFrame(columns=["id", "name", "t", "value"]) 

1250 for t in range(self.start_step, self.start_step + self.n_steps): 

1251 index = np.unique( 

1252 np.r_[ 

1253 variable_map[t]["a"].index, 

1254 variable_map[t]["b"].index, 

1255 variable_map[t]["c"].index, 

1256 ] 

1257 ) 

1258 bus_id = index + 1 

1259 df = pd.DataFrame(columns=["id", "name", "t", "value"], index=bus_id) 

1260 df.id = bus_id 

1261 df.t = t 

1262 df.loc[bus_id, "name"] = self.bus.loc[index, "name"].to_numpy() 

1263 for a in "abc": 

1264 df.loc[variable_map[t][a].index + 1, "value"] = x[variable_map[t][a]] 

1265 df_list.append(df) 

1266 df = pd.concat(df_list, axis=0).reset_index(drop=True) 

1267 return df 

1268 

1269 def get_voltages(self, x): 

1270 df = self.get_device_variables(x, self.v_map) 

1271 df.loc[:, ["a", "b", "c"]] = df.loc[:, ["a", "b", "c"]] ** 0.5 

1272 df.a = df.a.astype(float) 

1273 df.b = df.b.astype(float) 

1274 df.c = df.c.astype(float) 

1275 return df 

1276 

1277 def get_p_gens(self, x): 

1278 return self.get_device_variables(x, self.pg_map) 

1279 

1280 def get_q_gens(self, x): 

1281 return self.get_device_variables(x, self.qg_map) 

1282 

1283 def get_p_batt(self, x): 

1284 return self.get_device_variables(x, self.pb_map) 

1285 

1286 def get_q_batt(self, x): 

1287 return self.get_device_variables(x, self.qb_map) 

1288 

1289 def get_q_caps(self, x): 

1290 return self.get_device_variables(x, self.qc_map) 

1291 

1292 def get_p_charge(self, x): 

1293 return self.get_device_variables(x, self.charge_map) 

1294 

1295 def get_p_discharge(self, x): 

1296 return self.get_device_variables(x, self.discharge_map) 

1297 

1298 def get_soc(self, x): 

1299 return self.get_device_variables_no_phases(x, self.soc_map) 

1300 

1301 def get_apparent_power_flows(self, x): 

1302 df_list = [] 

1303 for t in range(self.start_step, self.start_step + self.n_steps): 

1304 s_df = pd.DataFrame( 

1305 columns=["fb", "tb", "from_name", "to_name", "t", "a", "b", "c"], 

1306 index=range(2, self.nb + 1), 

1307 ) 

1308 s_df["a"] = s_df["a"].astype(complex) 

1309 s_df["b"] = s_df["b"].astype(complex) 

1310 s_df["c"] = s_df["c"].astype(complex) 

1311 s_df.t = t 

1312 for ph in "abc": 

1313 fb_idxs = self.x_maps[t][ph].bi.to_numpy() 

1314 fb_names = self.bus.name[fb_idxs].to_numpy() 

1315 tb_idxs = self.x_maps[t][ph].bj.to_numpy() 

1316 tb_names = self.bus.name[tb_idxs].to_numpy() 

1317 s_df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1 

1318 s_df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "tb"] = tb_idxs + 1 

1319 s_df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "from_name"] = fb_names 

1320 s_df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "to_name"] = tb_names 

1321 s_df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, ph] = ( 

1322 x[self.x_maps[t][ph].pij] + 1j * x[self.x_maps[t][ph].qij] 

1323 ) 

1324 df_list.append(s_df) 

1325 s_df = pd.concat(df_list, axis=0).reset_index(drop=True) 

1326 return s_df 

1327 

1328 def get_p_flows(self, x): 

1329 df_list = [] 

1330 for t in range(self.start_step, self.start_step + self.n_steps): 

1331 df = pd.DataFrame( 

1332 columns=["fb", "id", "from_name", "name", "t", "a", "b", "c"], 

1333 index=range(2, self.nb + 1), 

1334 ) 

1335 df["a"] = df["a"].astype(float) 

1336 df["b"] = df["b"].astype(float) 

1337 df["c"] = df["c"].astype(float) 

1338 df.t = t 

1339 for ph in "abc": 

1340 fb_idxs = self.x_maps[t][ph].bi.to_numpy() 

1341 fb_names = self.bus.name[fb_idxs].to_numpy() 

1342 tb_idxs = self.x_maps[t][ph].bj.to_numpy() 

1343 tb_names = self.bus.name[tb_idxs].to_numpy() 

1344 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1 

1345 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "id"] = tb_idxs + 1 

1346 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "from_name"] = fb_names 

1347 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "name"] = tb_names 

1348 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, ph] = x[ 

1349 self.x_maps[t][ph].pij 

1350 ] 

1351 df_list.append(df) 

1352 df = pd.concat(df_list, axis=0).reset_index(drop=True) 

1353 return df 

1354 

1355 def get_q_flows(self, x): 

1356 df_list = [] 

1357 for t in range(self.start_step, self.start_step + self.n_steps): 

1358 df = pd.DataFrame( 

1359 columns=["fb", "id", "from_name", "name", "t", "a", "b", "c"], 

1360 index=range(2, self.nb + 1), 

1361 ) 

1362 df["a"] = df["a"].astype(float) 

1363 df["b"] = df["b"].astype(float) 

1364 df["c"] = df["c"].astype(float) 

1365 df.t = t 

1366 for ph in "abc": 

1367 fb_idxs = self.x_maps[t][ph].bi.to_numpy() 

1368 fb_names = self.bus.name[fb_idxs].to_numpy() 

1369 tb_idxs = self.x_maps[t][ph].bj.to_numpy() 

1370 tb_names = self.bus.name[tb_idxs].to_numpy() 

1371 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1 

1372 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "id"] = tb_idxs + 1 

1373 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "from_name"] = fb_names 

1374 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, "name"] = tb_names 

1375 df.loc[self.x_maps[t][ph].bj.to_numpy() + 1, ph] = x[ 

1376 self.x_maps[t][ph].qij 

1377 ] 

1378 df_list.append(df) 

1379 df = pd.concat(df_list, axis=0).reset_index(drop=True) 

1380 return df 

1381 

1382 def update( 

1383 self, 

1384 bus_data: Optional[pd.DataFrame] = None, 

1385 gen_data: Optional[pd.DataFrame] = None, 

1386 cap_data: Optional[pd.DataFrame] = None, 

1387 reg_data: Optional[pd.DataFrame] = None, 

1388 bat_data: Optional[pd.DataFrame] = None, 

1389 schedules: Optional[pd.DataFrame] = None, 

1390 start_step: Optional[int] = None, 

1391 n_steps: Optional[int] = None, 

1392 delta_t: Optional[float] = None, # hours per step 

1393 ): 

1394 warnings.warn("update method is untested!") 

1395 # TODO: update is untested! 

1396 

1397 if bus_data is not None: 

1398 self.bus = handle_bus_input(bus_data) 

1399 if gen_data is not None: 

1400 self.gen = handle_gen_input(gen_data) 

1401 if cap_data is not None: 

1402 self.cap = handle_cap_input(cap_data) 

1403 if reg_data is not None: 

1404 self.reg = handle_reg_input(reg_data) 

1405 if bat_data is not None: 

1406 self.bat = handle_bat_input(bat_data) 

1407 if schedules is not None: 

1408 self.schedules = handle_schedules_input(schedules) 

1409 

1410 if self.start_step == start_step: 

1411 start_step = None # no change, remove input 

1412 if self.n_steps == n_steps: 

1413 n_steps = None # no change, remove input 

1414 if self.delta_t == delta_t: 

1415 delta_t = None # no change, remove input 

1416 

1417 if start_step is not None: 

1418 self.start_step = start_step 

1419 if delta_t is not None: 

1420 self.delta_t = delta_t 

1421 if n_steps is not None: 

1422 # need to do complete rebuild 

1423 self.n_steps = n_steps 

1424 self.build() 

1425 return 

1426 a_eq = lil_array(self.a_eq) 

1427 for t in range(self.start_step, self.start_step + self.n_steps): 

1428 for j in range(1, self.nb): 

1429 for ph in ["abc", "bca", "cab"]: 

1430 a, b, c = ph[0], ph[1], ph[2] 

1431 if not self.phase_exists(a, j): 

1432 continue 

1433 if bus_data is not None: 

1434 a_eq, self.b_eq = self.add_swing_voltage_model( 

1435 a_eq, self.b_eq, j, a 

1436 ) 

1437 if bus_data is not None or start_step is not None: 

1438 a_eq, self.b_eq = self.add_load_model(a_eq, self.b_eq, j, a, t) 

1439 if gen_data is not None or start_step is not None: 

1440 a_eq, self.b_eq = self.add_generator_model( 

1441 a_eq, self.b_eq, j, a, t 

1442 ) 

1443 if schedules is not None: 

1444 a_eq, self.b_eq = self.add_load_model(a_eq, self.b_eq, j, a, t) 

1445 a_eq, self.b_eq = self.add_generator_model( 

1446 a_eq, self.b_eq, j, a, t 

1447 ) 

1448 

1449 if cap_data is not None: 

1450 a_eq, self.b_eq = self.add_capacitor_model( 

1451 a_eq, self.b_eq, j, a, t 

1452 ) 

1453 if reg_data is not None: 

1454 a_eq, self.b_eq = self.add_regulator_model( 

1455 a_eq, self.b_eq, j, a, t 

1456 ) 

1457 if bat_data is not None or delta_t is not None: 

1458 a_eq, self.b_eq = self.add_battery_model( 

1459 a_eq, self.b_eq, j, a, b, c, t 

1460 ) 

1461 self.a_eq = csr_array(a_eq) 

1462 self.a_ub, self.b_ub = self.create_inequality_constraints() 

1463 self.bounds = self.init_bounds() 

1464 self.bounds_tuple = list(map(tuple, self.bounds)) 

1465 self.x_min = self.bounds[:, 0] 

1466 self.x_max = self.bounds[:, 1]