Coverage for src/distopf/pyomo_models/lindist.py: 86%

223 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from enum import IntEnum 

2import pyomo.environ as pyo 

3from typing import Tuple, List 

4import pandas as pd 

5from distopf.importer import Case 

6from distopf.pyomo_models.protocol import LindistModelProtocol 

7 

8 

9class ControlVariable(IntEnum): 

10 NONE = 0 

11 Q = 1 

12 P = 2 

13 PQ = 3 

14 

15 

16CONTROL_VARIABLE_MAP = {"": 0, "Q": 1, "P": 2, "PQ": 3} 

17 

18 

19def _create_phase_tuples(df: pd.DataFrame, id_col: str = "id") -> List[Tuple[str, str]]: 

20 """Create (id, phase) tuples from dataframe with phases column""" 

21 result = [] 

22 for _, row in df.iterrows(): 

23 result.extend([(int(row[id_col]), str(phase)) for phase in row.phases]) 

24 return result 

25 

26 

27def _create_sets(m: pyo.ConcreteModel, case: Case) -> None: 

28 """Create all Pyomo sets""" 

29 m.time_set = pyo.RangeSet(case.start_step, case.start_step + case.n_steps - 1) 

30 m.bus_set = pyo.Set(initialize=case.bus_data.id.tolist()) 

31 m.swing_bus_set = pyo.Set( 

32 initialize=case.bus_data[case.bus_data.bus_type == "SWING"].id.tolist() 

33 ) 

34 m.swing_phase_set = pyo.Set( 

35 initialize=_create_phase_tuples( 

36 case.bus_data[case.bus_data.bus_type == "SWING"] 

37 ), 

38 dimen=2, 

39 ) 

40 m.branch_set = pyo.Set( 

41 initialize=case.bus_data[case.bus_data.bus_type != "SWING"].id.tolist() 

42 ) 

43 m.phase_pair_set = pyo.Set(initialize=["aa", "ab", "ac", "bb", "bc", "cc"]) 

44 m.bus_phase_set = pyo.Set(initialize=_create_phase_tuples(case.bus_data), dimen=2) 

45 m.branch_phase_set = pyo.Set( 

46 initialize=_create_phase_tuples(case.branch_data, "tb"), dimen=2 

47 ) 

48 m.gen_phase_set = pyo.Set(initialize=_create_phase_tuples(case.gen_data), dimen=2) 

49 m.cap_phase_set = pyo.Set(initialize=_create_phase_tuples(case.cap_data), dimen=2) 

50 m.reg_phase_set = pyo.Set( 

51 initialize=_create_phase_tuples(case.reg_data, "tb"), dimen=2 

52 ) 

53 m.bat_phase_set = pyo.Set( 

54 initialize=_create_phase_tuples(case.bat_data, "id"), dimen=2 

55 ) 

56 m.bat_set = pyo.Set(initialize=case.bat_data.id.tolist()) 

57 

58 

59def _create_rx_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

60 """Create resistance and reactance parameters""" 

61 r_data, x_data = {}, {} 

62 

63 for _, row in case.branch_data.iterrows(): 

64 for phase_pair in m.phase_pair_set: 

65 r_col, x_col = f"r{phase_pair}", f"x{phase_pair}" 

66 

67 if r_col in case.branch_data.columns and x_col in case.branch_data.columns: 

68 r_data[(row.tb, phase_pair)] = row[r_col] 

69 x_data[(row.tb, phase_pair)] = row[x_col] 

70 

71 m.r = pyo.Param(m.branch_set, m.phase_pair_set, initialize=r_data, default=0.0) 

72 m.x = pyo.Param(m.branch_set, m.phase_pair_set, initialize=x_data, default=0.0) 

73 

74 

75def _create_load_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

76 load_p_data, load_q_data = {}, {} 

77 cvr_p_data, cvr_q_data = {}, {} 

78 

79 for _, row in case.bus_data.iterrows(): 

80 for phase in row.phases: 

81 if (row.id, phase) not in m.bus_phase_set: 

82 continue 

83 

84 # CVR factors for voltage-dependent loads 

85 cvr_p = getattr(row, "cvr_p", 0.0) 

86 cvr_q = getattr(row, "cvr_q", 0.0) 

87 cvr_p_data[(row.id, phase)] = cvr_p 

88 cvr_q_data[(row.id, phase)] = cvr_q 

