Coverage for src/distopf/cim_converter/processors/generator_processor.py: 70%
93 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
1from distopf.cim_converter.processors.base_processor import BaseProcessor
2import cimgraph.data_profile.cimhub_2023 as cim
3from distopf.cim_converter.utils import PhaseUtils
6class GeneratorProcessor(BaseProcessor):
7 """Processor for generator objects (PowerElectronicsConnection, EnergySource, etc.)."""
9 def process(self, network) -> list[dict]:
10 """Process all generator types."""
11 results = []
12 # Process PowerElectronicsConnection (PV, Battery, etc.)
13 results.extend(self._process_power_electronics_connections(network))
14 # Process EnergySource (may be generators or sources)
15 results.extend(self._process_energy_sources(network))
16 return results
18 def _process_power_electronics_connections(self, network) -> list[dict]:
19 """Process PowerElectronicsConnection objects."""
20 results = []
21 for pec in network.graph.get(cim.PowerElectronicsConnection, {}).values():
22 gen_data = self._process_power_electronics_connection(pec)
23 if gen_data:
24 results.append(gen_data)
25 return results
27 def _process_power_electronics_connection(self, pec) -> dict | None:
28 """Process individual PowerElectronicsConnection."""
29 if not pec.Terminals or len(pec.Terminals) == 0:
30 return None
32 s_base_gen = (
33 float(pec.ratedS) if hasattr(pec, "ratedS") and pec.ratedS else self.s_base
34 )
36 bus_name = pec.Terminals[0].ConnectivityNode.name
37 gen_name = pec.name
39 # Convert power and limits to system per-unit
40 total_p_pu = float(pec.p) / self.s_base if hasattr(pec, "p") and pec.p else 0.0
41 total_q_pu = float(pec.q) / self.s_base if hasattr(pec, "q") and pec.q else 0.0
42 max_q_pu = (
43 (float(pec.maxQ) / self.s_base)
44 if hasattr(pec, "maxQ") and pec.maxQ
45 else 0.0
46 )
47 min_q_pu = (
48 (float(pec.minQ) / self.s_base)
49 if hasattr(pec, "minQ") and pec.minQ
50 else 0.0
51 )
52 rated_s_pu = (
53 (float(pec.ratedS) / self.s_base)
54 if hasattr(pec, "ratedS") and pec.ratedS
55 else 0.0
56 )
58 gen_data = {
59 "mrid": pec.mRID,
60 "id": bus_name, # Use bus name (will be mapped to integer later)
61 "name": gen_name,
62 "pa": 0.0,
63 "pb": 0.0,
64 "pc": 0.0,
65 "qa": 0.0,
66 "qb": 0.0,
67 "qc": 0.0,
68 "s_base": s_base_gen,
69 "sa_max": 0.0,
70 "sb_max": 0.0,
71 "sc_max": 0.0,
72 "phases": "",
73 "qa_max": 0.0,
74 "qb_max": 0.0,
75 "qc_max": 0.0,
76 "qa_min": 0.0,
77 "qb_min": 0.0,
78 "qc_min": 0.0,
79 "p": total_p_pu,
80 "q": total_q_pu,
81 "s_max": rated_s_pu,
82 "s_min": -rated_s_pu,
83 "q_max": max_q_pu,
84 "q_min": min_q_pu,
85 "ps1": 0.0,
86 "ps2": 0.0,
87 "qs1": 0.0,
88 "qs2": 0.0,
89 "ss1_max": 0.0,
90 "ss2_max": 0.0,
91 "qs1_max": 0.0,
92 "qs2_max": 0.0,
93 "qs1_min": 0.0,
94 "qs2_min": 0.0,
95 "control_variable": "PQ",
96 }
98 # Process phase-specific data
99 phase_data = {}
100 active_phases_for_limits = set()
101 if (
102 hasattr(pec, "PowerElectronicsConnectionPhases")
103 and pec.PowerElectronicsConnectionPhases
104 ):
105 phase_conn: cim.PowerElectronicsConnectionPhase
106 for phase_conn in pec.PowerElectronicsConnectionPhases:
107 phase_letter = PhaseUtils.get_phase_str(phase_conn.phase)
108 if not phase_letter:
109 continue
110 phase_p = (
111 float(phase_conn.p) / self.s_base
112 if hasattr(phase_conn, "p") and phase_conn.p
113 else 0.0
114 )
115 phase_q = (
116 float(phase_conn.q) / self.s_base
117 if hasattr(phase_conn, "q") and phase_conn.q
118 else 0.0
119 )
120 phase_data[phase_letter] = {"p": phase_p, "q": phase_q}
122 active_phases_for_limits.add(phase_letter)
124 # Distribute power across phases
125 if phase_data:
126 abc_phases, s_phases = set(), set()
127 for phase_letter, data in phase_data.items():
128 if phase_letter in ["s1", "s2"]:
129 s_phases.add(phase_letter)
130 if phase_letter in "abc":
131 abc_phases.add(phase_letter)
132 gen_data[f"p{phase_letter}"], gen_data[f"q{phase_letter}"] = (
133 data["p"],
134 data["q"],
135 )
136 gen_data["phases"] = (
137 f"{''.join(sorted(abc_phases))}{''.join(sorted(s_phases))}"
138 )
139 # else:
140 # gen_data["pa"] = gen_data["pb"] = gen_data["pc"] = total_p_pu / 3.0
141 # gen_data["qa"] = gen_data["qb"] = gen_data["qc"] = total_q_pu / 3.0
142 # gen_data["phases"] = "abc"
143 # if total_p_pu != 0 or total_q_pu != 0:
144 # active_phases_for_limits = {"a", "b", "c"}
146 # Distribute limits (in system p.u.) across active phases
147 num_phases = len(active_phases_for_limits)
148 if rated_s_pu > 0 and num_phases > 0:
149 s_max_pu_per_phase = rated_s_pu / num_phases
150 q_max_pu_per_phase = max_q_pu / num_phases
151 q_min_pu_per_phase = min_q_pu / num_phases
152 for phase_char in active_phases_for_limits:
153 gen_data[f"s{phase_char}_max"] = s_max_pu_per_phase
154 gen_data[f"q{phase_char}_max"] = q_max_pu_per_phase
155 gen_data[f"q{phase_char}_min"] = q_min_pu_per_phase
157 return gen_data
159 def _process_energy_sources(self, network) -> list[dict]:
160 results = []
161 for source in network.graph.get(cim.EnergySource, {}).values():
162 if hasattr(source, "Terminals") and source.Terminals:
163 if "sourcebus" in source.Terminals[0].ConnectivityNode.name.lower():
164 continue
165 gen_data = self._process_energy_source(source)
166 if gen_data:
167 results.append(gen_data)
168 return results
170 def _process_energy_source(self, source) -> dict | None:
171 if not source.Terminals or len(source.Terminals) == 0:
172 return None
173 bus_name = source.Terminals[0].ConnectivityNode.name
175 gen_data = {
176 "mrid": source.mRID,
177 "id": bus_name, # Use bus name (will be mapped to integer later)
178 "name": f"Generator_{source.mRID[:8]}",
179 "pa": 0.0,
180 "pb": 0.0,
181 "pc": 0.0,
182 "qa": 0.0,
183 "qb": 0.0,
184 "qc": 0.0,
185 "s_base": self.s_base,
186 "sa_max": 0.0,
187 "sb_max": 0.0,
188 "sc_max": 0.0,
189 "phases": "abc",
190 "qa_max": 0.0,
191 "qb_max": 0.0,
192 "qc_max": 0.0,
193 "qa_min": 0.0,
194 "qb_min": 0.0,
195 "qc_min": 0.0,
196 "ps1": 0.0,
197 "ps2": 0.0,
198 "qs1": 0.0,
199 "qs2": 0.0,
200 "ss1_max": 0.0,
201 "ss2_max": 0.0,
202 "qs1_max": 0.0,
203 "qs2_max": 0.0,
204 "qs1_min": 0.0,
205 "qs2_min": 0.0,
206 "control_variable": "PQ",
207 }
208 if hasattr(source, "activePower") and source.activePower:
209 total_p = float(source.activePower) / self.s_base
210 gen_data["pa"] = gen_data["pb"] = gen_data["pc"] = total_p / 3.0
211 if hasattr(source, "reactivePower") and source.reactivePower:
212 total_q = float(source.reactivePower) / self.s_base
213 gen_data["qa"] = gen_data["qb"] = gen_data["qc"] = total_q / 3.0
214 return gen_data
216 def _get_phase_str(self, phase_code) -> str | None:
217 phase_str = str(phase_code.value).lower()
218 if "s1" in phase_str:
219 return "s1"
220 elif "s2" in phase_str:
221 return "s2"
222 elif "a" in phase_str:
223 return "a"
224 elif "b" in phase_str:
225 return "b"
226 elif "c" in phase_str:
227 return "c"
228 return None