Coverage for src/distopf/dssconverter/dssparser.py: 0%

536 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-09 17:44 -0700

1from __future__ import annotations 

2 

3from functools import cache 

4from pathlib import Path 

5import networkx as nx 

6import numpy as np 

7import opendssdirect as dss 

8import pandas as pd 

9 

10 

11class DSSParser: 

def __init__(
    self,
    dssfile: str | Path,
    s_base: float = 1e6,
    v_min: float = 0.95,
    v_max: float = 1.05,
    cvr_p: float = 0,
    cvr_q: float = 0,
) -> None:
    """Load an OpenDSS model and extract its tables and solved results.

    Args:
        dssfile: path of the master ``.dss`` file passed to ``Redirect``.
        s_base: apparent power base used for per-unit conversion.
        v_min: lower voltage limit stored for every bus.
        v_max: upper voltage limit stored for every bus.
        cvr_p: CVR factor for active power stored for every bus.
        cvr_q: CVR factor for reactive power stored for every bus.
    """
    self.dss = dss
    self.dssfile = dssfile
    # Quote the path so directories containing spaces survive the
    # OpenDSS command parser.
    self.dss.Text.Command(f'Redirect "{self.dssfile}"')
    # NOTE: the topology is not checked for loops here; the extraction
    # below walks the network depth-first from the source bus.
    self.s_base = s_base
    self.v_min = v_min
    self.v_max = v_max
    self.cvr_p = cvr_p
    self.cvr_q = cvr_q
    # Bus names come first: every table below maps names to indices.
    self.bus_names = self.get_bus_names()
    # Tables describing the network.
    self.branch_data = self.get_branch_data()
    self.bus_data = self.get_bus_data()
    self.gen_data = self.get_gen_data()
    self.cap_data = self.get_cap_data()
    self.reg_data = self.get_reg_data()
    # Results of the solved power flow.
    self.v_solved = self.get_v_solved()
    self.s_solved = self.get_apparent_power_flows()

40 

def update(self) -> None:
    """Re-solve the circuit and rebuild every stored table and result.

    Bus names are refreshed first, then the branch, bus, generator,
    capacitor and regulator tables, and finally the solved voltages and
    branch power flows.
    """
    self.dss.Solution.Solve()
    refresh_plan = (
        ("bus_names", self.get_bus_names),
        ("branch_data", self.get_branch_data),
        ("bus_data", self.get_bus_data),
        ("gen_data", self.get_gen_data),
        ("cap_data", self.get_cap_data),
        ("reg_data", self.get_reg_data),
        ("v_solved", self.get_v_solved),
        ("s_solved", self.get_apparent_power_flows),
    )
    for attribute, extractor in refresh_plan:
        setattr(self, attribute, extractor())

52 

@cache
def get_bus_names(self) -> list[str]:
    """Return the bus names reachable from the source bus.

    Lines, transformers and reactors are collected as graph edges; a
    switch (a line flagged by ``Lines.IsSwitch``) is skipped when either
    of its terminals reports open. The buses are then listed in
    depth-first pre-order starting at ``self.source``.

    Edges are added in element iteration order (not via a ``set``) so the
    resulting order does not vary with string hash randomisation between
    interpreter runs. The result is memoised by ``@cache``, so repeated
    calls on the same instance return the same list.

    Returns:
        list[str]: bus names in depth-first pre-order from the source.
    """
    branch_types = ("line", "transformer", "reactor")
    edges: list[tuple[str, str]] = []
    flag = self.dss.PDElements.First()
    while flag:
        element_type = self.dss.CktElement.Name().lower().split(".")[0]
        keep = element_type in branch_types
        if keep and element_type == "line" and self.dss.Lines.IsSwitch():
            # An open switch does not connect its two buses.
            keep = not (
                self.dss.CktElement.IsOpen(1, 0)
                or self.dss.CktElement.IsOpen(2, 0)
            )
        if keep:
            terminals = self.dss.CktElement.BusNames()
            edges.append(
                (terminals[0].split(".")[0], terminals[1].split(".")[0])
            )
        flag = self.dss.PDElements.Next()

    source = self.source
    g = nx.Graph()
    # Register the source explicitly so a circuit without any branch
    # still yields ``[source]`` instead of failing inside the traversal.
    g.add_node(source)
    g.add_edges_from(edges)  # nx.Graph ignores duplicate edges
    return list(nx.dfs_preorder_nodes(g, source))

92 

@property
@cache
def bus_names_to_index_map(self) -> dict[str, int]:
    """Map every bus name to its 1-based position in ``self.bus_names``.

    Returns:
        dict[str, int]: bus name -> position, where the first bus is 1.
    """
    lookup: dict[str, int] = {}
    for position, bus in enumerate(self.bus_names, start=1):
        lookup[bus] = position
    return lookup

103 

def bus_names_to_index_map_fun(self, bus: str) -> int:
    """Return the index of ``bus`` from ``bus_names_to_index_map``.

    Callable form of the mapping, e.g. for ``Series.apply``. Raises
    ``KeyError`` when the bus is not in the mapping.
    """
    index_of = self.bus_names_to_index_map
    return index_of[bus]

106 