89 # Active and reactive loads 

90 p_load = getattr(row, f"pl_{phase}", 0.0) 

91 q_load = getattr(row, f"ql_{phase}", 0.0) 

92 load_mult_p = load_mult_q = 1.0 

93 load_shape = getattr(row, "load_shape", "default") 

94 for t in m.time_set: 

95 if load_shape in case.schedules.columns: 

96 load_mult_p = load_mult_q = case.schedules.at[t, load_shape] 

97 elif f"{load_shape}.{phase}.p" in case.schedules.columns: 

98 load_mult_p = case.schedules.at[t, f"{load_shape}.{phase}.p"] 

99 load_mult_q = case.schedules.at[t, f"{load_shape}.{phase}.q"] 

100 load_p_data[(row.id, phase, t)] = p_load * load_mult_p 

101 load_q_data[(row.id, phase, t)] = q_load * load_mult_q 

102 

103 m.p_load_nom = pyo.Param( 

104 m.bus_phase_set, 

105 m.time_set, 

106 initialize=load_p_data, 

107 default=0.0, 

108 doc="Nominal active power load at 1.0 p.u. voltage", 

109 ) 

110 m.q_load_nom = pyo.Param( 

111 m.bus_phase_set, 

112 m.time_set, 

113 initialize=load_q_data, 

114 default=0.0, 

115 doc="Nominal reactive power load at 1.0 p.u. voltage", 

116 ) 

117 m.cvr_p = pyo.Param( 

118 m.bus_phase_set, 

119 initialize=cvr_p_data, 

120 default=0.0, 

121 doc="CVR factor for active power loads", 

122 ) 

123 m.cvr_q = pyo.Param( 

124 m.bus_phase_set, 

125 initialize=cvr_q_data, 

126 default=0.0, 

127 doc="CVR factor for reactive power loads", 

128 ) 

129 

130 

131def _create_generator_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

132 p_gen_data, q_gen_data = {}, {} 

133 s_rated_data, q_gen_min_data, q_gen_max_data = {}, {}, {} 

134 gen_control_data = {} 

135 

136 for _, row in case.gen_data.iterrows(): 

137 for phase in row.phases: 

138 if (row.id, phase) not in m.gen_phase_set: 

139 continue 

140 

141 # Generation limits 

142 s_rated = getattr(row, f"s{phase}_max", 1000.0) 

143 q_max = getattr(row, f"q{phase}_max", s_rated) 

144 q_min = getattr(row, f"q{phase}_min", -s_rated) 

145 s_rated_data[(row.id, phase)] = s_rated 

146 q_gen_min_data[(row.id, phase)] = q_min 

147 q_gen_max_data[(row.id, phase)] = q_max 

148 

149 # Control variable type 

150 control_var = getattr(row, "control_variable", "") 

151 gen_control_data[(row.id, phase)] = CONTROL_VARIABLE_MAP[control_var] 

152 

153 # Nominal generation values 

154 p_gen = getattr(row, f"p{phase}", 0.0) 

155 q_gen = getattr(row, f"q{phase}", 0.0) 

156 for t in m.time_set: 

157 p_gen_data[(row.id, phase, t)] = p_gen 

158 q_gen_data[(row.id, phase, t)] = q_gen 

159 

160 m.p_gen_nom = pyo.Param( 

161 m.gen_phase_set, 

162 m.time_set, 

163 initialize=p_gen_data, 

164 default=0.0, 

165 doc="Nominal active power generation", 

166 ) 

167 m.q_gen_nom = pyo.Param( 

168 m.gen_phase_set, 

169 m.time_set, 

170 initialize=q_gen_data, 

171 default=0.0, 

172 doc="Nominal reactive power generation", 

173 ) 

174 m.s_rated = pyo.Param( 

175 m.gen_phase_set, 

176 initialize=s_rated_data, 

177 default=1000.0, 

178 doc="Maximum apparent power rating", 

179 ) 

180 m.q_gen_max = pyo.Param( 

181 m.gen_phase_set, 

182 initialize=q_gen_max_data, 

183 default=1000.0, 

184 doc="Maximum reactive power generation", 

185 ) 

186 m.q_gen_min = pyo.Param( 

187 m.gen_phase_set, 

188 initialize=q_gen_min_data, 

189 default=-1000.0, 

190 doc="Minimum reactive power generation", 

191 ) 

