Coverage for src/distopf/dssconverter/dssparser.py: 0%
536 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
1from __future__ import annotations
3from functools import cache
4from pathlib import Path
5import networkx as nx
6import numpy as np
7import opendssdirect as dss
8import pandas as pd
11class DSSParser:
12 def __init__(
13 self,
14 dssfile: (str, Path),
15 s_base: float = 1e6,
16 v_min: float = 0.95,
17 v_max: float = 1.05,
18 cvr_p: float = 0,
19 cvr_q: float = 0,
20 ) -> None:
21 self.dss = dss
22 self.dssfile = dssfile
23 self.dss.Text.Command(f"Redirect {self.dssfile}")
24 # if self.dss.Topology.NumLoops() > 0:
25 # raise ValueError("Toplogy must be radial; topology has .")
26 self.s_base = s_base
27 self.v_min = v_min
28 self.v_max = v_max
29 self.cvr_p = cvr_p
30 self.cvr_q = cvr_q
31 self.bus_names = self.get_bus_names()
32 # get dataframes and results
33 self.branch_data = self.get_branch_data()
34 self.bus_data = self.get_bus_data()
35 self.gen_data = self.get_gen_data()
36 self.cap_data = self.get_cap_data()
37 self.reg_data = self.get_reg_data()
38 self.v_solved = self.get_v_solved()
39 self.s_solved = self.get_apparent_power_flows()
41 def update(self) -> None:
42 self.dss.Solution.Solve()
43 self.bus_names = self.get_bus_names()
44 # get dataframes and results
45 self.branch_data = self.get_branch_data()
46 self.bus_data = self.get_bus_data()
47 self.gen_data = self.get_gen_data()
48 self.cap_data = self.get_cap_data()
49 self.reg_data = self.get_reg_data()
50 self.v_solved = self.get_v_solved()
51 self.s_solved = self.get_apparent_power_flows()
53 @cache
54 def get_bus_names(self) -> list[str]:
55 """Access all the bus (node) names from the circuit
57 Returns:
58 list[str]: list of all the bus names
59 """
61 flag = self.dss.PDElements.First()
62 branches = []
63 while flag:
64 element_type = self.dss.CktElement.Name().lower().split(".")[0]
65 if element_type not in ["line", "transformer", "reactor"]:
66 flag = self.dss.PDElements.Next()
67 continue
69 if element_type == "line" and self.dss.Lines.IsSwitch():
70 element_type = "switch"
71 switch_status = (
72 "OPEN"
73 if (
74 self.dss.CktElement.IsOpen(1, 0)
75 or self.dss.CktElement.IsOpen(2, 0)
76 )
77 else "CLOSED"
78 )
79 if switch_status == "OPEN":
80 flag = self.dss.PDElements.Next()
81 continue
82 bus1 = self.dss.CktElement.BusNames()[0].split(".")[0]
83 bus2 = self.dss.CktElement.BusNames()[1].split(".")[0]
84 branches.append((bus1, bus2))
85 self.dss.Circuit.SetActiveBus(bus2)
86 flag = self.dss.PDElements.Next()
87 g = nx.Graph()
88 g.add_edges_from(set(branches))
89 node_list = nx.dfs_preorder_nodes(g, self.source)
90 node_list = list(node_list)
91 return node_list
93 @property
94 @cache
95 def bus_names_to_index_map(self) -> dict[str, int]:
96 """each of the bus mapped to its corresponding index in the bus names list
98 Returns:
99 dict[str,int]: dictionary with key as bus names and value as its index
100 """
101 _map = {bus: index + 1 for index, bus in enumerate(self.bus_names)}
102 return _map
104 def bus_names_to_index_map_fun(self, bus: str) -> int:
105 return self.bus_names_to_index_map[bus]
107 @property
108 def basekV_LL(self) -> float:
109 """Returns basekV (line to line) of the circuit based on the sourcebus
111 Returns:
112 float: base kV of the circuit as referred to the source bus
113 """
114 # make the source bus active before accessing the base kV since there is no provision to get base kV of circuit
115 self.dss.Circuit.SetActiveBus(self.source)
116 return round(self.dss.Bus.kVBase() * np.sqrt(3), 2)
118 @property
119 def source(self) -> str:
120 """source bus of the circuit.
122 Returns:
123 str: returns the source bus of the circuit
124 """
125 # typically the first bus is the source bus
126 self.dss.Vsources.First()
127 return self.dss.CktElement.BusNames()[0].split(".")[0]
129 @property
130 # @cache
131 def gen_buses(self) -> set[str]:
132 flag = self.dss.Generators.First()
133 gen_buses = set()
134 while flag:
135 gen_buses.add(self.dss.Generators.Bus1().split(".")[0])
136 flag = self.dss.Generators.Next()
137 return gen_buses
139 @property
140 # @cache
141 def cap_buses(self) -> set[str]:
142 flag = self.dss.Capacitors.First()
143 cap_buses = set()
144 while flag:
145 cap_buses.add(self.dss.CktElement.BusNames()[0].split(".")[0])
146 flag = self.dss.Capacitors.Next()
147 return cap_buses
149 @property
150 # @cache
151 def load_buses(self) -> set[str]:
152 flag = self.dss.Loads.First()
153 load_buses = set()
154 while flag:
155 load_buses.add(self.dss.CktElement.BusNames()[0].split(".")[0])
156 flag = self.dss.Loads.Next()
157 return load_buses
159 @property
160 def num_phase_map(self) -> dict[str, str]:
161 # opendss provides nodes phase in number format so we convert it to letter format
162 num_phase_mapper = {
163 "[1]": "a",
164 "[2]": "b",
165 "[3]": "c",
166 "[1, 2]": "ab",
167 "[1, 3]": "ac",
168 "[2, 3]": "bc",
169 "[1, 2, 3]": "abc",
170 "[1, 2, 3, 4]": "abc", # excluding 4th node
171 }
172 return num_phase_mapper
174 def get_v_solved(self) -> pd.DataFrame:
175 va = pd.DataFrame(
176 {
177 "name": [
178 name.split(".")[0]
179 for name in self.dss.Circuit.AllNodeNamesByPhase(1)
180 ],
181 "a": self.dss.Circuit.AllNodeVmagPUByPhase(1),
182 }
183 )
184 vb = pd.DataFrame(
185 {
186 "name": [
187 name.split(".")[0]
188 for name in self.dss.Circuit.AllNodeNamesByPhase(2)
189 ],
190 "b": self.dss.Circuit.AllNodeVmagPUByPhase(2),
191 }
192 )
193 vc = pd.DataFrame(
194 {
195 "name": [
196 name.split(".")[0]
197 for name in self.dss.Circuit.AllNodeNamesByPhase(3)
198 ],
199 "c": self.dss.Circuit.AllNodeVmagPUByPhase(3),
200 }
201 )
202 v_df = pd.merge(va, vb, on="name", how="outer")
203 v_df = pd.merge(v_df, vc, on="name", how="outer")
204 v_df.index = v_df.name.apply(self.bus_names_to_index_map_fun)
205 v_df = v_df.sort_index()
206 return v_df
208 def get_apparent_power_flows(self) -> pd.DataFrame:
209 s_base = self.s_base
210 flag = self.dss.PDElements.First()
211 power_data = []
212 while flag:
213 element_type = self.dss.CktElement.Name().lower().split(".")[0]
214 is_open = [
215 self.dss.CktElement.IsOpen(0, ph)
216 for ph in range(self.dss.CktElement.NumPhases())
217 ]
218 if all(is_open):
219 flag = self.dss.PDElements.Next()
220 continue
221 s_out = self._get_powers() * 1000 / self.s_base
222 if element_type not in ["line", "transformer", "reactor"]:
223 flag = self.dss.PDElements.Next()
224 continue
225 bus1 = self.dss.CktElement.BusNames()[0].split(".")[0]
226 bus2 = self.dss.CktElement.BusNames()[1].split(".")[0]
227 self.dss.Circuit.SetActiveBus(bus2)
229 each_power = dict(
230 fb=self.bus_names_to_index_map[bus1],
231 tb=self.bus_names_to_index_map[bus2],
232 from_name=bus1,
233 to_name=bus2,
234 a=s_out[0],
235 b=s_out[1],
236 c=s_out[2],
237 )
239 power_data.append(each_power)
240 flag = self.dss.PDElements.Next()
242 # combine lines between identical buses.
243 power_df = pd.DataFrame(power_data)
244 power_df.fb = power_df.fb.astype(int)
245 power_df.tb = power_df.tb.astype(int)
246 power_df = (
247 power_df.groupby(by=["fb", "tb"], as_index=False)
248 .agg(
249 {
250 "fb": "first",
251 "tb": "first",
252 "from_name": "first",
253 "to_name": "first",
254 "a": "sum",
255 "b": "sum",
256 "c": "sum",
257 }
258 )
259 .reset_index(drop=True)
260 .sort_values(by=["fb"], ignore_index=True)
261 .sort_values(by=["tb"], ignore_index=True)
262 )
264 return power_df
266 def _get_line_zmatrix(self) -> tuple[np.ndarray, np.ndarray]:
267 """Returns the z_matrix of a specified line element.
269 Returns:
270 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag)
271 """
272 n_phases = self.dss.Lines.Phases()
274 z_matrix_real = np.zeros((3, 3))
275 z_matrix_imag = np.zeros((3, 3))
276 if n_phases > 3:
277 pass
278 if (len(self.dss.CktElement.BusNames()[0].split(".")) == 4) or (
279 len(self.dss.CktElement.BusNames()[0].split(".")) == 1
280 ):
281 # this is the condition check for three phase since three phase is either represented by bus_name.1.2.3 or bus_name
282 z_matrix = (
283 np.array(self.dss.Lines.RMatrix())
284 + 1j * np.array(self.dss.Lines.XMatrix())
285 ) * self.dss.Lines.Length()
287 z_matrix = z_matrix.reshape(3, 3)
289 return np.real(z_matrix), np.imag(z_matrix)
291 else:
292 # for other than 3 phases
293 active_phases = [
294 int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:]
295 ]
296 z_matrix = np.zeros((3, 3), dtype=complex)
297 r_matrix = self.dss.Lines.RMatrix()
298 x_matrix = self.dss.Lines.XMatrix()
299 counter = 0
300 for _, row in enumerate(active_phases):
301 for _, col in enumerate(active_phases):
302 z_matrix[row - 1, col - 1] = (
303 complex(r_matrix[counter], x_matrix[counter])
304 * self.dss.Lines.Length()
305 )
306 counter = counter + 1
308 return np.real(z_matrix), np.imag(z_matrix)
310 def _get_reactor_zmatrix(self) -> tuple[np.ndarray, np.ndarray]:
311 """Returns the z_matrix of a specified reactor element.
313 Returns:
314 real z_matrix, imag z_matrix (np.ndarray, np.ndarray): 3x3 numpy array of the z_matrix corresponding to the each of the phases(real,imag)
315 """
316 n_phases = self.dss.Reactors.Phases()
317 if n_phases == 3:
318 return np.eye(3) * self.dss.Reactors.R(), np.eye(3) * self.dss.Reactors.X()
320 else:
321 # for other than 3 phases
322 raise NotImplementedError(
323 "Parsing reactors with phases other than 3 not implemented"
324 )
325 # active_phases = [
326 # int(phase) for phase in self.dss.CktElement.BusNames()[0].split(".")[1:]
327 # ]
328 # z_matrix = np.zeros((3, 3), dtype=complex)
329 # r_matrix = self.dss.Reactors.R()
330 # x_matrix = self.dss.Reactors.X()
331 # counter = 0
332 # for _, row in enumerate(active_phases):
333 # for _, col in enumerate(active_phases):
334 # z_matrix[row - 1, col - 1] = (
335 # complex(r_matrix[counter], x_matrix[counter])
336 # * self.dss.Lines.Length()
337 # )
338 # counter = counter + 1
339 #
340 # return np.real(z_matrix), np.imag(z_matrix)
342 def _get_powers(self):
343 n_phases = self.dss.CktElement.NumPhases()
344 pq = np.array(self.dss.CktElement.Powers())
345 n_terminals = self.dss.CktElement.NumTerminals()
346 n_pq_phases = len(pq) // n_terminals // 2
347 pq = pq.reshape(int(n_pq_phases * n_terminals), 2)
348 s_out = np.zeros(3, dtype=complex)
349 active_phases = np.array([0, 1, 2])
350 if n_phases < 3:
351 active_phases = (
352 np.array(self.dss.CktElement.BusNames()[0].split(".")[1:]).astype(int)
353 - 1
354 )
356 p = pq[:, 0]
357 q = pq[:, 1]
358 s = p + 1j * q
359 s_out_ = -s[n_pq_phases:]
360 s_out[active_phases] = s_out_[:n_phases]
361 return s_out
363 def get_branch_data(self) -> pd.DataFrame:
364 s_base = self.s_base
365 flag = self.dss.PDElements.First()
366 line_data = []
367 power_data = []
368 while flag:
369 switch_status = None
370 element_type = self.dss.CktElement.Name().lower().split(".")[0]
371 element_name = self.dss.CktElement.Name().lower().split(".")[1]
372 z_matrix_real = np.zeros((3, 3))
373 z_matrix_imag = np.zeros((3, 3))
374 if element_type not in ["line", "transformer", "reactor"]:
375 flag = self.dss.PDElements.Next()
376 continue
377 if element_type == "transformer":
378 is_delta = self.dss.Transformers.IsDelta()
379 n_windings = self.dss.Transformers.NumWindings()
380 r_xfmr = 0
381 x_xfmr = 0
382 n_phases = self.dss.CktElement.NumPhases()
383 n_terminals = self.dss.CktElement.NumTerminals()
384 y_prime_flat = np.array(self.dss.CktElement.YPrim())
385 y_prim = y_prime_flat[::2] + 1j * y_prime_flat[1::2]
386 y_shape = int(np.sqrt(len(y_prim)))
387 y_prim = np.reshape(y_prim, (y_shape, y_shape))
388 n_y11 = int(y_shape / 2)
389 v_all = np.array(self.dss.CktElement.Voltages())
390 v_all = v_all[::2] + 1j * v_all[1::2]
391 v1 = v_all[: len(v_all) // 2]
392 v2 = v_all[len(v_all) // 2 :]
393 i_all = np.array(self.dss.CktElement.Currents())
394 i_all = i_all[::2] + 1j * i_all[1::2]
395 i1 = i_all[: len(i_all) // 2]
396 i2 = i_all[len(i_all) // 2 :]
397 self.dss.Transformers.Wdg(1)
398 # v1 = np.array(self.dss.Transformers.WdgVoltages())
399 # v1 = v1[::2] + 1j * v1[1::2]
400 kv_h = self.dss.Transformers.kV()
401 self.dss.Transformers.Wdg(2)
402 # v2 = np.array(self.dss.Transformers.WdgVoltages())
403 # v2 = v2[::2] + 1j * v2[1::2]
404 kv_l = self.dss.Transformers.kV()
405 n = kv_h / kv_l
406 y11 = y_prim[:n_y11, :n_y11] * n**2
407 y12 = y_prim[:n_y11, n_y11:] * n
408 y21 = y_prim[n_y11:, :n_y11]
409 y22 = y_prim[n_y11:, n_y11:]
410 y_prim_l = np.r_[np.c_[y11, y12], np.c_[y21, y22]]
411 # z_prim_l = np.linalg.inv(y_prim_l)
412 # z11 = z_prim_l[:n_y11, :n_y11]
413 # z12 = z_prim_l[:n_y11, n_y11:]
414 # z21 = z_prim_l[n_y11:, :n_y11]
415 # z22 = z_prim_l[n_y11:, n_y11:]
416 i_all = np.array(self.dss.Transformers.WdgCurrents())
417 i_in = i_all[::2]
418 i_all = i_all[::2] + i_all[1::2] * 1j
419 i_in = i_all[::2]
420 i_out = i_all[1::2]
421 # i1_in = i_in[::2]
422 # i2_in = i_in[1::2]
423 # i2_out = i_out[1::2]
424 # zabc = (v1[:n_phases] / n - v2[:n_phases]) / -i2[:n_phases]
425 # TODO: tranformer model may be wrong but the 13bus results look better with it.
426 # if is_delta:
427 # raise Warning("Delta transformer not implemented")
428 # if n_windings != 2:
429 #
430 # for i_wdg in range(1, n_windings + 1):
431 # self.dss.Transformers.Wdg(i_wdg)
432 v_base_xfmr = self.dss.Transformers.kV() / np.sqrt(3) * 1000
433 s_base_xfmr = self.dss.Transformers.kVA() * 1000 / 3
434 z_base_xfmr = v_base_xfmr**2 / s_base_xfmr
436 x_xfmr = self.dss.Transformers.Xhl() / 100 * z_base_xfmr
437 r_xfmr = self.dss.Transformers.R() / 100 * z_base_xfmr * 2
438 z_matrix_real[0, 0] = r_xfmr
439 z_matrix_real[1, 1] = r_xfmr
440 z_matrix_real[2, 2] = r_xfmr
441 z_matrix_imag[0, 0] = x_xfmr
442 z_matrix_imag[1, 1] = x_xfmr
443 z_matrix_imag[2, 2] = x_xfmr
444 pass
445 if element_type == "line":
446 element_name = self.dss.Lines.Name()
447 z_matrix_real, z_matrix_imag = self._get_line_zmatrix()
449 if self.dss.Lines.IsSwitch():
450 element_type = "switch"
451 switch_status = (
452 "OPEN"
453 if (
454 self.dss.CktElement.IsOpen(1, 0)
455 or self.dss.CktElement.IsOpen(2, 0)
456 )
457 else "CLOSED"
458 )
459 if element_type == "reactor":
460 element_name = self.dss.Reactors.Name()
461 z_matrix_real, z_matrix_imag = self._get_reactor_zmatrix()
462 bus1 = self.dss.CktElement.BusNames()[0].split(".")[0]
463 bus2 = self.dss.CktElement.BusNames()[1].split(".")[0]
464 fb = self.bus_names_to_index_map[bus1]
465 tb = self.bus_names_to_index_map[bus2]
466 if fb > tb:
467 fb, tb = tb, fb
468 bus1, bus2 = bus2, bus1
469 self.dss.Circuit.SetActiveBus(bus2)
470 base_kv_ln = self.dss.Bus.kVBase()
471 z_base = (base_kv_ln * 1000) ** 2 / s_base
472 line_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
473 line_phases = sorted(line_phases)
474 phases = "abc"
475 n_phases = self.dss.CktElement.NumPhases()
476 if n_phases < 3:
477 active_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
478 active_phases = np.array(active_phases).astype(int) - 1
479 phases = "".join("abc"[i] for i in active_phases)
480 each_line = dict(
481 fb=fb,
482 tb=tb,
483 from_name=bus1,
484 to_name=bus2,
485 raa=z_matrix_real[0, 0] / z_base,
486 rab=z_matrix_real[0, 1] / z_base,
487 rac=z_matrix_real[0, 2] / z_base,
488 rbb=z_matrix_real[1, 1] / z_base,
489 rbc=z_matrix_real[1, 2] / z_base,
490 rcc=z_matrix_real[2, 2] / z_base,
491 xaa=z_matrix_imag[0, 0] / z_base,
492 xab=z_matrix_imag[0, 1] / z_base,
493 xac=z_matrix_imag[0, 2] / z_base,
494 xbb=z_matrix_imag[1, 1] / z_base,
495 xbc=z_matrix_imag[1, 2] / z_base,
496 xcc=z_matrix_imag[2, 2] / z_base,
497 type=element_type,
498 name=element_name,
499 status=switch_status,
500 s_base=s_base,
501 v_ln_base=base_kv_ln * 1000,
502 z_base=z_base,
503 phases=phases,
504 )
505 line_data.append(each_line)
506 flag = self.dss.PDElements.Next()
508 # combine lines between identical buses.
509 branch_df = pd.DataFrame(line_data)
510 branch_df = (
511 branch_df.groupby(by=["fb", "tb"], as_index=False)
512 .agg(
513 {
514 "fb": "max",
515 "tb": "max",
516 "from_name": "first",
517 "to_name": "first",
518 "raa": "sum",
519 "rab": "sum",
520 "rac": "sum",
521 "rbb": "sum",
522 "rbc": "sum",
523 "rcc": "sum",
524 "xaa": "sum",
525 "xab": "sum",
526 "xac": "sum",
527 "xbb": "sum",
528 "xbc": "sum",
529 "xcc": "sum",
530 "type": "first",
531 "name": "sum",
532 "status": "first",
533 "s_base": "first",
534 "v_ln_base": "first",
535 "z_base": "first",
536 "phases": "sum",
537 }
538 )
539 .sort_values(by=["tb", "fb"], ignore_index=True)
540 .reset_index(drop=True)
541 )
542 return branch_df
544 def get_bus_data(self) -> pd.DataFrame:
545 """Extract the bus data from the distribution model.
547 Args:
548 source_voltage (float, optional): Voltage of the source (for all phases) in per unit (pu). Defaults to 1.0.
549 s_base (float, optional): MVA base of the system (in VA). Defaults to 1000000 (or 1 MVA).
550 v_min (float, optional): minimum voltage limit of the system in pu. Defaults to 0.95.
551 v_max (float, optional): maximum voltage limit of the system in pu. Defaults to 1.05.
552 cvr_p (float, optional): conservative voltage reduction parameter for p (0 means no voltage dependence). Defaults to 0.
553 cvr_q (float, optional): conservative voltage reduction parameter for q (0 means no voltage dependence). Defaults to 0.
555 Returns:
556 pd.DataFrame: bus data in DataFrame format
557 """
558 source_voltage = self.dss.Vsources.PU()
559 s_base = self.s_base
560 v_min = self.v_min
561 v_max = self.v_max
562 cvr_p = self.cvr_p
563 cvr_q = self.cvr_q
564 all_buses_names = self.dss.Circuit.AllBusNames()
565 # all_loads = self.get_loads()
566 load_df = self._get_loads()
567 bus_data = []
568 for bus_id, bus in enumerate(all_buses_names):
569 # need to set the nodes active before extracting their info
570 self.dss.Circuit.SetActiveBus(bus)
571 bus_type = "PQ"
572 v = 1
573 if (
574 len(self.dss.Bus.AllPCEatBus()) > 0
575 and "Vsource" in self.dss.Bus.AllPCEatBus()[0]
576 ):
577 v = source_voltage
578 bus_type = "SWING"
579 active_bus_name = self.dss.Bus.Name()
580 v_ln_base = self.dss.Bus.kVBase() * 1000
581 each_bus = dict(
582 id=self.bus_names_to_index_map[bus], # bus id for each active bus
583 name=active_bus_name, # name of the active bus
584 bus_type=bus_type, # SWING if source else PQ
585 v_a=v, # p.u. voltage of the active bus in phase a
586 v_b=v, # p.u. voltage of the active bus in phase b
587 v_c=v, # p.u. voltage of the active bus in phase c
588 v_ln_base=v_ln_base, # line-to-phase voltage base of the active bus
589 s_base=s_base, # s_base of the system
590 v_min=v_min, # minimum p.u. voltage for the bus
591 v_max=v_max, # maximum p.u. voltage for the bus
592 cvr_p=cvr_p, # conservative voltage reduction parameter for active power
593 cvr_q=cvr_q, # conservative voltage reduction parameter for reactive power
594 phases=self.num_phase_map[
595 str(self.dss.Bus.Nodes())
596 ], # bus phases a,b,c
597 has_gen=(
598 True if active_bus_name in self.gen_buses else False
599 ), # if the bus has a generator or not
600 has_load=(
601 True if active_bus_name in self.load_buses else False
602 ), # if the bus has a load or not
603 has_cap=(
604 True if active_bus_name in self.cap_buses else False
605 ), # if the bus has a capacitor or not
606 # be careful that X gives you lon and Y gives you lat
607 # extra elements
608 latitude=self.dss.Bus.Y(), # latitude of the bus location (Y)
609 longitude=self.dss.Bus.X(),
610 ) # longitude of the bus location (X)
611 bus_data.append(each_bus)
612 bus_df = pd.DataFrame(bus_data)
613 bus_df = (
614 pd.merge(load_df, bus_df, on=["id"], how="outer")
615 .sort_values(by="id", ignore_index=True)
616 .fillna(0)
617 )
618 return bus_df
620 def get_gen_data(self) -> pd.DataFrame:
621 s_base = self.s_base
622 generator_flag = self.dss.Generators.First()
623 gen_data = []
625 while generator_flag:
626 bus_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
627 n_phases = len(bus_phases)
628 if len(bus_phases) == 0 or len(bus_phases) >= 3:
629 n_phases = 3
630 active_phases = np.array([0, 1, 2])
631 if n_phases < 3:
632 active_phases = (
633 np.array(self.dss.CktElement.BusNames()[0].split(".")[1:]).astype(
634 int
635 )
636 - 1
637 )
639 active_power_per_phase = (
640 self.dss.Generators.kW() / n_phases * 1000
641 ) / s_base
642 reactive_power_per_phase = (
643 self.dss.Generators.kvar() / n_phases * 1000
644 ) / s_base
645 apparent_power_rating = (
646 self.dss.Generators.kVARated() / n_phases * 1000 / s_base
647 )
648 gen_name = self.dss.Generators.Name()
649 bus_name = self.dss.Generators.Bus1().split(".")[0]
650 each_gen = dict(
651 id=self.bus_names_to_index_map[bus_name],
652 name=gen_name,
653 )
654 phases = ""
655 for phase_id in active_phases:
656 ph = "abc"[phase_id]
657 each_gen[f"p{ph}"] = active_power_per_phase
658 phases = phases + ph
659 for phase_id in active_phases:
660 ph = "abc"[phase_id]
661 each_gen[f"q{ph}"] = reactive_power_per_phase
662 for phase_id in active_phases:
663 ph = "abc"[phase_id]
664 each_gen[f"s{ph}_max"] = apparent_power_rating
665 for ph in "abc":
666 if ph not in phases:
667 each_gen[f"p{ph}"] = 0
668 each_gen[f"q{ph}"] = 0
669 each_gen[f"s{ph}_max"] = 0
671 each_gen["phases"] = phases
673 each_gen["qa_max"] = (None,)
674 each_gen["qb_max"] = (None,)
675 each_gen["qc_max"] = (None,)
676 each_gen["qa_min"] = (None,)
677 each_gen["qb_min"] = (None,)
678 each_gen["qc_min"] = (None,)
680 gen_data.append(each_gen)
681 generator_flag = self.dss.Generators.Next()
683 pv_flag = self.dss.PVsystems.First()
684 while pv_flag:
685 bus_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
686 n_phases = len(bus_phases)
687 if len(bus_phases) == 0 or len(bus_phases) >= 3:
688 n_phases = 3
689 active_phases = np.array([0, 1, 2])
690 if n_phases < 3:
691 active_phases = (
692 np.array(self.dss.CktElement.BusNames()[0].split(".")[1:]).astype(
693 int
694 )
695 - 1
696 )
698 active_power_per_phase = (
699 self.dss.PVsystems.Pmpp() / n_phases * 1000
700 ) / s_base
701 reactive_power_per_phase = (
702 self.dss.PVsystems.kvar() / n_phases * 1000
703 ) / s_base
704 apparent_power_rating = (
705 self.dss.PVsystems.kVARated() / n_phases * 1000 / s_base
706 )
707 gen_name = self.dss.PVsystems.Name()
708 bus_name = self.dss.CktElement.BusNames()[0].split(".")[0]
709 each_gen = dict(
710 id=self.bus_names_to_index_map[bus_name],
711 name=gen_name,
712 )
713 phases = ""
714 for phase_id in active_phases:
715 ph = "abc"[phase_id]
716 each_gen[f"p{ph}"] = active_power_per_phase
717 phases = phases + ph
718 for phase_id in active_phases:
719 ph = "abc"[phase_id]
720 each_gen[f"q{ph}"] = reactive_power_per_phase
721 for phase_id in active_phases:
722 ph = "abc"[phase_id]
723 each_gen[f"s{ph}_max"] = apparent_power_rating
724 for ph in "abc":
725 if ph not in phases:
726 each_gen[f"p{ph}"] = 0
727 each_gen[f"q{ph}"] = 0
728 each_gen[f"s{ph}_max"] = 0
730 each_gen["phases"] = phases
732 each_gen["qa_max"] = (None,)
733 each_gen["qb_max"] = (None,)
734 each_gen["qc_max"] = (None,)
735 each_gen["qa_min"] = (None,)
736 each_gen["qb_min"] = (None,)
737 each_gen["qc_min"] = (None,)
739 gen_data.append(each_gen)
740 pv_flag = self.dss.PVsystems.Next()
742 gen_df = pd.DataFrame(gen_data)
743 if len(gen_data) < 1:
744 gen_df = pd.DataFrame(
745 {
746 "id": [],
747 "name": [],
748 "pa": [],
749 "pb": [],
750 "pc": [],
751 "qa": [],
752 "qb": [],
753 "qc": [],
754 "sa_max": [],
755 "sb_max": [],
756 "sc_max": [],
757 "phases": [],
758 "qa_max": [],
759 "qb_max": [],
760 "qc_max": [],
761 "qa_min": [],
762 "qb_min": [],
763 "qc_min": [],
764 }
765 )
766 gen_df = gen_df.groupby(by=["id"], as_index=False).agg(
767 dict(
768 id="first",
769 name="first",
770 pa="sum",
771 pb="sum",
772 pc="sum",
773 qa="sum",
774 qb="sum",
775 qc="sum",
776 sa_max="sum",
777 sb_max="sum",
778 sc_max="sum",
779 phases="sum",
780 qa_max="sum",
781 qb_max="sum",
782 qc_max="sum",
783 qa_min="sum",
784 qb_min="sum",
785 qc_min="sum",
786 )
787 )
788 return gen_df
790 def get_cap_data(self) -> pd.DataFrame:
791 s_base = self.s_base
792 flag = self.dss.Capacitors.First()
793 cap_data = []
794 while flag:
795 cap_bus_name = self.dss.CktElement.BusNames()[0].split(".")[0]
796 cap_bus_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
798 # convert this to string to be consistent with how we conver num to phase letters
799 cap_bus_phases = str([int(phase) for phase in cap_bus_phases])
800 if cap_bus_phases == "[]":
801 # three phases are usually represented by either .1.2.3 or nothing in opendss
802 # for second case we should ensure that 3 phase is actually represented
803 cap_bus_phases = "[1, 2, 3]"
805 cap_phase = self.num_phase_map[cap_bus_phases]
807 if cap_phase != "abc":
808 each_cap = dict(
809 id=self.bus_names_to_index_map[cap_bus_name],
810 name=cap_bus_name,
811 qa=(
812 self.dss.Capacitors.kvar() * 1000 / s_base
813 if cap_phase in {"a"}
814 else 0
815 ),
816 qb=(
817 self.dss.Capacitors.kvar() * 1000 / s_base
818 if cap_phase in {"b"}
819 else 0
820 ),
821 qc=(
822 self.dss.Capacitors.kvar() * 1000 / s_base
823 if cap_phase in {"c"}
824 else 0
825 ),
826 phases=cap_phase,
827 )
828 else:
829 each_cap = dict(
830 id=self.bus_names_to_index_map[cap_bus_name],
831 name=cap_bus_name,
832 qa=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base,
833 qb=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base,
834 qc=(self.dss.Capacitors.kvar() * 1000 / 3) / s_base,
835 phases=cap_phase,
836 )
838 cap_data.append(each_cap)
839 flag = self.dss.Capacitors.Next()
840 cap_df = pd.DataFrame(cap_data)
841 if len(cap_data) < 1:
842 cap_df = pd.DataFrame(
843 {
844 "id": [],
845 "name": [],
846 "qa": [],
847 "qb": [],
848 "qc": [],
849 "phases": [],
850 }
851 )
853 cap_df = cap_df.groupby(by=["id"], as_index=False).agg(
854 dict(
855 id="first",
856 name="first",
857 qa="sum",
858 qb="sum",
859 qc="sum",
860 phases="sum",
861 )
862 )
863 return cap_df
865 def get_reg_data(self) -> pd.DataFrame:
866 s_base = self.s_base
867 reg_data = []
868 reg_control_names = self.dss.RegControls.AllNames()
869 reg_names = []
870 if len(reg_control_names) != 0:
871 dss_reg_df = self.dss.utils.regcontrols_to_dataframe()
872 reg_names = dss_reg_df.Transformer.to_list()
873 flag = self.dss.Transformers.First()
874 while flag:
875 element_type = self.dss.CktElement.Name().lower().split(".")[0]
876 element_name = self.dss.CktElement.Name().lower().split(".")[1]
877 if element_type not in ["transformer"]:
878 flag = self.dss.Transformers.Next()
879 continue
880 if element_name not in reg_names:
881 flag = self.dss.Transformers.Next()
882 continue
883 bus1 = self.dss.CktElement.BusNames()[0].split(".")[0]
884 bus2 = self.dss.CktElement.BusNames()[-1].split(".")[0]
885 fb = self.bus_names_to_index_map[bus1]
886 tb = self.bus_names_to_index_map[bus2]
887 tap_direction = 1
888 if fb > tb:
889 fb, tb = tb, fb
890 bus1, bus2 = bus2, bus1
891 tap_direction = -1
892 self.dss.Circuit.SetActiveBus(bus2)
893 line_phases = self.dss.CktElement.BusNames()[0].split(".")[1:]
894 line_phases = sorted(line_phases)
896 # convert this to string to be consistent with how we conver num to phase letters
897 line_phases = str([int(phase) for phase in line_phases])
898 if line_phases == "[]":
899 # three phases are usually represented by either .1.2.3 or nothing in opendss
900 # for second case we should ensure that 3 phase is actually represented
901 line_phases = "[1, 2, 3]"
902 line_phase = self.num_phase_map[line_phases]
903 ratio = self.dss.Transformers.Tap()
904 tap = (ratio - 1) / 0.00625 * tap_direction
905 each_reg = {}
906 each_reg["fb"] = self.bus_names_to_index_map[bus1]
907 each_reg["tb"] = self.bus_names_to_index_map[bus2]
908 each_reg["from_name"] = bus1
909 each_reg["to_name"] = bus2
910 for ph in line_phase:
911 each_reg[f"tap_{ph}"] = int(round(tap))
912 each_reg["phases"] = line_phase
913 reg_data.append(each_reg)
915 flag = self.dss.Transformers.Next()
917 # combine lines between identical buses.
918 reg_df = pd.DataFrame(reg_data)
919 if len(reg_data) < 1:
920 reg_df = pd.DataFrame(
921 {
922 "fb": [],
923 "tb": [],
924 "from_name": [],
925 "to_name": [],
926 "tap_a": [],
927 "tap_b": [],
928 "tap_c": [],
929 "phases": [],
930 }
931 )
932 reg_df = reg_df.groupby(["fb", "tb"]).agg(
933 {
934 "fb": "first",
935 "tb": "first",
936 "from_name": "first",
937 "to_name": "first",
938 "tap_a": "max",
939 "tap_b": "max",
940 "tap_c": "max",
941 "phases": "sum",
942 }
943 )
944 reg_df = reg_df.reset_index(drop=True)
945 reg_df = reg_df.sort_values(by="tb", ignore_index=True).fillna(1)
946 # reg_df["tap_a"] = (reg_df["ratio_a"] - 1) / 0.00625
947 # reg_df["tap_b"] = (reg_df["ratio_b"] - 1) / 0.00625
948 # reg_df["tap_c"] = (reg_df["ratio_c"] - 1) / 0.00625
949 return reg_df
951 def _get_loads(self) -> dict[str, list[float]]:
952 """Extract load information for each node for each phase. This method extracts load on the exact bus(node) as
953 modeled in the distribution model, including secondary.
955 Returns:
956 load_per_phase(pd.DataFrame): Per phase load data in a pandas dataframe
957 """
958 s_base = self.s_base
959 load_df = pd.DataFrame(
960 [], columns=["id", "name", "pl_a", "ql_a", "pl_b", "ql_b", "pl_c", "ql_c"]
961 )
962 loads_flag = self.dss.Loads.First()
963 load_data = []
964 model_to_cvr_map = {
965 1: (0, 0),
966 2: (2, 2),
967 3: (0, 2),
968 5: (1, 1),
969 6: (0, 0),
970 7: (0, 2),
971 }
972 while loads_flag:
973 connected_buses = self.dss.CktElement.BusNames()
974 if len(connected_buses) > 1:
975 raise Exception("Multiple connected buses")
976 model = self.dss.Loads.Model()
977 cvr_p, cvr_q = model_to_cvr_map.get(model, 0)
978 if model == 4: # exponential model
979 cvr_p = self.dss.Loads.CVRwatts()
980 cvr_q = self.dss.Loads.CVRvars()
981 if model == 8: # zip model
982 zipv = self.dss.Loads.ZipV()
983 cvr_p = 2 * zipv[0] + zipv[1]
984 cvr_q = 2 * zipv[3] + zipv[4]
985 bus = connected_buses[0]
986 bus_name = bus.split(".")[0]
987 each_load = {
988 "id": 0,
989 "pl_a": 0,
990 "ql_a": 0,
991 "pl_b": 0,
992 "ql_b": 0,
993 "pl_c": 0,
994 "ql_c": 0,
995 "cvr_p": cvr_p,
996 "cvr_q": cvr_q,
997 }
998 bus_split = bus.split(".")
999 each_load["id"] = self.bus_names_to_index_map[bus_name]
1000 connected_phase_secondary = bus_split[1:]
1002 # conductor power contains info on active and reactive power
1003 conductor_power = np.array(self.dss.CktElement.Powers())
1004 # nonzero_power_indices = np.where(conductor_power != 0)[0]
1005 # nonzero_power = conductor_power[nonzero_power_indices]
1006 # Extract P and Q values (every alternate elements)
1007 a1 = np.exp(-1 / 6 * 1j * np.pi)
1008 a2 = np.exp(-5 / 6 * 1j * np.pi)
1009 p_values = conductor_power[::2]
1010 q_values = conductor_power[1::2]
1011 phases = "abc"
1012 n_phases = self.dss.Loads.Phases()
1013 pf = self.dss.Loads.PF()
1014 kw = self.dss.Loads.kW()
1015 kv = self.dss.Loads.kV()
1016 kvar = self.dss.Loads.kvar()
1017 is_delta = self.dss.Loads.IsDelta()
1018 if len(connected_phase_secondary) > 0:
1019 phases = "".join("abc"[int(n) - 1] for n in connected_phase_secondary)
1020 # if len(phases) != n_phases:
1021 # raise Exception("Number of load phases does not match with bus phases")
1022 for phase_index, ph in enumerate(phases):
1023 each_load[f"pl_{ph}"] = p_values[phase_index] * 1000 / s_base
1024 each_load[f"ql_{ph}"] = q_values[phase_index] * 1000 / s_base
1025 load_data.append(each_load)
1026 loads_flag = self.dss.Loads.Next()
1027 load_df = pd.DataFrame(load_data)
1028 load_df = load_df.groupby("id").agg(
1029 {
1030 "id": "first",
1031 "pl_a": "sum",
1032 "ql_a": "sum",
1033 "pl_b": "sum",
1034 "ql_b": "sum",
1035 "pl_c": "sum",
1036 "ql_c": "sum",
1037 }
1038 )
1039 load_df = load_df.fillna(0)
1040 return load_df.reset_index(drop=True)
1042 def to_csv(self, dir_name: str = None, overwrite: bool = True) -> None:
1043 if dir_name is None:
1044 dir_name = "testfiles"
1046 Path(dir_name).mkdir(parents=True, exist_ok=overwrite)
1047 self.branch_data.to_csv(f"{dir_name}/branch_data.csv", index=False)
1048 self.bus_data.to_csv(f"{dir_name}/bus_data.csv", index=False)
1049 self.cap_data.to_csv(f"{dir_name}/cap_data.csv", index=False)
1050 self.gen_data.to_csv(f"{dir_name}/gen_data.csv", index=False)
1051 self.reg_data.to_csv(f"{dir_name}/reg_data.csv", index=False)
1053 def update_gen_q(self, q: pd.DataFrame):
1054 flag = self.dss.Generators.First()
1055 while flag:
1056 bus_phases = np.array(
1057 self.dss.CktElement.BusNames()[0].split(".")[1:]
1058 ).astype(int)
1059 n_phases = len(bus_phases)
1060 if len(bus_phases) == 0 or len(bus_phases) >= 3:
1061 n_phases = 3
1062 active_phases = np.array([0, 1, 2])
1063 if n_phases < 3:
1064 active_phases = bus_phases - 1
1065 phase_columns = ["abc"[ph_idx] for ph_idx in active_phases]
1066 bus = self.dss.Generators.Bus1().split(".")[0]
1067 bus_id = self.bus_names_to_index_map[bus]
1068 kvar = q.loc[bus_id, phase_columns].sum() * self.s_base / 1000
1069 self.dss.Generators.kvar(kvar)
1070 flag = self.dss.Generators.Next()
1073def main() -> None:
1074 # dss_data = DSSParser(r'ieee9500_dss/Master-unbal-initial-config.dss')
1075 dss_data = DSSParser(r"ieee13Bus/IEEE13Nodeckt.dss")
1076 print(dss_data.get_branches())
1079if __name__ == "__main__":
1080 main()