@property
def basekV_LL(self) -> float:
    """Line-to-line base kV of the circuit, taken at the source bus.

    The source bus is made the active bus, its ``kVBase`` is multiplied
    by sqrt(3) and the product is rounded to two decimals.

    Returns:
        float: base kV (line to line) referred to the source bus.
    """
    # The base kV is read from a bus, so the source bus is activated first.
    self.dss.Circuit.SetActiveBus(self.source)
    base_kv_ln = self.dss.Bus.kVBase()
    base_kv_ll = base_kv_ln * np.sqrt(3)
    return round(base_kv_ll, 2)

117 

@property
def source(self) -> str:
    """Name of the bus that the first voltage source is attached to.

    Selects the first Vsource and reads the first bus name of the active
    circuit element, with any ``.node`` suffixes removed.

    Returns:
        str: the source bus name.
    """
    # Presumably ``Vsources.First()`` makes that source the active element.
    self.dss.Vsources.First()
    first_terminal = self.dss.CktElement.BusNames()[0]
    bus, _, _ = first_terminal.partition(".")
    return bus

128 

@property
def gen_buses(self) -> set[str]:
    """Set of bus names (node suffixes removed) that host a Generator.

    Walks the Generators collection on every access; nothing is cached.
    """
    generators = self.dss.Generators
    buses: set[str] = set()
    has_generator = generators.First()
    while has_generator:
        bus, _, _ = generators.Bus1().partition(".")
        buses.add(bus)
        has_generator = generators.Next()
    return buses

138 

@property
def cap_buses(self) -> set[str]:
    """Set of bus names (node suffixes removed) that host a Capacitor.

    For each capacitor the first bus of the active circuit element is
    recorded. The collection is walked on every access; nothing is cached.
    """
    buses: set[str] = set()
    has_capacitor = self.dss.Capacitors.First()
    while has_capacitor:
        first_terminal = self.dss.CktElement.BusNames()[0]
        buses.add(first_terminal.partition(".")[0])
        has_capacitor = self.dss.Capacitors.Next()
    return buses

148 

@property
def load_buses(self) -> set[str]:
    """Set of bus names (node suffixes removed) that host a Load.

    For each load the first bus of the active circuit element is
    recorded. The collection is walked on every access; nothing is cached.
    """
    buses: set[str] = set()
    has_load = self.dss.Loads.First()
    while has_load:
        first_terminal = self.dss.CktElement.BusNames()[0]
        buses.add(first_terminal.partition(".")[0])
        has_load = self.dss.Loads.Next()
    return buses

158 

@property
def num_phase_map(self) -> dict[str, str]:
    """Lookup from a stringified node list to phase letters.

    Keys are ``str()`` of a list of node numbers (e.g. ``"[1, 3]"``) and
    values are the matching letters (e.g. ``"ac"``). The four-node entry
    ``"[1, 2, 3, 4]"`` maps to ``"abc"``, i.e. the fourth node is dropped.
    """
    node_sets = ([1], [2], [3], [1, 2], [1, 3], [2, 3], [1, 2, 3])
    mapper = {
        str(nodes): "".join("abc"[node - 1] for node in nodes)
        for nodes in node_sets
    }
    mapper[str([1, 2, 3, 4])] = "abc"  # excluding 4th node
    return mapper

173 

def get_v_solved(self) -> pd.DataFrame:
    """Collect the solved per-unit voltage magnitudes of every bus.

    For each of the node numbers 1, 2, 3 the node names and per-unit
    magnitudes are read from the circuit and placed in column ``a``,
    ``b`` or ``c``. The three tables are outer-merged on the bus name, so
    phases a bus does not have are NaN.

    Returns:
        pd.DataFrame: columns ``name``, ``a``, ``b``, ``c``; indexed and
        sorted by the bus index from ``bus_names_to_index_map_fun``.
    """
    circuit = self.dss.Circuit
    per_phase = []
    for node_number, letter in enumerate("abc", start=1):
        bus_names = [
            node_name.split(".")[0]
            for node_name in circuit.AllNodeNamesByPhase(node_number)
        ]
        magnitudes = circuit.AllNodeVmagPUByPhase(node_number)
        per_phase.append(pd.DataFrame({"name": bus_names, letter: magnitudes}))

    merged = per_phase[0]
    for table in per_phase[1:]:
        merged = pd.merge(merged, table, on="name", how="outer")
    merged.index = merged["name"].apply(self.bus_names_to_index_map_fun)
    return merged.sort_index()

207 