192 m.gen_control_type = pyo.Param( 

193 m.gen_phase_set, 

194 initialize=gen_control_data, 

195 default=0, 

196 doc="Generator control variable type", 

197 ) 

198 

199 

200def _create_capacitor_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

201 q_cap_data = {} 

202 for _, row in case.cap_data.iterrows(): 

203 for phase in row.phases: 

204 q_cap = getattr(row, f"q{phase}", 0.0) 

205 q_cap_data[(row.id, phase)] = q_cap 

206 

207 m.q_cap_nom = pyo.Param( 

208 m.cap_phase_set, 

209 initialize=q_cap_data, 

210 default=0.0, 

211 doc="Nominal capacitor reactive power at 1.0 p.u. voltage", 

212 ) 

213 

214 

215def _create_regulator_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

216 ratio_data = {} 

217 for _, row in case.reg_data.iterrows(): 

218 for phase in row.phases: 

219 ratio_data[(row.tb, phase)] = getattr(row, f"ratio_{phase}", 1.0) 

220 

221 m.reg_ratio = pyo.Param( 

222 m.reg_phase_set, 

223 initialize=ratio_data, 

224 default=1.0, 

225 doc="Voltage regulator turn ratio", 

226 ) 

227 

228 

229def _create_v_swing_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

230 v_swing_data = {} 

231 

232 # Find swing buses 

233 swing_buses = case.bus_data[case.bus_data.bus_type == "SWING"] 

234 

235 for _, row in swing_buses.iterrows(): 

236 for phase in row.phases: 

237 if (row.id, phase) not in m.bus_phase_set: 

238 continue 

239 v_swing = getattr(row, f"v_{phase}", 1.0) 

240 for t in m.time_set: 

241 v_swing_data[(row.id, phase, t)] = v_swing 

242 

243 m.v_swing = pyo.Param( 

244 m.swing_phase_set, 

245 m.time_set, 

246 initialize=v_swing_data, 

247 default=1.0, 

248 doc="Swing bus voltage magnitude squared", 

249 ) 

250 

251 

252def _create_v_limit_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

253 v_min_data, v_max_data = {}, {} 

254 

255 for _, row in case.bus_data.iterrows(): 

256 for phase in row.phases: 

257 if (row.id, phase) in m.bus_phase_set: 

258 v_min_data[(row.id, phase)] = getattr(row, "v_min", 0.95) 

259 v_max_data[(row.id, phase)] = getattr(row, "v_max", 1.05) 

260 

261 m.v_min = pyo.Param( 

262 m.bus_phase_set, 

263 initialize=v_min_data, 

264 default=0.95, 

265 doc="Minimum voltage magnitude squared", 

266 ) 

267 m.v_max = pyo.Param( 

268 m.bus_phase_set, 

269 initialize=v_max_data, 

270 default=1.05, 

271 doc="Maximum voltage magnitude squared", 

272 ) 

273 

274 

275def _create_battery_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

276 p_bat_data, q_bat_data = {}, {} 

277 s_rated_data, q_bat_min_data, q_bat_max_data = {}, {}, {} 

278 energy_capacity_data = {} 

279 min_soc_data, max_soc_data = {}, {} 

280 start_soc_data = {} 

281 charge_efficiency_data = {} 

282 discharge_efficiency_data = {} 

283 annual_cycle_limit = {} 

284 bat_control_data = {} 

285 battery_has_a_phase = {} 

286 battery_has_b_phase = {} 

287 battery_has_c_phase = {} 

288 battery_has_phase = {} 

289 battery_n_phases = {} 

290 

291 for _, row in case.bat_data.iterrows(): 

292 energy_capacity_data[row.id] = getattr(row, "energy_capacity") 

293 min_soc_data[row.id] = getattr(row, "min_soc", 0) 

294 max_soc_data[row.id] = getattr(row, "max_soc", 1) 

295 start_soc_data[row.id] = getattr(row, "start_soc", 0.5) 

296 charge_efficiency_data[row.id] = getattr(row, "charge_efficiency", 1) 

297 discharge_efficiency_data[row.id] = getattr(row, "discharge_efficiency", 1) 

298 annual_cycle_limit[row.id] = getattr(row, "annual_cycle_limit", 365) 

299 bat_control_data[row.id] = CONTROL_VARIABLE_MAP[ 

300 getattr(row, "control_variable", "P") 

301 ] 

