Coverage for src/distopf/dss_importer/dss_to_csv_converter.py: 8%

506 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from __future__ import annotations 

2from typing import Optional 

3from functools import cache 

4from pathlib import Path 

5import networkx as nx 

6import numpy as np 

7import opendssdirect as dss 

8import pandas as pd 

9 

10 

def load_dss_model(
    cim_file: str | Path, s_base: float = 1e6
) -> dict[str, pd.DataFrame]:
    """Build a ``DSSToCSVConverter`` and return its tables in a dictionary.

    Args:
        cim_file: Passed unchanged to ``DSSToCSVConverter`` as the file to load.
        s_base: Power base forwarded to the converter.

    Returns:
        Mapping with the keys ``bus_data``, ``branch_data``, ``gen_data``,
        ``cap_data`` and ``reg_data``, each holding the converter attribute
        of the same name.
    """
    converter = DSSToCSVConverter(cim_file, s_base=s_base)
    table_names = ("bus_data", "branch_data", "gen_data", "cap_data", "reg_data")
    return {table: getattr(converter, table) for table in table_names}

23 

24 

25class DSSToCSVConverter: 

def __init__(
    self,
    dssfile: str | Path,
    s_base: float = 1e6,
    v_min: float = 0.95,
    v_max: float = 1.05,
    cvr_p: float = 0,
    cvr_q: float = 0,
) -> None:
    """Load a DSS file through ``Redirect`` and extract every table once.

    Args:
        dssfile: File handed to the OpenDSS ``Redirect`` command.
        s_base: Power base stored on the instance and used by the getters.
        v_min: Lower voltage limit written into the bus table.
        v_max: Upper voltage limit written into the bus table.
        cvr_p: CVR parameter for active power written into the bus table.
        cvr_q: CVR parameter for reactive power written into the bus table.
    """
    self.dss = dss
    self.dssfile = dssfile
    # Quote the path so that directories or file names containing spaces are
    # passed to OpenDSS as one token; a caller-supplied quoted string is kept.
    redirect_target = str(dssfile)
    already_quoted = (
        len(redirect_target) >= 2
        and redirect_target.startswith('"')
        and redirect_target.endswith('"')
    )
    if not already_quoted:
        redirect_target = f'"{redirect_target}"'
    self.dss.Text.Command(f"Redirect {redirect_target}")
    # NOTE: no radiality (loop) check is performed on the loaded topology.
    self.s_base = s_base
    self.v_min = v_min
    self.v_max = v_max
    self.cvr_p = cvr_p
    self.cvr_q = cvr_q
    self.bus_names = self.get_bus_names()
    # Tables and solved results; bus_names must exist first because the
    # getters translate bus names to indices.
    self.branch_data = self.get_branch_data()
    self.bus_data = self.get_bus_data()
    self.gen_data = self.get_gen_data()
    self.cap_data = self.get_cap_data()
    self.reg_data = self.get_reg_data()
    self.v_solved = self.get_v_solved()
    self.s_solved = self.get_apparent_power_flows()

54 

def update(self) -> None:
    """Solve the circuit again and refresh every stored table.

    The attributes are refreshed in the same order as in ``__init__``:
    bus names first, then branch, bus, generator, capacitor and regulator
    tables, then the solved voltages and apparent power flows.
    """
    self.dss.Solution.Solve()
    refresh_plan = (
        ("bus_names", self.get_bus_names),
        ("branch_data", self.get_branch_data),
        ("bus_data", self.get_bus_data),
        ("gen_data", self.get_gen_data),
        ("cap_data", self.get_cap_data),
        ("reg_data", self.get_reg_data),
        ("v_solved", self.get_v_solved),
        ("s_solved", self.get_apparent_power_flows),
    )
    for attribute, getter in refresh_plan:
        setattr(self, attribute, getter())

66 

@cache
def get_bus_names(self) -> list[str]:
    """Return the bus names in depth-first order starting at the source bus.

    Lines, transformers and reactors are walked through ``PDElements``; a
    line flagged as a switch is ignored when either terminal reports open.
    The remaining elements form an undirected graph that is traversed with
    ``networkx.dfs_preorder_nodes`` from ``self.source``.

    NOTE: the result is memoised per instance by ``functools.cache``, so
    later calls return the first answer even if the circuit has changed.

    Returns:
        list[str]: bus names (node suffixes stripped) in DFS pre-order.
    """
    branch_types = ("line", "transformer", "reactor")
    edges: list[tuple[str, str]] = []
    flag = self.dss.PDElements.First()
    while flag:
        element_type = self.dss.CktElement.Name().lower().split(".")[0]
        in_service = element_type in branch_types
        if in_service and element_type == "line" and self.dss.Lines.IsSwitch():
            in_service = not (
                self.dss.CktElement.IsOpen(1, 0)
                or self.dss.CktElement.IsOpen(2, 0)
            )
        if in_service:
            bus_specs = self.dss.CktElement.BusNames()
            edges.append(
                (bus_specs[0].split(".")[0], bus_specs[1].split(".")[0])
            )
        flag = self.dss.PDElements.Next()

    source_bus = self.source
    graph = nx.Graph()
    # Register the source explicitly so a circuit without any in-service
    # branch yields [source] instead of failing inside the traversal.
    graph.add_node(source_bus)
    # Edges are added in element order (the graph already ignores repeats).
    # Going through a set() would make neighbour order, and therefore the
    # bus numbering, depend on per-process string hash randomisation.
    graph.add_edges_from(edges)
    return list(nx.dfs_preorder_nodes(graph, source_bus))