def get_apparent_power_flows(self) -> pd.DataFrame:
    """Return the solved per-unit complex power flow of every branch.

    Only lines, transformers and reactors are reported, and elements
    whose open-check reports every phase open are skipped. Powers come
    from ``_get_powers`` (kVA) and are scaled by ``1000 / self.s_base``.
    Branches that share the same ``(fb, tb)`` pair are summed.

    Returns:
        pd.DataFrame: columns ``fb``, ``tb``, ``from_name``, ``to_name``,
        ``a``, ``b``, ``c`` sorted by ``tb`` then ``fb``; an empty frame
        with those columns when no branch qualifies.
    """
    columns = ["fb", "tb", "from_name", "to_name", "a", "b", "c"]
    branch_types = ("line", "transformer", "reactor")
    power_data = []
    flag = self.dss.PDElements.First()
    while flag:
        element_type = self.dss.CktElement.Name().lower().split(".")[0]
        # Filter on type before touching the powers so that other PD
        # elements (e.g. capacitors) never reach ``_get_powers``.
        if element_type not in branch_types:
            flag = self.dss.PDElements.Next()
            continue
        # NOTE(review): terminal index 0 and zero-based phase numbers are
        # passed here, whereas the switch check in get_bus_names uses
        # terminals 1/2 with phase 0 - confirm the intended arguments.
        is_open = [
            self.dss.CktElement.IsOpen(0, ph)
            for ph in range(self.dss.CktElement.NumPhases())
        ]
        if all(is_open):
            flag = self.dss.PDElements.Next()
            continue
        s_out = self._get_powers() * 1000 / self.s_base
        terminals = self.dss.CktElement.BusNames()
        bus1 = terminals[0].split(".")[0]
        bus2 = terminals[1].split(".")[0]
        power_data.append(
            dict(
                fb=self.bus_names_to_index_map[bus1],
                tb=self.bus_names_to_index_map[bus2],
                from_name=bus1,
                to_name=bus2,
                a=s_out[0],
                b=s_out[1],
                c=s_out[2],
            )
        )
        flag = self.dss.PDElements.Next()

    if not power_data:
        return pd.DataFrame(columns=columns)

    power_df = pd.DataFrame(power_data)
    power_df["fb"] = power_df["fb"].astype(int)
    power_df["tb"] = power_df["tb"].astype(int)
    # Combine branches that connect the same pair of buses.
    power_df = (
        power_df.groupby(by=["fb", "tb"], as_index=False)
        .agg(
            {
                "fb": "first",
                "tb": "first",
                "from_name": "first",
                "to_name": "first",
                "a": "sum",
                "b": "sum",
                "c": "sum",
            }
        )
        .reset_index(drop=True)
        # One sort with an explicit tie-breaker; the earlier chained
        # fb-then-tb sorts left the order of equal ``tb`` rows undefined.
        .sort_values(by=["tb", "fb"], ignore_index=True)
    )
    return power_df

265 

266 def _get_line_zmatrix(self) -> tuple[np.ndarray, np.ndarray]: 

267 """Returns the z_matrix of a specified line element. 

268 

269 Returns: 

270 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag) 

271 """ 

272 n_phases = self.dss.Lines.Phases() 

273 

274 z_matrix_real = np.zeros((3, 3)) 

275 z_matrix_imag = np.zeros((3, 3)) 

276 if n_phases > 3: 

277 pass 

278 if (len(self.dss.CktElement.BusNames()[0].split(".")) == 4) or ( 

279 len(self.dss.CktElement.BusNames()[0].split(".")) == 1 

280 ): 

281 # this is the condition check for three phase since three phase is either represented by bus_name.1.2.3 or bus_name 

282 z_matrix = ( 

283 np.array(self.dss.Lines.RMatrix()) 

284 + 1j * np.array(self.dss.Lines.XMatrix()) 

285 ) * self.dss.Lines.Length() 

286 

287 z_matrix = z_matrix.reshape(3, 3) 

288 

289 return np.real(z_matrix), np.imag(z_matrix) 

290 

291 else: 

292 # for other than 3 phases 

293 active_phases = [ 

294 int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:] 

295 ] 

296 z_matrix = np.zeros((3, 3), dtype=complex) 

297 r_matrix = self.dss.Lines.RMatrix() 

298 x_matrix = self.dss.Lines.XMatrix() 

299 counter = 0 

300 for _, row in enumerate(active_phases): 

301 for _, col in enumerate(active_phases): 

302 z_matrix[row - 1, col - 1] = ( 

303 complex(r_matrix[counter], x_matrix[counter]) 

304 * self.dss.Lines.Length() 

305 ) 

306 counter = counter + 1 

307 

308 return np.real(z_matrix), np.imag(z_matrix) 

309 

310 def _get_reactor_zmatrix(self) -> tuple[np.ndarray, np.ndarray]: 

311 """Returns the z_matrix of a specified reactor element. 

312 

313 Returns: 

314 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag) 

315 """ 

316 n_phases = self.dss.Reactors.Phases() 

317 if n_phases == 3: 

318 return np.eye(3) * self.dss.Reactors.R(), np.eye(3) * self.dss.Reactors.X() 

319 

320 else: 

321 # for other than 3 phases 

322 raise NotImplementedError( 

323 "Parsing reactors with phases other than 3 not implemented" 

324 ) 

325 # active_phases = [ 

326 # int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:] 

327 # ] 

328 # z_matrix = np.zeros((3, 3), dtype=complex) 

329 # r_matrix = self.dss.Reactors.R() 

330 # x_matrix = self.dss.Reactors.X() 

331 # counter = 0 

332 # for _, row in enumerate(active_phases): 

333 # for _, col in enumerate(active_phases): 

334 # z_matrix[row - 1, col - 1] = ( 

335 # complex(r_matrix[counter], x_matrix[counter]) 

336 # * self.dss.Lines.Length() 

337 # ) 

338 # counter = counter + 1 

339 # 