302 

303 battery_has_a_phase[row.id] = "a" in row.phases 

304 battery_has_b_phase[row.id] = "b" in row.phases 

305 battery_has_c_phase[row.id] = "c" in row.phases 

306 battery_has_phase[(row.id, "a")] = "a" in row.phases 

307 battery_has_phase[(row.id, "b")] = "b" in row.phases 

308 battery_has_phase[(row.id, "c")] = "c" in row.phases 

309 n_phases = len(row.phases) 

310 battery_n_phases[row.id] = n_phases 

311 # Generation limits 

312 for phase in row.phases: 

313 if (row.id, phase) not in m.bat_phase_set: 

314 continue 

315 s_rated = getattr(row, "s_max", 1000.0) 

316 s_rated_data[(row.id, phase)] = s_rated / n_phases 

317 q_bat_min_data[(row.id, phase)] = getattr(row, "q_min", -s_rated) / n_phases 

318 q_bat_max_data[(row.id, phase)] = getattr(row, "q_max", s_rated) / n_phases 

319 # Nominal generation values 

320 for t in m.time_set: 

321 p_bat_data[(row.id, phase, t)] = getattr(row, "p", 0.0) / n_phases 

322 q_bat_data[(row.id, phase, t)] = getattr(row, "q", 0.0) / n_phases 

323 

324 m.p_bat_nom = pyo.Param( 

325 m.bat_phase_set, 

326 m.time_set, 

327 initialize=p_bat_data, 

328 default=0.0, 

329 doc="Nominal active power discharge from battery", 

330 ) 

331 

332 m.q_bat_nom = pyo.Param( 

333 m.bat_phase_set, 

334 m.time_set, 

335 initialize=q_bat_data, 

336 default=0.0, 

337 doc="Nominal reactive power discharge from battery", 

338 ) 

339 m.s_bat_rated = pyo.Param( 

340 m.bat_phase_set, 

341 initialize=s_rated_data, 

342 default=1000.0, 

343 doc="Maximum apparent power rating", 

344 ) 

345 m.q_bat_max = pyo.Param( 

346 m.bat_phase_set, 

347 initialize=q_bat_max_data, 

348 default=1000.0, 

349 doc="Maximum reactive power generation", 

350 ) 

351 m.q_bat_min = pyo.Param( 

352 m.bat_phase_set, 

353 initialize=q_bat_min_data, 

354 default=-1000.0, 

355 doc="Minimum reactive power generation", 

356 ) 

357 m.bat_control_type = pyo.Param( 

358 m.bat_set, 

359 initialize=bat_control_data, 

360 default=0, 

361 doc="Battery control variable type", 

362 ) 

363 m.energy_capacity = pyo.Param( 

364 m.bat_set, 

365 initialize=energy_capacity_data, 

366 default=0, 

367 doc="Battery energy capacity in units power-base * Wh", 

368 ) 

369 m.soc_min = pyo.Param( 

370 m.bat_set, 

371 initialize=min_soc_data, 

372 default=0, 

373 doc="Battery soc minimum as a fraction of energy capacity", 

374 ) 

375 m.soc_max = pyo.Param( 

376 m.bat_set, 

377 initialize=max_soc_data, 

378 default=1, 

379 doc="Battery soc maximum as a fraction of energy capacity", 

380 ) 

381 m.start_soc = pyo.Param( 

382 m.bat_set, 

383 initialize=start_soc_data, 

384 default=0.5, 

385 doc="Battery starting soc as a fraction of energy capacity", 

386 ) 

387 m.charge_efficiency = pyo.Param( 

388 m.bat_set, 

389 initialize=charge_efficiency_data, 

390 default=1.0, 

391 doc="Battery charging efficiency", 

392 ) 

393 m.discharge_efficiency = pyo.Param( 

394 m.bat_set, 

395 initialize=discharge_efficiency_data, 

396 default=1.0, 

397 doc="Battery discharging efficiency", 

398 ) 

399 m.annual_cycle_limit = pyo.Param( 

400 m.bat_set, 

401 initialize=annual_cycle_limit, 

402 default=365, 

403 doc="Limit to number of discharge cycles per year", 

404 ) 

405 m.battery_has_a_phase = pyo.Param( 

406 m.bat_set, initialize=battery_has_a_phase, default=True 

407 ) 