106 

@property
def bus_names_to_index_map(self) -> dict[str, int]:
    """Map every entry of ``self.bus_names`` to its 1-based position.

    The mapping is stored on the instance and rebuilt whenever
    ``self.bus_names`` is bound to a different list object. This replaces a
    module-level ``functools.cache`` keyed on ``self``, which kept every
    instance alive and never noticed a refreshed ``bus_names``.

    Returns:
        dict[str, int]: bus name -> index, where the first bus has index 1.
    """
    names = self.bus_names
    cached = getattr(self, "_bus_index_cache", None)
    if cached is not None and cached[0] is names:
        return cached[1]
    index_by_bus = {bus: position for position, bus in enumerate(names, start=1)}
    self._bus_index_cache = (names, index_by_bus)
    return index_by_bus

117 

def bus_names_to_index_map_fun(self, bus: str) -> int:
    """Look up the index of ``bus`` in ``bus_names_to_index_map``.

    Callable form of the mapping, used where a function is required (for
    example with ``Series.apply``). Raises ``KeyError`` for unknown names.
    """
    index_by_bus = self.bus_names_to_index_map
    return index_by_bus[bus]

120 

@property
def basekV_LL(self) -> float:
    """Base kV of the source bus multiplied by sqrt(3), rounded to 2 decimals.

    The source bus is made the active bus first because the value is read
    through the active-bus interface (``Bus.kVBase``).

    Returns:
        float: ``Bus.kVBase() * sqrt(3)`` at the source bus, rounded.
    """
    source_bus = self.source
    self.dss.Circuit.SetActiveBus(source_bus)
    kv_base = self.dss.Bus.kVBase()
    return round(kv_base * np.sqrt(3), 2)

131 

@property
def source(self) -> str:
    """Name of the bus at the first terminal of the first Vsource.

    ``Vsources.First()`` is called and the first entry of
    ``CktElement.BusNames()`` is then read; any node suffix after the first
    ``.`` is dropped.

    Returns:
        str: bus name without node suffix.
    """
    self.dss.Vsources.First()
    first_terminal = self.dss.CktElement.BusNames()[0]
    bus_name, _, _ = first_terminal.partition(".")
    return bus_name

142 

@property
def gen_buses(self) -> set[str]:
    """Names of the buses reported by ``Generators.Bus1()`` for each Generator.

    Walks the ``Generators`` collection with First/Next on every access and
    strips the node suffix from each bus specification.
    """
    generators = self.dss.Generators
    found: set[str] = set()
    has_element = generators.First()
    while has_element:
        bus_name, _, _ = generators.Bus1().partition(".")
        found.add(bus_name)
        has_element = generators.Next()
    return found

152 

@property
def cap_buses(self) -> set[str]:
    """Names of the first-terminal buses of every Capacitor.

    Walks the ``Capacitors`` collection with First/Next on every access and
    reads the bus from the active circuit element, dropping the node suffix.
    """
    capacitors = self.dss.Capacitors
    found: set[str] = set()
    has_element = capacitors.First()
    while has_element:
        bus_name, _, _ = self.dss.CktElement.BusNames()[0].partition(".")
        found.add(bus_name)
        has_element = capacitors.Next()
    return found

162 

@property
def load_buses(self) -> set[str]:
    """Names of the first-terminal buses of every Load.

    Walks the ``Loads`` collection with First/Next on every access and reads
    the bus from the active circuit element, dropping the node suffix.
    """
    loads = self.dss.Loads
    found: set[str] = set()
    has_element = loads.First()
    while has_element:
        bus_name, _, _ = self.dss.CktElement.BusNames()[0].partition(".")
        found.add(bus_name)
        has_element = loads.Next()
    return found

172 

@property
def num_phase_map(self) -> dict[str, str]:
    """Translate ``str()`` of a node-number list into phase letters.

    Keys are the string form of a list of node numbers, e.g. ``"[1, 3]"``,
    and values are the matching letters, e.g. ``"ac"``. The four-node key
    ``"[1, 2, 3, 4]"`` maps to ``"abc"`` (the fourth node is left out).
    """
    letter_of = {1: "a", 2: "b", 3: "c"}
    node_sets = ([1], [2], [3], [1, 2], [1, 3], [2, 3], [1, 2, 3])
    mapper = {
        str(nodes): "".join(letter_of[node] for node in nodes)
        for nodes in node_sets
    }
    mapper["[1, 2, 3, 4]"] = "abc"  # excluding 4th node
    return mapper

187 

def get_v_solved(self) -> pd.DataFrame:
    """Collect per-unit voltage magnitudes per bus for phases 1, 2 and 3.

    For each phase the node names and ``AllNodeVmagPUByPhase`` values are
    put into a frame (columns ``name`` and ``a``/``b``/``c``); the three
    frames are outer-merged on ``name`` so buses missing a phase get NaN.

    Returns:
        pd.DataFrame: columns ``name``, ``a``, ``b``, ``c``, indexed by the
        bus index from ``bus_names_to_index_map_fun`` and sorted by it.
    """
    circuit = self.dss.Circuit
    per_phase = []
    for phase_number, column in enumerate("abc", start=1):
        bus_labels = [
            node.split(".")[0]
            for node in circuit.AllNodeNamesByPhase(phase_number)
        ]
        magnitudes = circuit.AllNodeVmagPUByPhase(phase_number)
        per_phase.append(pd.DataFrame({"name": bus_labels, column: magnitudes}))

    merged = per_phase[0]
    for frame in per_phase[1:]:
        merged = pd.merge(merged, frame, on="name", how="outer")
    merged.index = merged.name.apply(self.bus_names_to_index_map_fun)
    return merged.sort_index()