340 # return np.real(z_matrix), np.imag(z_matrix) 

341 

342 def _get_powers(self): 

343 n_phases = self.dss.CktElement.NumPhases() 

344 pq = np.array(self.dss.CktElement.Powers()) 

345 n_terminals = self.dss.CktElement.NumTerminals() 

346 n_pq_phases = len(pq) // n_terminals // 2 

347 pq = pq.reshape(int(n_pq_phases * n_terminals), 2) 

348 s_out = np.zeros(3, dtype=complex) 

349 active_phases = np.array([0, 1, 2]) 

350 if n_phases < 3: 

351 active_phases = ( 

352 np.array(self.dss.CktElement.BusNames()[0].split(".")[1:]).astype(int) 

353 - 1 

354 ) 

355 

356 p = pq[:, 0] 

357 q = pq[:, 1] 

358 s = p + 1j * q 

359 s_out_ = -s[n_pq_phases:] 

360 s_out[active_phases] = s_out_[:n_phases] 

361 return s_out 

362 

def get_branch_data(self) -> pd.DataFrame:
    """Build the per-unit branch table of the circuit.

    Every line, switch, transformer and reactor becomes one record with
    the upper triangle of its 3x3 series impedance, expressed per unit on
    ``self.s_base`` and the line-to-neutral voltage base of its to-bus.
    The from/to buses are ordered so that ``fb < tb``. Records that share
    the same ``(fb, tb)`` pair are merged: impedances are summed and the
    ``name`` and ``phases`` strings are concatenated.

    Returns:
        pd.DataFrame: one row per ``(fb, tb)`` pair sorted by ``tb`` then
        ``fb``; an empty frame with the same columns when the circuit
        contains no such element.
    """
    s_base = self.s_base
    z_entries = (
        ("aa", 0, 0),
        ("ab", 0, 1),
        ("ac", 0, 2),
        ("bb", 1, 1),
        ("bc", 1, 2),
        ("cc", 2, 2),
    )
    z_columns = [f"{part}{suffix}" for part in "rx" for suffix, _, _ in z_entries]
    columns = [
        "fb",
        "tb",
        "from_name",
        "to_name",
        *z_columns,
        "type",
        "name",
        "status",
        "s_base",
        "v_ln_base",
        "z_base",
        "phases",
    ]

    line_data = []
    flag = self.dss.PDElements.First()
    while flag:
        name_parts = self.dss.CktElement.Name().lower().split(".")
        element_type = name_parts[0]
        element_name = name_parts[1]
        if element_type not in ("line", "transformer", "reactor"):
            flag = self.dss.PDElements.Next()
            continue

        switch_status = None
        z_matrix_real = np.zeros((3, 3))
        z_matrix_imag = np.zeros((3, 3))
        if element_type == "transformer":
            # TODO: transformer model may be wrong but the 13bus results
            # look better with it.
            # Winding 2 is selected before kV/kVA/R are read, which is the
            # winding the earlier implementation left active at this point.
            self.dss.Transformers.Wdg(2)
            v_base_xfmr = self.dss.Transformers.kV() / np.sqrt(3) * 1000
            s_base_xfmr = self.dss.Transformers.kVA() * 1000 / 3
            z_base_xfmr = v_base_xfmr**2 / s_base_xfmr
            x_xfmr = self.dss.Transformers.Xhl() / 100 * z_base_xfmr
            # The winding resistance is doubled (presumably to account for
            # both windings - TODO confirm).
            r_xfmr = self.dss.Transformers.R() / 100 * z_base_xfmr * 2
            for diag in range(3):
                z_matrix_real[diag, diag] = r_xfmr
                z_matrix_imag[diag, diag] = x_xfmr
        elif element_type == "line":
            element_name = self.dss.Lines.Name()
            z_matrix_real, z_matrix_imag = self._get_line_zmatrix()
            if self.dss.Lines.IsSwitch():
                element_type = "switch"
                either_open = self.dss.CktElement.IsOpen(
                    1, 0
                ) or self.dss.CktElement.IsOpen(2, 0)
                switch_status = "OPEN" if either_open else "CLOSED"
        else:  # reactor
            element_name = self.dss.Reactors.Name()
            z_matrix_real, z_matrix_imag = self._get_reactor_zmatrix()

        terminals = self.dss.CktElement.BusNames()
        bus1 = terminals[0].split(".")[0]
        bus2 = terminals[1].split(".")[0]
        fb = self.bus_names_to_index_map[bus1]
        tb = self.bus_names_to_index_map[bus2]
        if fb > tb:
            # Always store the branch from the lower to the higher index.
            fb, tb = tb, fb
            bus1, bus2 = bus2, bus1

        # The impedance base is taken at the to-bus.
        self.dss.Circuit.SetActiveBus(bus2)
        base_kv_ln = self.dss.Bus.kVBase()
        z_base = (base_kv_ln * 1000) ** 2 / s_base

        phases = "abc"
        if self.dss.CktElement.NumPhases() < 3:
            dotted = terminals[0].split(".")[1:]
            active_phases = np.array(dotted).astype(int) - 1
            phases = "".join("abc"[i] for i in active_phases)

        each_line = dict(fb=fb, tb=tb, from_name=bus1, to_name=bus2)
        for suffix, row, col in z_entries:
            each_line[f"r{suffix}"] = z_matrix_real[row, col] / z_base
        for suffix, row, col in z_entries:
            each_line[f"x{suffix}"] = z_matrix_imag[row, col] / z_base
        each_line.update(
            type=element_type,
            name=element_name,
            status=switch_status,
            s_base=s_base,
            v_ln_base=base_kv_ln * 1000,
            z_base=z_base,
            phases=phases,
        )
        line_data.append(each_line)
        flag = self.dss.PDElements.Next()

    if not line_data:
        # Grouping an empty, column-less frame would raise KeyError.
        return pd.DataFrame(columns=columns)

    how = {"fb": "max", "tb": "max", "from_name": "first", "to_name": "first"}
    how.update(dict.fromkeys(z_columns, "sum"))
    how.update(
        {
            "type": "first",
            "name": "sum",
            "status": "first",
            "s_base": "first",
            "v_ln_base": "first",
            "z_base": "first",
            "phases": "sum",
        }
    )
    # Combine elements that connect the same pair of buses.
    branch_df = (
        pd.DataFrame(line_data)
        .groupby(by=["fb", "tb"], as_index=False)
        .agg(how)
        .sort_values(by=["tb", "fb"], ignore_index=True)
        .reset_index(drop=True)
    )
    return branch_df