408 m.battery_has_b_phase = pyo.Param( 

409 m.bat_set, initialize=battery_has_b_phase, default=True 

410 ) 

411 m.battery_has_c_phase = pyo.Param( 

412 m.bat_set, initialize=battery_has_c_phase, default=True 

413 ) 

414 m.battery_has_phase = pyo.Param( 

415 m.bat_set, ["a", "b", "c"], initialize=battery_has_phase, default=True 

416 ) 

417 

418 m.battery_n_phases = pyo.Param( 

419 m.bat_set, 

420 initialize=battery_n_phases, 

421 default=3, 

422 doc="Number of phases connected to battery.", 

423 ) 

424 

425 

426def _create_parameters(m: pyo.ConcreteModel, case: Case) -> None: 

427 """ 

428 Create all parameters for the Pyomo model including impedances, loads, 

429 generators, capacitors, regulator ratios, and swing bus voltages. 

430 """ 

431 _create_rx_parameters(m, case) 

432 _create_load_parameters(m, case) 

433 _create_generator_parameters(m, case) 

434 _create_capacitor_parameters(m, case) 

435 _create_regulator_parameters(m, case) 

436 _create_v_swing_parameters(m, case) 

437 _create_v_limit_parameters(m, case) 

438 _create_battery_parameters(m, case) 

439 

440 

441def create_lindist_model(case: Case) -> LindistModelProtocol: 

442 """ 

443 Factory function to create a Pyomo ConcreteModel for multiperiod linear distribution system optimization. 

444 

445 Parameters 

446 ---------- 

447 case : Case 

448 Dataclass containing network data frames 

449 

450 Returns 

451 ------- 

452 pyo.ConcreteModel 

453 Configured Pyomo model with sets, variables, and parameters 

454 """ 

455 

456 m = pyo.ConcreteModel() 

457 m.from_bus_map = { 

458 int(tb): int(fb) for fb, tb in case.branch_data.loc[:, ["fb", "tb"]].to_numpy() 

459 } 

460 m.to_bus_map = { 

461 int(bus_id): case.branch_data.loc[ 

462 case.branch_data.fb == int(bus_id), "tb" 

463 ].to_list() 

464 for bus_id in case.bus_data.id.to_numpy() 

465 } 

466 m.name_map = { 

467 int(_id): str(name) 

468 for _id, name in case.bus_data.loc[:, ["id", "name"]].to_numpy() 

469 } 

470 m.delta_t = pyo.Param(initialize=case.delta_t) 

471 m.start_step = pyo.Param(initialize=case.start_step) 

472 m.n_steps = pyo.Param(initialize=case.n_steps) 

473 _create_sets(m, case) 

474 _create_parameters(m, case) 

475 

476 # Time-indexed variables 

477 m.v2 = pyo.Var( 

478 m.bus_phase_set, m.time_set, domain=pyo.NonNegativeReals, initialize=1 

479 ) # Voltage magnitude squared 

480 m.p_flow = pyo.Var(m.branch_phase_set, m.time_set) 

481 m.q_flow = pyo.Var(m.branch_phase_set, m.time_set, initialize=0) 

482 m.p_gen = pyo.Var(m.gen_phase_set, m.time_set, domain=pyo.NonNegativeReals) 

483 m.q_gen = pyo.Var(m.gen_phase_set, m.time_set, initialize=0) 

484 m.q_cap = pyo.Var(m.cap_phase_set, m.time_set) 

485 m.v2_reg = pyo.Var( 

486 m.reg_phase_set, m.time_set, domain=pyo.NonNegativeReals, initialize=1 

487 ) 

488 m.p_load = pyo.Var(m.bus_phase_set, m.time_set) 

489 m.q_load = pyo.Var(m.bus_phase_set, m.time_set) 

490 

491 # create battery variables 

492 m.p_charge = pyo.Var(m.bat_set, m.time_set, initialize=0) 

493 m.p_discharge = pyo.Var(m.bat_set, m.time_set, initialize=0) 

494 m.p_bat = pyo.Var(m.bat_phase_set, m.time_set, initialize=0) 

495 m.q_bat = pyo.Var(m.bat_phase_set, m.time_set, initialize=0) 

496 m.soc = pyo.Var(m.bat_set, m.time_set, initialize=0.5) 

497 model: LindistModelProtocol = m 

498 return model