222 

def get_apparent_power_flows(self) -> pd.DataFrame:
    """Tabulate the complex power flow on every line, transformer and reactor.

    Each row holds the from/to bus indices and names plus the per-phase
    result of ``self._get_powers()`` scaled by ``1000 / s_base``. Rows that
    share the same ``(fb, tb)`` pair are merged by summing ``a``, ``b`` and
    ``c``; the result is ordered by ``tb`` and then ``fb``.

    Returns:
        pd.DataFrame: columns ``fb``, ``tb``, ``from_name``, ``to_name``,
        ``a``, ``b``, ``c``. An empty frame with these columns is returned
        when no element qualifies.
    """
    branch_types = ("line", "transformer", "reactor")
    columns = ["fb", "tb", "from_name", "to_name", "a", "b", "c"]
    power_data = []
    flag = self.dss.PDElements.First()
    while flag:
        element_type = self.dss.CktElement.Name().lower().split(".")[0]
        if element_type in branch_types:
            # NOTE(review): terminal index 0 with phase indices 0..n-1 is
            # kept as found; get_bus_names queries IsOpen(1, 0)/IsOpen(2, 0)
            # instead - confirm which convention is intended.
            is_open = [
                self.dss.CktElement.IsOpen(0, ph)
                for ph in range(self.dss.CktElement.NumPhases())
            ]
            if not all(is_open):
                # Powers are only computed for elements that are kept.
                s_out = self._get_powers() * 1000 / self.s_base
                bus_specs = self.dss.CktElement.BusNames()
                bus1 = bus_specs[0].split(".")[0]
                bus2 = bus_specs[1].split(".")[0]
                power_data.append(
                    dict(
                        fb=self.bus_names_to_index_map[bus1],
                        tb=self.bus_names_to_index_map[bus2],
                        from_name=bus1,
                        to_name=bus2,
                        a=s_out[0],
                        b=s_out[1],
                        c=s_out[2],
                    )
                )
        flag = self.dss.PDElements.Next()

    if not power_data:
        # Nothing to group; the groupby below cannot run without columns.
        return pd.DataFrame(columns=columns)

    power_df = pd.DataFrame(power_data)
    power_df.fb = power_df.fb.astype(int)
    power_df.tb = power_df.tb.astype(int)
    # Combine elements that connect the same pair of buses.
    power_df = (
        power_df.groupby(by=["fb", "tb"], as_index=False)
        .agg(
            {
                "fb": "first",
                "tb": "first",
                "from_name": "first",
                "to_name": "first",
                "a": "sum",
                "b": "sum",
                "c": "sum",
            }
        )
        .reset_index(drop=True)
        .sort_values(by=["tb", "fb"], ignore_index=True)
    )
    return power_df

280 

281 def _get_line_zmatrix(self) -> tuple[np.ndarray, np.ndarray]: 

282 """Returns the z_matrix of a specified line element. 

283 

284 Returns: 

285 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag) 

286 """ 

287 n_phases = self.dss.Lines.Phases() 

288 

289 # z_matrix_real = np.zeros((3, 3)) 

290 # z_matrix_imag = np.zeros((3, 3)) 

291 if n_phases > 3: 

292 pass 

293 if (len(self.dss.CktElement.BusNames()[0].split(".")) == 4) or ( 

294 len(self.dss.CktElement.BusNames()[0].split(".")) == 1 

295 ): 

296 # this is the condition check for three phase since three phase is either represented by bus_name.1.2.3 or bus_name 

297 z_matrix = ( 

298 np.array(self.dss.Lines.RMatrix()) 

299 + 1j * np.array(self.dss.Lines.XMatrix()) 

300 ) * self.dss.Lines.Length() 

301 

302 z_matrix = z_matrix.reshape(3, 3) 

303 

304 return np.real(z_matrix), np.imag(z_matrix) 

305 

306 else: 

307 # for other than 3 phases 

308 active_phases = [ 

309 int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:] 

310 ] 

311 z_matrix = np.zeros((3, 3), dtype=complex) 

312 r_matrix = self.dss.Lines.RMatrix() 

313 x_matrix = self.dss.Lines.XMatrix() 

314 counter = 0 

315 for _, row in enumerate(active_phases): 

316 for _, col in enumerate(active_phases): 

317 z_matrix[row - 1, col - 1] = ( 

318 complex(r_matrix[counter], x_matrix[counter]) 

319 * self.dss.Lines.Length() 

320 ) 

321 counter = counter + 1 

322 

323 return np.real(z_matrix), np.imag(z_matrix) 

324 

325 def _get_reactor_zmatrix(self) -> tuple[np.ndarray, np.ndarray]: 

326 """Returns the z_matrix of a specified reactor element. 

327 

328 Returns: 

329 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag) 

330 """ 

331 n_phases = self.dss.Reactors.Phases() 