543 

def get_bus_data(self) -> pd.DataFrame:
    """Build the bus table of the circuit.

    One record is created per name in ``Circuit.AllBusNames``. A bus
    whose first PC element contains ``"Vsource"`` is typed ``SWING`` and
    given the source per-unit voltage; every other bus is ``PQ`` with a
    voltage of 1. The limits and CVR factors stored on the instance
    (``v_min``, ``v_max``, ``cvr_p``, ``cvr_q``) and ``s_base`` are
    copied to every bus. The per-bus loads from ``_get_loads`` are merged
    in on ``id`` and missing values are filled with 0.

    Returns:
        pd.DataFrame: bus data sorted by ``id``.
    """
    source_voltage = self.dss.Vsources.PU()
    all_buses_names = self.dss.Circuit.AllBusNames()
    load_df = self._get_loads()
    # These properties walk the whole element collections on every
    # access, so evaluate them once instead of once per bus.
    gen_buses = self.gen_buses
    load_buses = self.load_buses
    cap_buses = self.cap_buses
    phase_lookup = self.num_phase_map
    index_of = self.bus_names_to_index_map

    bus_data = []
    for bus in all_buses_names:
        # The bus must be active before its properties can be read.
        self.dss.Circuit.SetActiveBus(bus)
        bus_type = "PQ"
        v = 1
        pc_elements = self.dss.Bus.AllPCEatBus()
        if len(pc_elements) > 0 and "Vsource" in pc_elements[0]:
            v = source_voltage
            bus_type = "SWING"
        active_bus_name = self.dss.Bus.Name()
        each_bus = dict(
            id=index_of[bus],  # bus index
            name=active_bus_name,
            bus_type=bus_type,  # SWING if source else PQ
            v_a=v,  # p.u. voltage, phase a
            v_b=v,  # p.u. voltage, phase b
            v_c=v,  # p.u. voltage, phase c
            v_ln_base=self.dss.Bus.kVBase() * 1000,  # line-to-neutral base (V)
            s_base=self.s_base,
            v_min=self.v_min,
            v_max=self.v_max,
            cvr_p=self.cvr_p,
            cvr_q=self.cvr_q,
            phases=phase_lookup[str(self.dss.Bus.Nodes())],  # letters a, b, c
            # NOTE(review): gen_buses only scans Generators while
            # get_gen_data also reports PVSystems - confirm has_gen intent.
            has_gen=active_bus_name in gen_buses,
            has_load=active_bus_name in load_buses,
            has_cap=active_bus_name in cap_buses,
            # Careful: Y gives the latitude and X the longitude.
            latitude=self.dss.Bus.Y(),
            longitude=self.dss.Bus.X(),
        )
        bus_data.append(each_bus)

    bus_df = pd.DataFrame(bus_data)
    bus_df = (
        pd.merge(load_df, bus_df, on=["id"], how="outer")
        .sort_values(by="id", ignore_index=True)
        .fillna(0)
    )
    return bus_df

619 

