Coverage for src/distopf/cim_importer/processors/transformer_processor.py: 83%
211 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1import numpy as np
2import logging
3from distopf.cim_importer.processors.base_processor import BaseProcessor
4from distopf.cim_importer.processors.regulator_processor import RegulatorProcessor
5from distopf.cim_importer.utils import PhaseUtils
6import cimgraph.data_profile.cimhub_2023 as cim # type: ignore
8_log = logging.getLogger(__name__)
11class TransformerProcessor(BaseProcessor):
12 """Processor for PowerTransformer objects (excluding regulators)."""
14 def __init__(self, s_base: float = 1e6):
15 super().__init__(s_base)
16 self.regulator_processor = RegulatorProcessor(s_base)
18 def process(self, network) -> list[dict]:
19 """Process all PowerTransformer objects that are not regulators."""
20 results = []
21 for xfmr in network.list_by_class(cim.PowerTransformer):
22 if not self.regulator_processor.is_regulator(xfmr):
23 results.extend(self._process_transformer(xfmr))
24 return results
26 def _process_transformer(self, xfmr) -> list[dict]:
27 """Process individual transformer (may return multiple entries for 3-winding)."""
28 terminals = xfmr.Terminals
29 buses = [terminal.ConnectivityNode.name for terminal in terminals]
30 # Remove duplicates while preserving order
31 unique_buses = []
32 for bus in buses:
33 if bus not in unique_buses:
34 unique_buses.append(bus)
35 buses = unique_buses
37 if len(buses) == 2:
38 return [self._process_2winding_transformer(xfmr, buses)]
39 if len(buses) == 3:
40 return self._process_3winding_transformer(xfmr, buses)
42 raise NotImplementedError(
43 f"Transformers with {len(buses)} windings not implemented: {xfmr.name}"
44 )
46 def _process_2winding_transformer(self, xfmr, buses: list) -> dict:
47 """Process 2-winding transformer."""
48 data = self._create_base_branch_dict()
49 data.update(
50 {
51 "name": xfmr.name,
52 "from_name": buses[0],
53 "to_name": buses[1],
54 "type": "transformer",
55 }
56 )
58 # Process impedance based on transformer structure
59 if len(xfmr.PowerTransformerEnd) > 0:
60 self._process_power_transformer_end_impedance(xfmr, data)
61 return data
63 self._process_transformer_tank_impedance(xfmr, data)
64 return data
66 def _process_3winding_transformer(self, xfmr, buses: list) -> list[dict]:
67 """Process 3-winding transformer (creates two entries: 1→2 and 1→3)."""
68 # Entry 1: buses[0] → buses[1]
69 data1 = self._create_base_branch_dict()
70 data1.update(
71 {
72 "name": f"{xfmr.name}_12",
73 "from_name": buses[0],
74 "to_name": buses[1],
75 "type": "transformer",
76 }
77 )
79 # Entry 2: buses[0] → buses[2]
80 data2 = self._create_base_branch_dict()
81 data2.update(
82 {
83 "name": f"{xfmr.name}_13",
84 "from_name": buses[0],
85 "to_name": buses[2],
86 "type": "transformer",
87 }
88 )
90 # Process impedances
91 if len(xfmr.PowerTransformerEnd) > 0:
92 self._process_3winding_end_impedance(xfmr, data1, (1, 2))
93 self._process_3winding_end_impedance(xfmr, data2, (1, 3))
94 return [data1, data2]
96 self._process_transformer_tank_impedance(xfmr, data1)
97 self._process_transformer_tank_impedance(xfmr, data2)
98 return [data1, data2]
100 def _update_impedance_data(self, data: dict, r: float, x: float, v_ln_base: float):
101 """Update data dict with impedance values."""
102 z_base = v_ln_base**2 / self.s_base
103 data.update(
104 {
105 "raa": r,
106 "rbb": r,
107 "rcc": r,
108 "xaa": x,
109 "xbb": x,
110 "xcc": x,
111 "phases": "abc",
112 "v_ln_base": v_ln_base,
113 "z_base": z_base,
114 }
115 )
117 def _process_power_transformer_end_impedance(self, xfmr, data: dict):
118 """Process impedance from PowerTransformerEnd structure."""
119 for power_transformer_end in xfmr.PowerTransformerEnd:
120 end_number = int(power_transformer_end.endNumber)
121 if end_number != 1: # Only process primary side
122 continue
124 v_rated = float(power_transformer_end.ratedU)
125 v_ln_base = v_rated / np.sqrt(3)
126 z_base = v_ln_base**2 / self.s_base
128 # Try mesh impedance first
129 if (
130 hasattr(power_transformer_end, "FromMeshImpedance")
131 and power_transformer_end.FromMeshImpedance
132 ):
133 mesh_imp = power_transformer_end.FromMeshImpedance[0]
134 r = float(mesh_imp.r) / z_base if mesh_imp.r else 0.0
135 x = float(mesh_imp.x) / z_base if mesh_imp.x else 0.0
136 self._update_impedance_data(data, r, x, v_ln_base)
137 return
139 # Try star impedance
140 if (
141 hasattr(power_transformer_end, "StarImpedance")
142 and power_transformer_end.StarImpedance
143 ):
144 star_imp = power_transformer_end.StarImpedance
145 r = float(star_imp.r) / z_base if star_imp.r else 0.0
146 x = float(star_imp.x) / z_base if star_imp.x else 0.0
147 self._update_impedance_data(data, r, x, v_ln_base)
148 return
150 # Try direct impedance values
151 if hasattr(power_transformer_end, "r") and power_transformer_end.r:
152 r = float(power_transformer_end.r) / z_base
153 x = (
154 float(power_transformer_end.x) / z_base
155 if power_transformer_end.x
156 else 0.0
157 )
158 self._update_impedance_data(data, r, x, v_ln_base)
159 return
161 # No impedance found, use defaults
162 self._set_default_transformer_impedance(data, v_ln_base)
163 return
165 # No primary end found, use defaults
166 self._set_default_transformer_impedance(data)
168 def _process_3winding_end_impedance(self, xfmr, data: dict, winding_pair: tuple):
169 """Process impedance for 3-winding transformer."""
170 primary_end = None
171 # Find primary end
172 for pte in xfmr.PowerTransformerEnd:
173 if int(pte.endNumber) == winding_pair[0]:
174 primary_end = pte
175 break
177 if not (
178 primary_end
179 and hasattr(primary_end, "FromMeshImpedance")
180 and primary_end.FromMeshImpedance
181 ):
182 self._process_transformer_tank_impedance(xfmr, data)
183 return
185 for mesh_impedance in primary_end.FromMeshImpedance:
186 if not (
187 hasattr(mesh_impedance, "ToTransformerEnd")
188 and mesh_impedance.ToTransformerEnd
189 ):
190 continue
192 to_ends = mesh_impedance.ToTransformerEnd
193 if not isinstance(to_ends, list):
194 to_ends = [to_ends]
196 for to_end in to_ends:
197 if int(to_end.endNumber) != winding_pair[1]:
198 continue
200 # Found the correct winding pair
201 v_rated = float(primary_end.ratedU)
202 v_ln_base = v_rated / np.sqrt(3)
203 z_base = v_ln_base**2 / self.s_base
204 r = float(mesh_impedance.r) / z_base if mesh_impedance.r else 0.0
205 x = float(mesh_impedance.x) / z_base if mesh_impedance.x else 0.0
206 self._update_impedance_data(data, r, x, v_ln_base)
207 return
209 # Fallback to tank impedance
210 self._process_transformer_tank_impedance(xfmr, data)
212 def _process_transformer_tank_impedance(self, xfmr, data: dict):
213 """Process impedance from TransformerTank structure."""
214 if not hasattr(xfmr, "TransformerTanks") or not xfmr.TransformerTanks:
215 _log.warning(f"No TransformerTanks found for transformer {xfmr.name}")
216 self._set_default_transformer_impedance(data)
217 return
219 tank = xfmr.TransformerTanks[0] # Use first tank
220 v_ln_base = self._get_tank_voltage_base(tank)
221 z_base = v_ln_base**2 / self.s_base
222 phases = PhaseUtils.get_equipment_phases(tank)
224 # Extract impedance values using multiple strategies
225 r_pu, x_pu = self._extract_tank_impedance(tank, z_base)
227 data.update(
228 {
229 "raa": r_pu,
230 "rbb": r_pu,
231 "rcc": r_pu,
232 "xaa": x_pu,
233 "xbb": x_pu,
234 "xcc": x_pu,
235 "phases": phases,
236 "v_ln_base": v_ln_base,
237 "z_base": z_base,
238 }
239 )
241 def _extract_tank_impedance(
242 self, tank: cim.TransformerTank, z_base: float
243 ) -> tuple[float, float]:
244 """Comprehensive impedance extraction from transformer tank."""
245 r_ohms = 0.0
246 x_ohms = 0.0
248 # Strategy 1: Check tank ends for mesh impedance
249 if hasattr(tank, "TransformerTankEnds") and tank.TransformerTankEnds:
250 r_ohms, x_ohms = self._extract_tank_end_impedance(tank.TransformerTankEnds)
251 if r_ohms > 0.0 and x_ohms > 0.0:
252 return self._convert_to_per_unit(r_ohms, x_ohms, z_base, tank)
254 # Strategy 2: Check tank itself for impedance attributes
255 if r_ohms == 0.0 or x_ohms == 0.0:
256 r_ohms, x_ohms = self._extract_tank_direct_impedance(tank, r_ohms, x_ohms)
258 # Strategy 3: Check TransformerTankInfo if available
259 if (r_ohms == 0.0 or x_ohms == 0.0) and hasattr(tank, "TransformerTankInfo"):
260 r_ohms, x_ohms = self._extract_tank_info_impedance(
261 tank.TransformerTankInfo, r_ohms, x_ohms
262 )
264 return self._convert_to_per_unit(r_ohms, x_ohms, z_base, tank)
266 def _extract_tank_end_impedance(self, tank_ends) -> tuple[float, float]:
267 """Extract impedance from tank ends."""
268 r_ohms = 0.0
269 x_ohms = 0.0
271 try:
272 for tank_end in tank_ends:
273 # Check for mesh impedance
274 if (
275 hasattr(tank_end, "FromMeshImpedance")
276 and tank_end.FromMeshImpedance
277 ):
278 for mesh_imp in tank_end.FromMeshImpedance:
279 if mesh_imp.r and r_ohms == 0.0:
280 r_ohms = float(mesh_imp.r)
281 if mesh_imp.x and x_ohms == 0.0:
282 x_ohms = float(mesh_imp.x)
284 # Check for star impedance
285 if (
286 hasattr(tank_end, "StarImpedance")
287 and tank_end.StarImpedance
288 and r_ohms == 0.0
289 ):
290 star_imp = tank_end.StarImpedance
291 if star_imp.r:
292 r_ohms = float(star_imp.r)
293 if star_imp.x:
294 x_ohms = float(star_imp.x)
296 # Check for direct impedance on tank end
297 if hasattr(tank_end, "r") and tank_end.r and r_ohms == 0.0:
298 r_ohms = float(tank_end.r)
299 if hasattr(tank_end, "x") and tank_end.x:
300 x_ohms = float(tank_end.x)
302 if r_ohms > 0.0 and x_ohms > 0.0:
303 break
304 except (TypeError, AttributeError, ValueError) as e:
305 _log.debug(f"Error extracting tank end impedance: {e}")
307 return r_ohms, x_ohms
309 def _extract_tank_direct_impedance(
310 self, tank, r_ohms: float, x_ohms: float
311 ) -> tuple[float, float]:
312 """Extract impedance directly from tank attributes."""
313 try:
314 tank_r_attrs = ["r", "resistance", "r1", "positiveSequenceResistance"]
315 tank_x_attrs = ["x", "reactance", "x1", "positiveSequenceReactance"]
317 if r_ohms == 0.0:
318 for attr in tank_r_attrs:
319 if hasattr(tank, attr) and getattr(tank, attr) is not None:
320 r_ohms = float(getattr(tank, attr))
321 break
323 if x_ohms == 0.0:
324 for attr in tank_x_attrs:
325 if hasattr(tank, attr) and getattr(tank, attr) is not None:
326 x_ohms = float(getattr(tank, attr))
327 break
328 except (ValueError, TypeError) as e:
329 _log.debug(f"Error extracting tank impedance attributes: {e}")
331 return r_ohms, x_ohms
333 def _extract_tank_info_impedance(
334 self, tank_info, r_ohms: float, x_ohms: float
335 ) -> tuple[float, float]:
336 """Extract impedance from tank info."""
337 try:
338 if tank_info:
339 if hasattr(tank_info, "r") and tank_info.r and r_ohms == 0.0:
340 r_ohms = float(tank_info.r)
341 if hasattr(tank_info, "x") and tank_info.x and x_ohms == 0.0:
342 x_ohms = float(tank_info.x)
343 except (ValueError, TypeError, AttributeError) as e:
344 _log.debug(f"Error extracting tank info impedance: {e}")
346 return r_ohms, x_ohms
348 def _convert_to_per_unit(
349 self, r_ohms: float, x_ohms: float, z_base: float, tank
350 ) -> tuple[float, float]:
351 """Convert ohmic values to per-unit."""
352 r_pu = r_ohms / z_base if z_base > 0 and r_ohms > 0 else 0.0
353 x_pu = x_ohms / z_base if z_base > 0 and x_ohms > 0 else 0.0
355 # If still zero, use typical transformer values
356 if r_pu == 0.0 and x_pu == 0.0:
357 r_pu = 0.01 # 1% resistance
358 x_pu = 0.05 # 5% reactance
359 tank_name = tank.name if hasattr(tank, "name") else "unknown"
360 _log.info(
361 f"Using default impedance values for transformer tank {tank_name}"
362 )
364 return r_pu, x_pu
366 def _get_tank_voltage_base(self, tank) -> float:
367 """Get voltage base from transformer tank."""
368 if not (hasattr(tank, "TransformerTankEnds") and tank.TransformerTankEnds):
369 return 0.0
371 try:
372 for tank_end in tank.TransformerTankEnds:
373 if (
374 hasattr(tank_end, "BaseVoltage")
375 and tank_end.BaseVoltage
376 and hasattr(tank_end.BaseVoltage, "nominalVoltage")
377 and tank_end.BaseVoltage.nominalVoltage
378 ):
379 v_base = float(tank_end.BaseVoltage.nominalVoltage)
380 return v_base / np.sqrt(3)
382 # Also check for ratedU on tank end
383 if hasattr(tank_end, "ratedU") and tank_end.ratedU:
384 v_base = float(tank_end.ratedU)
385 return v_base / np.sqrt(3)
386 except (ValueError, TypeError, AttributeError):
387 pass
389 return 0.0
391 def _set_default_transformer_impedance(self, data: dict, v_ln_base: float = 0.0):
392 """Set default transformer impedance values."""
393 z_base = v_ln_base**2 / self.s_base
394 r_pu = 0.01 # 1%
395 x_pu = 0.05 # 5%
397 data.update(
398 {
399 "raa": r_pu,
400 "rbb": r_pu,
401 "rcc": r_pu,
402 "xaa": x_pu,
403 "xbb": x_pu,
404 "xcc": x_pu,
405 "phases": "abc",
406 "v_ln_base": v_ln_base,
407 "z_base": z_base,
408 }
409 )