332 if n_phases == 3: 

333 return np.eye(3) * self.dss.Reactors.R(), np.eye(3) * self.dss.Reactors.X() 

334 

335 else: 

336 # for other than 3 phases 

337 raise NotImplementedError( 

338 "Parsing reactors with phases other than 3 not implemented" 

339 ) 

340 # active_phases = [ 

341 # int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:] 

342 # ] 

343 # z_matrix = np.zeros((3, 3), dtype=complex) 

344 # r_matrix = self.dss.Reactors.R() 

345 # x_matrix = self.dss.Reactors.X() 

346 # counter = 0 

347 # for _, row in enumerate(active_phases): 

348 # for _, col in enumerate(active_phases): 

349 # z_matrix[row - 1, col - 1] = ( 

350 # complex(r_matrix[counter], x_matrix[counter]) 

351 # * self.dss.Lines.Length() 

352 # ) 

353 # counter = counter + 1 

354 # 

355 # return np.real(z_matrix), np.imag(z_matrix) 

356 

357 def _get_powers(self): 

358 n_phases = self.dss.CktElement.NumPhases() 

359 pq = np.array(self.dss.CktElement.Powers()) 

360 n_terminals = self.dss.CktElement.NumTerminals() 

361 n_pq_phases = len(pq) // n_terminals // 2 

362 pq = pq.reshape(int(n_pq_phases * n_terminals), 2) 

363 s_out = np.zeros(3, dtype=complex) 

364 active_phases = np.array([0, 1, 2]) 

365 if n_phases < 3: 

366 active_phases = ( 

367 np.array(self.dss.CktElement.BusNames()[0].split(".")[1:]).astype(int) 

368 - 1 

369 ) 

370 

371 p = pq[:, 0] 

372 q = pq[:, 1] 

373 s = p + 1j * q 

374 s_out_ = -s[n_pq_phases:] 

375 s_out[active_phases] = s_out_[:n_phases] 

376 return s_out 

377 

def get_branch_data(self) -> pd.DataFrame:
    """Build the branch table from every line, transformer and reactor.

    For each element a symmetric 3x3 impedance (upper triangle stored as
    ``raa`` .. ``xcc``) is divided by ``z_base``, which is computed from
    ``Bus.kVBase()`` of the ``to`` bus and ``self.s_base``. The lower bus
    index is always stored as ``fb``. Lines come from
    ``_get_line_zmatrix``, reactors from ``_get_reactor_zmatrix``;
    transformers get a diagonal impedance derived from the kV, kVA, %R
    and Xhl read while winding 2 is active. Lines flagged as switches are
    typed ``"switch"`` with an ``OPEN``/``CLOSED`` status.

    Elements joining the same ``(fb, tb)`` pair are merged: impedance
    entries, ``name`` and ``phases`` are summed (strings concatenate), the
    remaining columns keep the first value.
    NOTE(review): summing impedances is exact only when the merged elements
    occupy different phases - confirm for same-phase parallel elements.

    Returns:
        pd.DataFrame: one row per bus pair, sorted by ``tb`` then ``fb``;
        an empty frame with the same columns when no element qualifies.
    """
    s_base = self.s_base
    branch_types = ("line", "transformer", "reactor")
    upper_triangle = ((0, 0), (0, 1), (0, 2), (1, 1), (1, 2), (2, 2))
    z_columns = [
        f"{part}{'abc'[i]}{'abc'[j]}" for part in "rx" for i, j in upper_triangle
    ]
    aggregation = {"fb": "max", "tb": "max", "from_name": "first", "to_name": "first"}
    aggregation.update({column: "sum" for column in z_columns})
    aggregation.update(
        {
            "type": "first",
            "name": "sum",
            "status": "first",
            "s_base": "first",
            "v_ln_base": "first",
            "z_base": "first",
            "phases": "sum",
        }
    )

    line_data = []
    flag = self.dss.PDElements.First()
    while flag:
        name_parts = self.dss.CktElement.Name().lower().split(".")
        element_type = name_parts[0]
        element_name = name_parts[1]
        if element_type not in branch_types:
            flag = self.dss.PDElements.Next()
            continue

        switch_status = None
        z_matrix_real = np.zeros((3, 3))
        z_matrix_imag = np.zeros((3, 3))
        if element_type == "transformer":
            # TODO: transformer model may be wrong but the 13bus results
            # look better with it.
            # kV, kVA and R below are read with winding 2 active.
            self.dss.Transformers.Wdg(2)
            v_base_xfmr = self.dss.Transformers.kV() / np.sqrt(3) * 1000
            s_base_xfmr = self.dss.Transformers.kVA() * 1000 / 3
            z_base_xfmr = v_base_xfmr**2 / s_base_xfmr
            x_xfmr = self.dss.Transformers.Xhl() / 100 * z_base_xfmr
            r_xfmr = self.dss.Transformers.R() / 100 * z_base_xfmr * 2
            np.fill_diagonal(z_matrix_real, r_xfmr)
            np.fill_diagonal(z_matrix_imag, x_xfmr)
        elif element_type == "line":
            element_name = self.dss.Lines.Name()
            z_matrix_real, z_matrix_imag = self._get_line_zmatrix()
            if self.dss.Lines.IsSwitch():
                element_type = "switch"
                is_open = self.dss.CktElement.IsOpen(
                    1, 0
                ) or self.dss.CktElement.IsOpen(2, 0)
                switch_status = "OPEN" if is_open else "CLOSED"
        else:  # reactor
            element_name = self.dss.Reactors.Name()
            z_matrix_real, z_matrix_imag = self._get_reactor_zmatrix()

        bus_specs = self.dss.CktElement.BusNames()
        bus1 = bus_specs[0].split(".")[0]
        bus2 = bus_specs[1].split(".")[0]
        fb = self.bus_names_to_index_map[bus1]
        tb = self.bus_names_to_index_map[bus2]
        if fb > tb:
            fb, tb = tb, fb
            bus1, bus2 = bus2, bus1
        # The voltage base is taken at the (possibly swapped) "to" bus.
        self.dss.Circuit.SetActiveBus(bus2)
        base_kv_ln = self.dss.Bus.kVBase()
        z_base = (base_kv_ln * 1000) ** 2 / s_base

        phases = "abc"
        if self.dss.CktElement.NumPhases() < 3:
            active_phases = np.array(bus_specs[0].split(".")[1:]).astype(int) - 1
            phases = "".join("abc"[i] for i in active_phases)

        each_line = dict(fb=fb, tb=tb, from_name=bus1, to_name=bus2)
        for part, matrix in (("r", z_matrix_real), ("x", z_matrix_imag)):
            for i, j in upper_triangle:
                each_line[f"{part}{'abc'[i]}{'abc'[j]}"] = matrix[i, j] / z_base
        each_line.update(
            type=element_type,
            name=element_name,
            status=switch_status,
            s_base=s_base,
            v_ln_base=base_kv_ln * 1000,
            z_base=z_base,
            phases=phases,
        )
        line_data.append(each_line)
        flag = self.dss.PDElements.Next()

    if not line_data:
        # Nothing to group; the groupby below cannot run without columns.
        return pd.DataFrame(columns=list(aggregation))

    # Combine elements between identical buses.
    branch_df = (
        pd.DataFrame(line_data)
        .groupby(by=["fb", "tb"], as_index=False)
        .agg(aggregation)
        .sort_values(by=["tb", "fb"], ignore_index=True)
        .reset_index(drop=True)
    )
    return branch_df