def get_gen_data(self) -> pd.DataFrame:
    """Build the generator table from Generators and PVSystems.

    The rated values of every element (``kW``/``Pmpp``, ``kvar``,
    ``kVARated``) are split equally over its phases and expressed per
    unit on ``self.s_base``. Phases are taken from the node numbers of
    the element's first bus; ground/neutral nodes (anything outside 1-3)
    are ignored, and no nodes or three or more nodes means three-phase.
    Elements on the same bus are merged: powers and ratings are summed
    and the ``phases`` strings are concatenated.

    The reactive limit columns (``q*_max`` / ``q*_min``) are not read
    from the model and are left empty (``None``).

    Returns:
        pd.DataFrame: one row per bus ``id`` that hosts generation.
    """
    s_base = self.s_base
    limit_columns = ["qa_max", "qb_max", "qc_max", "qa_min", "qb_min", "qc_min"]
    value_columns = [
        f"{prefix}{ph}{suffix}"
        for prefix, suffix in (("p", ""), ("q", ""), ("s", "_max"))
        for ph in "abc"
    ]
    columns = ["id", "name", *value_columns, "phases", *limit_columns]

    def describe(name, bus_name, kw, kvar, kva_rated) -> dict:
        """Create the record for the active generator-like element."""
        dotted = self.dss.CktElement.BusNames()[0].split(".")[1:]
        # Node 0 is ground; only nodes 1-3 are phases.
        nodes = [int(node) for node in dotted if int(node) in (1, 2, 3)]
        if len(nodes) == 0 or len(nodes) >= 3:
            nodes = [1, 2, 3]
        n_phases = len(nodes)
        phases = "".join("abc"[node - 1] for node in nodes)
        per_phase = {
            "p": (kw / n_phases * 1000) / s_base,
            "q": (kvar / n_phases * 1000) / s_base,
            "s": kva_rated / n_phases * 1000 / s_base,
        }
        record = {"id": self.bus_names_to_index_map[bus_name], "name": name}
        for prefix, suffix in (("p", ""), ("q", ""), ("s", "_max")):
            for ph in "abc":
                value = per_phase[prefix] if ph in phases else 0
                record[f"{prefix}{ph}{suffix}"] = value
        record["phases"] = phases
        # Previously stored as the one-element tuple ``(None,)`` because
        # of stray trailing commas.
        record.update(dict.fromkeys(limit_columns))
        return record

    gen_data = []
    generators = self.dss.Generators
    flag = generators.First()
    while flag:
        gen_data.append(
            describe(
                generators.Name(),
                generators.Bus1().split(".")[0],
                generators.kW(),
                generators.kvar(),
                generators.kVARated(),
            )
        )
        flag = generators.Next()

    pv_systems = self.dss.PVsystems
    flag = pv_systems.First()
    while flag:
        gen_data.append(
            describe(
                pv_systems.Name(),
                self.dss.CktElement.BusNames()[0].split(".")[0],
                pv_systems.Pmpp(),
                pv_systems.kvar(),
                pv_systems.kVARated(),
            )
        )
        flag = pv_systems.Next()

    if gen_data:
        gen_df = pd.DataFrame(gen_data)
    else:
        gen_df = pd.DataFrame({column: [] for column in columns})

    how = {"id": "first", "name": "first"}
    how.update(dict.fromkeys(value_columns, "sum"))
    how["phases"] = "sum"
    # "first" keeps the empty limits empty; summing them would turn the
    # missing values into 0.
    how.update(dict.fromkeys(limit_columns, "first"))
    return gen_df.groupby(by=["id"], as_index=False).agg(how)

789 

def get_cap_data(self) -> pd.DataFrame:
    """Build the capacitor table of the circuit.

    The rated ``kvar`` of every capacitor is split equally over the
    phases given by the node numbers of its first bus (no node suffix
    means three-phase) and expressed per unit on ``self.s_base``. This
    also covers two-phase banks, which previously ended up with zero
    reactive power on every phase. Capacitors on the same bus are merged:
    reactive powers are summed and ``phases`` strings are concatenated.

    Returns:
        pd.DataFrame: columns ``id``, ``name`` (the bus name), ``qa``,
        ``qb``, ``qc``, ``phases``; one row per bus.
    """
    s_base = self.s_base
    cap_data = []
    flag = self.dss.Capacitors.First()
    while flag:
        cap_bus_name, *dotted = self.dss.CktElement.BusNames()[0].split(".")
        # The node list is stringified to match the num_phase_map keys.
        node_key = str([int(node) for node in dotted])
        if node_key == "[]":
            # Three phases are written either as .1.2.3 or as nothing.
            node_key = "[1, 2, 3]"
        cap_phase = self.num_phase_map[node_key]

        q_per_phase = (
            self.dss.Capacitors.kvar() * 1000 / len(cap_phase)
        ) / s_base
        each_cap = dict(
            id=self.bus_names_to_index_map[cap_bus_name],
            name=cap_bus_name,
            qa=q_per_phase if "a" in cap_phase else 0,
            qb=q_per_phase if "b" in cap_phase else 0,
            qc=q_per_phase if "c" in cap_phase else 0,
            phases=cap_phase,
        )
        cap_data.append(each_cap)
        flag = self.dss.Capacitors.Next()

    cap_df = pd.DataFrame(cap_data)
    if len(cap_data) < 1:
        cap_df = pd.DataFrame(
            {
                "id": [],
                "name": [],
                "qa": [],
                "qb": [],
                "qc": [],
                "phases": [],
            }
        )

    cap_df = cap_df.groupby(by=["id"], as_index=False).agg(
        dict(
            id="first",
            name="first",
            qa="sum",
            qb="sum",
            qc="sum",
            phases="sum",
        )
    )
    return cap_df

864 

def get_reg_data(self) -> pd.DataFrame:
    """Build the voltage regulator table of the circuit.

    Transformers referenced by a RegControl are reported with their tap
    position, computed from the transformer tap ratio as
    ``(ratio - 1) / 0.00625`` and rounded to an integer. The sign is
    flipped when the from/to buses had to be swapped so that ``fb < tb``.
    Regulators between the same pair of buses are merged into one row
    (maximum tap per phase, ``phases`` strings concatenated).

    Returns:
        pd.DataFrame: columns ``fb``, ``tb``, ``from_name``, ``to_name``,
        ``tap_a``, ``tap_b``, ``tap_c``, ``phases`` sorted by ``tb``.
    """
    reg_names: set[str] = set()
    if len(self.dss.RegControls.AllNames()) != 0:
        dss_reg_df = self.dss.utils.regcontrols_to_dataframe()
        # Element names below are lower-cased, so compare in lower case.
        reg_names = {str(name).lower() for name in dss_reg_df.Transformer.to_list()}

    reg_data = []
    flag = self.dss.Transformers.First()
    while flag:
        name_parts = self.dss.CktElement.Name().lower().split(".")
        element_type = name_parts[0]
        element_name = name_parts[1]
        if element_type != "transformer" or element_name not in reg_names:
            flag = self.dss.Transformers.Next()
            continue

        bus_specs = self.dss.CktElement.BusNames()
        bus1 = bus_specs[0].split(".")[0]
        bus2 = bus_specs[-1].split(".")[0]
        fb = self.bus_names_to_index_map[bus1]
        tb = self.bus_names_to_index_map[bus2]
        tap_direction = 1
        if fb > tb:
            fb, tb = tb, fb
            bus1, bus2 = bus2, bus1
            tap_direction = -1

        # The node list is stringified to match the num_phase_map keys.
        nodes = sorted(bus_specs[0].split(".")[1:])
        node_key = str([int(node) for node in nodes])
        if node_key == "[]":
            # Three phases are written either as .1.2.3 or as nothing.
            node_key = "[1, 2, 3]"
        line_phase = self.num_phase_map[node_key]

        # 0.00625 p.u. per tap step is hard-coded.
        tap = (self.dss.Transformers.Tap() - 1) / 0.00625 * tap_direction
        each_reg = {
            "fb": fb,
            "tb": tb,
            "from_name": bus1,
            "to_name": bus2,
            # Pre-fill all three taps so the aggregation below finds
            # every column even if no regulator covers a given phase.
            "tap_a": np.nan,
            "tap_b": np.nan,
            "tap_c": np.nan,
            "phases": line_phase,
        }
        for ph in line_phase:
            each_reg[f"tap_{ph}"] = int(round(tap))
        reg_data.append(each_reg)
        flag = self.dss.Transformers.Next()

    reg_df = pd.DataFrame(reg_data)
    if len(reg_data) < 1:
        reg_df = pd.DataFrame(
            {
                "fb": [],
                "tb": [],
                "from_name": [],
                "to_name": [],
                "tap_a": [],
                "tap_b": [],
                "tap_c": [],
                "phases": [],
            }
        )
    # Combine regulators between identical buses.
    reg_df = reg_df.groupby(["fb", "tb"]).agg(
        {
            "fb": "first",
            "tb": "first",
            "from_name": "first",
            "to_name": "first",
            "tap_a": "max",
            "tap_b": "max",
            "tap_c": "max",
            "phases": "sum",
        }
    )
    reg_df = reg_df.reset_index(drop=True)
    # NOTE(review): missing taps are filled with 1, which looks like a
    # leftover from when these columns held ratios (neutral = 1) - confirm
    # whether 0 is intended for tap positions.
    reg_df = reg_df.sort_values(by="tb", ignore_index=True).fillna(1)
    return reg_df

950 

951 def _get_loads(self) -> dict[str, list[float]]: 

952 """Extract load information for each node for each phase. This method extracts load on the exact bus(node) as 

953 modeled in the distribution model, including secondary. 

954 

955 Returns: 

956 load_per_phase(pd.DataFrame): Per phase load data in a pandas dataframe 

957 """ 

958 s_base = self.s_base 

959 load_df = pd.DataFrame( 

960 [], columns=["id", "name", "pl_a", "ql_a", "pl_b", "ql_b", "pl_c", "ql_c"] 

961 ) 

962 loads_flag = self.dss.Loads.First() 

963 load_data = [] 

964 model_to_cvr_map = { 

965 1: (0, 0), 

966 2: (2, 2), 

967 3: (0, 2), 

968 5: (1, 1), 

969 6: (0, 0), 

970 7: (0, 2), 

971 } 

972 while loads_flag: 

973 connected_buses = self.dss.CktElement.BusNames() 

974 if len(connected_buses) > 1: 

975 raise Exception("Multiple connected buses") 

976 model = self.dss.Loads.Model() 

977 cvr_p, cvr_q = model_to_cvr_map.get(model, 0) 

978 if model == 4: # exponential model 

979 cvr_p = self.dss.Loads.CVRwatts() 

980 cvr_q = self.dss.Loads.CVRvars() 

981 if model == 8: # zip model 

982 zipv = self.dss.Loads.ZipV() 

983 cvr_p = 2 * zipv[0] + zipv[1] 

984 cvr_q = 2 * zipv[3] + zipv[4] 

985 bus = connected_buses[0] 

986 bus_name = bus.split(".")[0] 