558 

def get_bus_data(self) -> pd.DataFrame:
    """Build the bus table and merge the per-bus load table into it.

    The method takes no arguments; ``s_base``, ``v_min``, ``v_max``,
    ``cvr_p`` and ``cvr_q`` are read from the instance and the source
    voltage from ``Vsources.PU()``. A bus is typed ``SWING`` (with the
    source voltage) when the first entry of ``Bus.AllPCEatBus()`` contains
    ``"Vsource"``, otherwise ``PQ`` with voltage 1.

    Returns:
        pd.DataFrame: outer merge of ``self._get_loads()`` with the bus
        rows on ``id``, sorted by ``id`` and with missing values set to 0.
    """
    source_voltage = self.dss.Vsources.PU()
    all_buses_names = self.dss.Circuit.AllBusNames()
    load_df = self._get_loads()
    # Each of these properties walks a whole element collection, so read
    # them once here instead of once per bus inside the loop.
    gen_buses = self.gen_buses
    load_buses = self.load_buses
    cap_buses = self.cap_buses

    bus_data = []
    for bus in all_buses_names:
        # The bus has to be active before its properties can be read.
        self.dss.Circuit.SetActiveBus(bus)
        pce_names = self.dss.Bus.AllPCEatBus()
        if len(pce_names) > 0 and "Vsource" in pce_names[0]:
            v, bus_type = source_voltage, "SWING"
        else:
            v, bus_type = 1, "PQ"
        active_bus_name = self.dss.Bus.Name()
        bus_data.append(
            dict(
                id=self.bus_names_to_index_map[bus],
                name=active_bus_name,
                bus_type=bus_type,
                v_a=v,
                v_b=v,
                v_c=v,
                v_ln_base=self.dss.Bus.kVBase() * 1000,
                s_base=self.s_base,
                v_min=self.v_min,
                v_max=self.v_max,
                cvr_p=self.cvr_p,
                cvr_q=self.cvr_q,
                phases=self.num_phase_map[str(self.dss.Bus.Nodes())],
                has_gen=active_bus_name in gen_buses,
                has_load=active_bus_name in load_buses,
                has_cap=active_bus_name in cap_buses,
                # Bus.Y() is stored as latitude and Bus.X() as longitude.
                latitude=self.dss.Bus.Y(),
                longitude=self.dss.Bus.X(),
            )
        )

    bus_df = pd.DataFrame(bus_data)
    return (
        pd.merge(load_df, bus_df, on=["id"], how="outer")
        .sort_values(by="id", ignore_index=True)
        .fillna(0)
    )

634 

def get_gen_data(self) -> pd.DataFrame:
    """Build the generator table from Generator and PVSystem elements.

    Rated values are split evenly over the connected phases and divided by
    ``self.s_base``: kW (``Pmpp`` for PV systems) fills ``p*``, kvar fills
    ``q*`` and ``kVARated`` fills ``s*_max``; ``q*_max``/``q*_min`` are set
    to plus/minus ``s*_max``. A bus specification with no node numbers or
    with three or more is treated as phases a, b and c. Rows sharing a bus
    ``id`` are merged (numeric columns and ``phases`` summed, ``name`` and
    ``control_variable`` taken from the first row).

    Returns:
        pd.DataFrame: one row per bus with generation; empty with the same
        columns when the circuit has no Generator or PVSystem.
    """
    s_base = self.s_base
    index_by_bus = self.bus_names_to_index_map
    columns = [
        "id", "name",
        "pa", "pb", "pc",
        "qa", "qb", "qc",
        "sa_max", "sb_max", "sc_max",
        "phases",
        "qa_max", "qb_max", "qc_max",
        "qa_min", "qb_min", "qc_min",
        "control_variable",
    ]
    keep_first = {"id", "name", "control_variable"}

    def build_record(name, bus_name, bus_spec, kw, kvar, kva_rated):
        """Turn one element's ratings into a per-phase, per-unit row."""
        node_tokens = bus_spec.split(".")[1:]
        if len(node_tokens) == 0 or len(node_tokens) >= 3:
            n_phases = 3
            active_phases = [0, 1, 2]
        else:
            n_phases = len(node_tokens)
            active_phases = [int(token) - 1 for token in node_tokens]
        p_phase = (kw / n_phases * 1000) / s_base
        q_phase = (kvar / n_phases * 1000) / s_base
        s_phase = kva_rated / n_phases * 1000 / s_base
        phases = "".join("abc"[phase_id] for phase_id in active_phases)
        record = {
            "id": index_by_bus[bus_name],
            "name": name,
            "phases": phases,
            "control_variable": "PQ",
        }
        for ph in "abc":
            connected = ph in phases
            s_max = s_phase if connected else 0
            record[f"p{ph}"] = p_phase if connected else 0
            record[f"q{ph}"] = q_phase if connected else 0
            record[f"s{ph}_max"] = s_max
            record[f"q{ph}_max"] = s_max
            record[f"q{ph}_min"] = -s_max
        return record

    gen_data = []
    flag = self.dss.Generators.First()
    while flag:
        gen_data.append(
            build_record(
                name=self.dss.Generators.Name(),
                bus_name=self.dss.Generators.Bus1().split(".")[0],
                bus_spec=self.dss.CktElement.BusNames()[0],
                kw=self.dss.Generators.kW(),
                kvar=self.dss.Generators.kvar(),
                kva_rated=self.dss.Generators.kVARated(),
            )
        )
        flag = self.dss.Generators.Next()

    flag = self.dss.PVsystems.First()
    while flag:
        pv_bus_spec = self.dss.CktElement.BusNames()[0]
        gen_data.append(
            build_record(
                name=self.dss.PVsystems.Name(),
                bus_name=pv_bus_spec.split(".")[0],
                bus_spec=pv_bus_spec,
                kw=self.dss.PVsystems.Pmpp(),
                kvar=self.dss.PVsystems.kvar(),
                kva_rated=self.dss.PVsystems.kVARated(),
            )
        )
        flag = self.dss.PVsystems.Next()

    if gen_data:
        gen_df = pd.DataFrame(gen_data)
    else:
        gen_df = pd.DataFrame({column: [] for column in columns})
    aggregation = {
        column: ("first" if column in keep_first else "sum") for column in columns
    }
    return gen_df.groupby(by=["id"], as_index=False).agg(aggregation)

808 

809 def get_cap_data(self) -> pd.DataFrame: 

810 s_base = self.s_base 

811 flag = self.dss.Capacitors.First() 

812 cap_data = [] 

813 while flag: 

814 cap_bus_name = self.dss.CktElement.BusNames()[0].split(".")[0] 

815 cap_bus_phases = self.dss.CktElement.BusNames()[0].split(".")[1:] 

816 

817 # convert this to string to be consistent with how we conver num to phase letters 

818 cap_bus_phases = str([int(phase) for phase in cap_bus_phases]) 

819 if cap_bus_phases == "[]": 

820 # three phases are usually represented by either .1.2.3 or nothing in opendss 

821 # for second case we should ensure that 3 phase is actually represented 

822 cap_bus_phases = "[1, 2, 3]" 

823 

824 cap_phase = self.num_phase_map[cap_bus_phases] 

825 

826 if cap_phase != "abc": 

827 each_cap = dict( 

828 id=self.bus_names_to_index_map[cap_bus_name], 

829 name=cap_bus_name, 

830 qa=( 

831 self.dss.Capacitors.kvar() * 1000 / s_base 

832 if cap_phase in {"a"} 

833 else 0 

834 ), 

835 qb=( 

836 self.dss.Capacitors.kvar() * 1000 / s_base 

837 if cap_phase in {"b"} 

838 else 0 

839 ), 

840 qc=( 

841 self.dss.Capacitors.kvar() * 1000 / s_base 

842 if cap_phase in {"c"} 

843 else 0 

844 ), 

845 phases=cap_phase, 

846 ) 

847 else: 

848 each_cap = dict( 

849 id=self.bus_names_to_index_map[cap_bus_name], 

850 name=cap_bus_name, 

851 qa=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base, 

852 qb=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base, 

853 qc=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base, 

854 phases=cap_phase, 

855 ) 

856 

857 cap_data.append(each_cap) 

858 flag = self.dss.Capacitors.Next() 

859 cap_df = pd.DataFrame(cap_data) 

860 if len(cap_data) < 1: 