987 each_load = { 

988 "id": 0, 

989 "pl_a": 0, 

990 "ql_a": 0, 

991 "pl_b": 0, 

992 "ql_b": 0, 

993 "pl_c": 0, 

994 "ql_c": 0, 

995 "cvr_p": cvr_p, 

996 "cvr_q": cvr_q, 

997 } 

998 bus_split = bus.split(".") 

999 each_load["id"] = self.bus_names_to_index_map[bus_name] 

1000 connected_phase_secondary = bus_split[1:] 

1001 

1002 # conductor power contains info on active and reactive power 

1003 conductor_power = np.array(self.dss.CktElement.Powers()) 

1004 # nonzero_power_indices = np.where(conductor_power != 0)[0] 

1005 # nonzero_power = conductor_power[nonzero_power_indices] 

1006 # Extract P and Q values (every alternate elements) 

1007 a1 = np.exp(-1 / 6 * 1j * np.pi) 

1008 a2 = np.exp(-5 / 6 * 1j * np.pi) 

1009 p_values = conductor_power[::2] 

1010 q_values = conductor_power[1::2] 

1011 phases = "abc" 

1012 n_phases = self.dss.Loads.Phases() 

1013 pf = self.dss.Loads.PF() 

1014 kw = self.dss.Loads.kW() 

1015 kv = self.dss.Loads.kV() 

1016 kvar = self.dss.Loads.kvar() 

1017 is_delta = self.dss.Loads.IsDelta() 

1018 if len(connected_phase_secondary) > 0: 

1019 phases = "".join("abc"[int(n) - 1] for n in connected_phase_secondary) 

1020 # if len(phases) != n_phases: 

1021 # raise Exception("Number of load phases does not match with bus phases") 

1022 for phase_index, ph in enumerate(phases): 

1023 each_load[f"pl_{ph}"] = p_values[phase_index] * 1000 / s_base 

1024 each_load[f"ql_{ph}"] = q_values[phase_index] * 1000 / s_base 

1025 load_data.append(each_load) 

1026 loads_flag = self.dss.Loads.Next() 

1027 load_df = pd.DataFrame(load_data) 

1028 load_df = load_df.groupby("id").agg( 

1029 { 

1030 "id": "first", 

1031 "pl_a": "sum", 

1032 "ql_a": "sum", 

1033 "pl_b": "sum", 

1034 "ql_b": "sum", 

1035 "pl_c": "sum", 

1036 "ql_c": "sum", 

1037 } 

1038 ) 

1039 load_df = load_df.fillna(0) 

1040 return load_df.reset_index(drop=True) 

1041 

def to_csv(self, dir_name: str = None, overwrite: bool = True) -> None:
    """Write the branch, bus, capacitor, generator and regulator tables to CSV.

    Args:
        dir_name: target directory; ``"testfiles"`` when omitted. Missing
            parent directories are created.
        overwrite: passed to ``mkdir`` as ``exist_ok``; with ``False`` an
            already existing directory raises ``FileExistsError``.
    """
    target = "testfiles" if dir_name is None else dir_name
    Path(target).mkdir(parents=True, exist_ok=overwrite)
    tables = (
        ("branch_data", self.branch_data),
        ("bus_data", self.bus_data),
        ("cap_data", self.cap_data),
        ("gen_data", self.gen_data),
        ("reg_data", self.reg_data),
    )
    for stem, table in tables:
        table.to_csv(f"{target}/{stem}.csv", index=False)

1052 

def update_gen_q(self, q: pd.DataFrame):
    """Set the ``kvar`` of every Generator from a per-unit table.

    For each generator the values of ``q`` in the row of its bus index
    and the columns of its phases are summed, converted with
    ``self.s_base / 1000`` and written through ``Generators.kvar``.
    Phases come from the node numbers of the element's first bus; no
    nodes or three or more nodes means columns ``a``, ``b`` and ``c``.

    Args:
        q: table indexed by bus index with phase columns ``a``/``b``/``c``
            (assumed to hold per-unit reactive power - TODO confirm).
    """
    generators = self.dss.Generators
    more = generators.First()
    while more:
        dotted = self.dss.CktElement.BusNames()[0].split(".")[1:]
        bus_phases = np.array(dotted).astype(int)
        if 0 < len(bus_phases) < 3:
            phase_columns = ["abc"[ph_idx] for ph_idx in bus_phases - 1]
        else:
            phase_columns = ["abc"[ph_idx] for ph_idx in np.array([0, 1, 2])]
        bus, _, _ = generators.Bus1().partition(".")
        bus_id = self.bus_names_to_index_map[bus]
        total_pu = q.loc[bus_id, phase_columns].sum()
        generators.kvar(total_pu * self.s_base / 1000)
        more = generators.Next()

1071 

1072 

def main() -> None:
    """Parse the IEEE 13-bus example feeder and print its branch table."""
    # dss_data = DSSParser(r'ieee9500_dss/Master-unbal-initial-config.dss')
    dss_data = DSSParser(r"ieee13Bus/IEEE13Nodeckt.dss")
    # DSSParser has no ``get_branches``; the branch table is built in
    # ``__init__`` and stored on ``branch_data``.
    print(dss_data.branch_data)


if __name__ == "__main__":
    main()