861 cap_df = pd.DataFrame( 

862 { 

863 "id": [], 

864 "name": [], 

865 "qa": [], 

866 "qb": [], 

867 "qc": [], 

868 "phases": [], 

869 } 

870 ) 

871 

872 cap_df = cap_df.groupby(by=["id"], as_index=False).agg( 

873 dict( 

874 id="first", 

875 name="first", 

876 qa="sum", 

877 qb="sum", 

878 qc="sum", 

879 phases="sum", 

880 ) 

881 ) 

882 return cap_df 

883 

def get_reg_data(self) -> pd.DataFrame:
    """Build the regulator table from transformers that have a RegControl.

    A transformer is kept when its name (compared case-insensitively)
    appears in the ``Transformer`` column of
    ``dss.utils.regcontrols_to_dataframe()``. The tap position is
    ``(Transformers.Tap() - 1) / 0.00625`` rounded to an integer, with the
    sign flipped when the two buses are swapped so that ``fb`` has the
    lower index. Rows for the same ``(fb, tb)`` pair are merged with
    ``max`` on the tap columns and string concatenation on ``phases``.

    The three tap columns are always present, so circuits whose regulators
    do not cover every phase no longer fail during aggregation.
    NOTE(review): taps of phases without a regulator are filled with 1,
    as before; confirm whether 0 (neutral tap) is intended.

    Returns:
        pd.DataFrame: columns ``fb``, ``tb``, ``from_name``, ``to_name``,
        ``tap_a``, ``tap_b``, ``tap_c``, ``phases``, sorted by ``tb``.
    """
    columns = [
        "fb", "tb", "from_name", "to_name", "tap_a", "tap_b", "tap_c", "phases",
    ]
    reg_names: set[str] = set()
    if len(self.dss.RegControls.AllNames()) != 0:
        dss_reg_df = self.dss.utils.regcontrols_to_dataframe()
        reg_names = {str(name).lower() for name in dss_reg_df.Transformer.to_list()}

    reg_data = []
    flag = self.dss.Transformers.First()
    while flag:
        name_parts = self.dss.CktElement.Name().lower().split(".")
        element_type = name_parts[0]
        element_name = name_parts[1]
        if element_type == "transformer" and element_name in reg_names:
            bus_specs = self.dss.CktElement.BusNames()
            bus1 = bus_specs[0].split(".")[0]
            bus2 = bus_specs[-1].split(".")[0]
            fb = self.bus_names_to_index_map[bus1]
            tb = self.bus_names_to_index_map[bus2]
            tap_direction = 1
            if fb > tb:
                fb, tb = tb, fb
                bus1, bus2 = bus2, bus1
                tap_direction = -1

            # Same string form that num_phase_map uses for its keys.
            node_tokens = sorted(bus_specs[0].split(".")[1:])
            phase_key = str([int(token) for token in node_tokens])
            if phase_key == "[]":
                # No node list means the default three-phase connection.
                phase_key = "[1, 2, 3]"
            line_phase = self.num_phase_map[phase_key]

            tap = (self.dss.Transformers.Tap() - 1) / 0.00625 * tap_direction
            each_reg = dict(fb=fb, tb=tb, from_name=bus1, to_name=bus2)
            for ph in line_phase:
                each_reg[f"tap_{ph}"] = int(round(tap))
            each_reg["phases"] = line_phase
            reg_data.append(each_reg)
        flag = self.dss.Transformers.Next()

    if reg_data:
        # reindex adds any tap column no regulator supplied (as NaN) so the
        # aggregation below always finds all of its columns.
        reg_df = pd.DataFrame(reg_data).reindex(columns=columns)
    else:
        reg_df = pd.DataFrame({column: [] for column in columns})

    # Combine regulators between identical buses.
    reg_df = reg_df.groupby(["fb", "tb"]).agg(
        {
            "fb": "first",
            "tb": "first",
            "from_name": "first",
            "to_name": "first",
            "tap_a": "max",
            "tap_b": "max",
            "tap_c": "max",
            "phases": "sum",
        }
    )
    reg_df = reg_df.reset_index(drop=True)
    reg_df = reg_df.sort_values(by="tb", ignore_index=True).fillna(1)
    return reg_df

969 

def _get_loads(self) -> pd.DataFrame:
    """Extract the load at each bus for each phase.

    Iterates over every OpenDSS load element, reads the power reported at each
    conductor of the active element and stores it in the phase column
    (a/b/c) of the bus the load is connected to, exactly as modeled in the
    distribution model (including secondary buses). Loads that share a bus
    are summed.

    Values returned by ``CktElement.Powers()`` are scaled by
    ``1000 / self.s_base`` (OpenDSS reports kW/kvar, so this yields per-unit
    values when ``s_base`` is given in VA).

    Returns:
        pd.DataFrame: One row per bus that carries load, with columns
        ``id``, ``pl_a``, ``ql_a``, ``pl_b``, ``ql_b``, ``pl_c``, ``ql_c``.
        An empty frame with the same columns when the circuit has no loads.

    Raises:
        Exception: If a load element reports more than one connected bus.
    """
    power_columns = ["pl_a", "ql_a", "pl_b", "ql_b", "pl_c", "ql_c"]
    # (cvr_p, cvr_q) voltage exponents for the fixed OpenDSS load models;
    # models 4 and 8 are derived from the load's own properties below.
    model_to_cvr_map = {
        1: (0, 0),
        2: (2, 2),
        3: (0, 2),
        5: (1, 1),
        6: (0, 0),
        7: (0, 2),
    }
    s_base = self.s_base
    load_data = []
    loads_flag = self.dss.Loads.First()
    while loads_flag:
        connected_buses = self.dss.CktElement.BusNames()
        if len(connected_buses) > 1:
            raise Exception("Multiple connected buses")

        model = self.dss.Loads.Model()
        # The default must be a pair: a scalar default cannot be unpacked and
        # would crash for every model missing from the map (including 4 and 8).
        cvr_p, cvr_q = model_to_cvr_map.get(model, (0, 0))
        if model == 4:  # exponential model
            cvr_p = self.dss.Loads.CVRwatts()
            cvr_q = self.dss.Loads.CVRvars()
        elif model == 8:  # zip model
            zipv = self.dss.Loads.ZipV()
            cvr_p = 2 * zipv[0] + zipv[1]
            cvr_q = 2 * zipv[3] + zipv[4]

        # "bus.1.2" -> bus name "bus", connected nodes ["1", "2"]
        bus_name, *nodes = connected_buses[0].split(".")
        # NOTE: cvr_p / cvr_q are kept on the per-load record, but the
        # aggregation below only returns the power columns.
        each_load = {
            "id": self.bus_names_to_index_map[bus_name],
            "pl_a": 0,
            "ql_a": 0,
            "pl_b": 0,
            "ql_b": 0,
            "pl_c": 0,
            "ql_c": 0,
            "cvr_p": cvr_p,
            "cvr_q": cvr_q,
        }

        # Powers() interleaves active and reactive power per conductor:
        # [P1, Q1, P2, Q2, ...]
        conductor_power = np.array(self.dss.CktElement.Powers())
        p_values = conductor_power[::2]
        q_values = conductor_power[1::2]

        if not nodes:
            # No explicit nodes in the bus spec: conductors map to 1, 2, 3.
            nodes = ["1", "2", "3"]
        # zip() stops at the shorter sequence, so a load reporting fewer
        # conductors than assumed phases no longer indexes out of range.
        for node, p_value, q_value in zip(nodes, p_values, q_values):
            node_number = int(node)
            if not 1 <= node_number <= 3:
                # Node 0 (ground) or an extra neutral node is not a phase;
                # "abc"[node - 1] would silently map node 0 to phase c.
                continue
            ph = "abc"[node_number - 1]
            each_load[f"pl_{ph}"] = p_value * 1000 / s_base
            each_load[f"ql_{ph}"] = q_value * 1000 / s_base

        load_data.append(each_load)
        loads_flag = self.dss.Loads.Next()

    if not load_data:
        # groupby("id") would raise KeyError on a frame without columns.
        return pd.DataFrame(columns=["id", *power_columns])

    load_df = pd.DataFrame(load_data)
    # Combine all loads connected to the same bus.
    load_df = load_df.groupby("id", as_index=False)[power_columns].sum()
    return load_df.fillna(0).reset_index(drop=True)

1060 

def to_csv(self, dir_name: Optional[str] = None, overwrite: bool = True) -> None:
    """Write the branch, bus, capacitor, generator and regulator tables as CSV.

    Args:
        dir_name: Directory receiving the files; ``"testfiles"`` is used when
            ``None``. Missing parent directories are created.
        overwrite: Forwarded to ``Path.mkdir`` as ``exist_ok``; when ``False``
            an already existing directory raises ``FileExistsError``.
    """
    target = "testfiles" if dir_name is None else dir_name
    Path(target).mkdir(parents=True, exist_ok=overwrite)

    # Each file stem is also the name of the attribute holding that table.
    for table in ("branch_data", "bus_data", "cap_data", "gen_data", "reg_data"):
        getattr(self, table).to_csv(f"{target}/{table}.csv", index=False)

1071 

def update_gen_q(self, q: pd.DataFrame):
    """Set the reactive power of every OpenDSS generator from ``q``.

    For each generator the values of ``q`` at the generator's bus are summed
    over the phases the generator is connected to, multiplied by
    ``self.s_base / 1000`` and written to the generator's ``kvar`` property.

    Args:
        q: Reactive power table indexed by bus id (the values of
            ``self.bus_names_to_index_map``) with phase columns ``"a"``,
            ``"b"`` and ``"c"``.
    """
    flag = self.dss.Generators.First()
    while flag:
        # "bus.1.2" -> ["1", "2"]; an empty list means no nodes were specified.
        node_spec = self.dss.CktElement.BusNames()[0].split(".")[1:]
        nodes = [int(node) for node in node_spec]
        # Only nodes 1-3 are phases. Node 0 (ground/neutral) must be dropped:
        # indexing "abc"[0 - 1] would otherwise select phase c, and counting
        # it would make "bus.1.2.0" look like a three-phase connection.
        phase_columns = ["abc"[node - 1] for node in nodes if 1 <= node <= 3]
        if not phase_columns:
            # No explicit phase nodes: the generator is treated as three-phase.
            phase_columns = ["a", "b", "c"]

        bus = self.dss.Generators.Bus1().split(".")[0]
        bus_id = self.bus_names_to_index_map[bus]
        kvar = q.loc[bus_id, phase_columns].sum() * self.s_base / 1000
        self.dss.Generators.kvar(kvar)
        flag = self.dss.Generators.Next()