Coverage for src/distopf/cim_converter/processors/transformer_processor.py: 83%

211 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-09 17:44 -0700

1import numpy as np 

2import logging 

3from distopf.cim_converter.processors.base_processor import BaseProcessor 

4from distopf.cim_converter.processors.regulator_processor import RegulatorProcessor 

5from distopf.cim_converter.utils import PhaseUtils 

6import cimgraph.data_profile.cimhub_2023 as cim # type: ignore 

7 

8_log = logging.getLogger(__name__) 

9 

10 

11class TransformerProcessor(BaseProcessor): 

12 """Processor for PowerTransformer objects (excluding regulators).""" 

13 

def __init__(self, s_base: float = 1e6):
    """Initialise the processor and its regulator-detection helper.

    Args:
        s_base: Power base forwarded unchanged to the base-class
            initialiser and to ``RegulatorProcessor``. Units are not
            visible here -- presumably VA (1e6 = 1 MVA); confirm against
            ``BaseProcessor``.
    """
    super().__init__(s_base)
    # process() asks this helper whether a PowerTransformer is a regulator
    # so that such transformers can be skipped.
    regulator_helper = RegulatorProcessor(s_base)
    self.regulator_processor = regulator_helper

17 

def process(self, network) -> list[dict]:
    """Convert every non-regulator PowerTransformer in ``network``.

    Args:
        network: Object exposing ``list_by_class``; it is queried with
            ``cim.PowerTransformer``.

    Returns:
        The concatenation, in iteration order, of the entries that
        ``_process_transformer`` yields for each transformer that
        ``regulator_processor.is_regulator`` does not claim.
    """
    branches = []
    for transformer in network.list_by_class(cim.PowerTransformer):
        if self.regulator_processor.is_regulator(transformer):
            # Regulators are deliberately left out of this processor's output.
            continue
        branches += self._process_transformer(transformer)
    return branches

25 

26 def _process_transformer(self, xfmr) -> list[dict]: 

27 """Process individual transformer (may return multiple entries for 3-winding).""" 

28 terminals = xfmr.Terminals 

29 buses = [terminal.ConnectivityNode.name for terminal in terminals] 

30 # Remove duplicates while preserving order 

31 unique_buses = [] 

32 for bus in buses: 

33 if bus not in unique_buses: 

34 unique_buses.append(bus) 

35 buses = unique_buses 

36 

37 if len(buses) == 2: 

38 return [self._process_2winding_transformer(xfmr, buses)] 

39 if len(buses) == 3: 

40 return self._process_3winding_transformer(xfmr, buses) 

41 

42 raise NotImplementedError( 

43 f"Transformers with {len(buses)} windings not implemented: {xfmr.name}" 

44 ) 

45 

46 def _process_2winding_transformer(self, xfmr, buses: list) -> dict: 

47 """Process 2-winding transformer.""" 

48 data = self._create_base_branch_dict() 

49 data.update( 

50 { 

51 "name": xfmr.name, 

52 "from_name": buses[0], 

53 "to_name": buses[1], 

54 "type": "transformer", 

55 } 

56 ) 

57 

58 # Process impedance based on transformer structure 

59 if len(xfmr.PowerTransformerEnd) > 0: 

60 self._process_power_transformer_end_impedance(xfmr, data) 

61 return data 

62 

63 self._process_transformer_tank_impedance(xfmr, data) 

64 return data 

65 

66 def _process_3winding_transformer(self, xfmr, buses: list) -> list[dict]: 

67 """Process 3-winding transformer (creates two entries: 1→2 and 1→3).""" 

68 # Entry 1: buses[0] → buses[1] 

69 data1 = self._create_base_branch_dict() 

70 data1.update( 

71 { 

72 "name": f"{xfmr.name}_12", 

73 "from_name": buses[0], 

74 "to_name": buses[1], 

75 "type": "transformer", 

76 } 

77 ) 

78 

79 # Entry 2: buses[0] → buses[2] 

80 data2 = self._create_base_branch_dict() 

81 data2.update( 

82 { 

83 "name": f"{xfmr.name}_13", 

84 "from_name": buses[0], 

85 "to_name": buses[2], 

86 "type": "transformer", 

87 } 

88 ) 

89 

90 # Process impedances 

91 if len(xfmr.PowerTransformerEnd) > 0: 

92 self._process_3winding_end_impedance(xfmr, data1, (1, 2)) 

93 self._process_3winding_end_impedance(xfmr, data2, (1, 3)) 

94 return [data1, data2] 

95 

96 self._process_transformer_tank_impedance(xfmr, data1) 

97 self._process_transformer_tank_impedance(xfmr, data2) 

98 return [data1, data2] 

99 

100 def _update_impedance_data(self, data: dict, r: float, x: float, v_ln_base: float): 

101 """Update data dict with impedance values.""" 

102 z_base = v_ln_base**2 / self.s_base 

103 data.update( 

104 { 

105 "raa": r, 

106 "rbb": r, 

107 "rcc": r, 

108 "xaa": x, 

109 "xbb": x, 

110 "xcc": x, 

111 "phases": "abc", 

112 "v_ln_base": v_ln_base, 

113 "z_base": z_base, 

114 } 

115 ) 

116 

117 def _process_power_transformer_end_impedance(self, xfmr, data: dict): 

118 """Process impedance from PowerTransformerEnd structure.""" 

119 for power_transformer_end in xfmr.PowerTransformerEnd: 

120 end_number = int(power_transformer_end.endNumber) 

121 if end_number != 1: # Only process primary side 

122 continue 

123 

124 v_rated = float(power_transformer_end.ratedU) 

125 v_ln_base = v_rated / np.sqrt(3) 

126 z_base = v_ln_base**2 / self.s_base 

127 

128 # Try mesh impedance first 

129 if ( 

130 hasattr(power_transformer_end, "FromMeshImpedance") 

131 and power_transformer_end.FromMeshImpedance 

132 ): 

133 mesh_imp = power_transformer_end.FromMeshImpedance[0] 

134 r = float(mesh_imp.r) / z_base if mesh_imp.r else 0.0 

135 x = float(mesh_imp.x) / z_base if mesh_imp.x else 0.0 

136 self._update_impedance_data(data, r, x, v_ln_base) 

137 return 

138 

139 # Try star impedance 

140 if ( 

141 hasattr(power_transformer_end, "StarImpedance") 

142 and power_transformer_end.StarImpedance 

143 ): 

144 star_imp = power_transformer_end.StarImpedance 

145 r = float(star_imp.r) / z_base if star_imp.r else 0.0 

146 x = float(star_imp.x) / z_base if star_imp.x else 0.0 

147 self._update_impedance_data(data, r, x, v_ln_base) 

148 return 

149 

150 # Try direct impedance values 

151 if hasattr(power_transformer_end, "r") and power_transformer_end.r: 

152 r = float(power_transformer_end.r) / z_base 

153 x = ( 

154 float(power_transformer_end.x) / z_base 

155 if power_transformer_end.x 

156 else 0.0 

157 ) 

158 self._update_impedance_data(data, r, x, v_ln_base) 

159 return 

160 

161 # No impedance found, use defaults 

162 self._set_default_transformer_impedance(data, v_ln_base) 

163 return 

164 

165 # No primary end found, use defaults 

166 self._set_default_transformer_impedance(data) 

167 

168 def _process_3winding_end_impedance(self, xfmr, data: dict, winding_pair: tuple): 

169 """Process impedance for 3-winding transformer.""" 

170 primary_end = None 

171 # Find primary end 

172 for pte in xfmr.PowerTransformerEnd: 

173 if int(pte.endNumber) == winding_pair[0]: 

174 primary_end = pte 

175 break 

176 

177 if not ( 

178 primary_end 

179 and hasattr(primary_end, "FromMeshImpedance") 

180 and primary_end.FromMeshImpedance 

181 ): 

182 self._process_transformer_tank_impedance(xfmr, data) 

183 return 

184 

185 for mesh_impedance in primary_end.FromMeshImpedance: 

186 if not ( 

187 hasattr(mesh_impedance, "ToTransformerEnd") 

188 and mesh_impedance.ToTransformerEnd 

189 ): 

190 continue 

191 

192 to_ends = mesh_impedance.ToTransformerEnd 

193 if not isinstance(to_ends, list): 

194 to_ends = [to_ends] 

195 

196 for to_end in to_ends: 

197 if int(to_end.endNumber) != winding_pair[1]: 

198 continue 

199 

200 # Found the correct winding pair 

201 v_rated = float(primary_end.ratedU) 

202 v_ln_base = v_rated / np.sqrt(3) 

203 z_base = v_ln_base**2 / self.s_base 

204 r = float(mesh_impedance.r) / z_base if mesh_impedance.r else 0.0 

205 x = float(mesh_impedance.x) / z_base if mesh_impedance.x else 0.0 

206 self._update_impedance_data(data, r, x, v_ln_base) 

207 return 

208 

209 # Fallback to tank impedance 

210 self._process_transformer_tank_impedance(xfmr, data) 

211 

def _process_transformer_tank_impedance(self, xfmr, data: dict):
    """Fill ``data`` with impedance taken from the transformer's first tank.

    When ``xfmr`` has no ``TransformerTanks`` a warning is logged and the
    default impedance is written instead. Otherwise only the first tank is
    consulted: its voltage base, phases and impedance are looked up and
    the same r/x pair is written to all three phase self-terms.

    Args:
        xfmr: Transformer object; ``TransformerTanks`` and ``name`` are
            read here.
        data: Branch dict, updated in place.
    """
    tanks = getattr(xfmr, "TransformerTanks", None)
    if not tanks:
        _log.warning(f"No TransformerTanks found for transformer {xfmr.name}")
        self._set_default_transformer_impedance(data)
        return

    first_tank = tanks[0]  # Use first tank
    # _get_tank_voltage_base returns 0.0 when it finds no voltage, which
    # makes z_base 0 as well; _convert_to_per_unit guards on z_base > 0.
    v_ln_base = self._get_tank_voltage_base(first_tank)
    z_base = v_ln_base**2 / self.s_base
    phases = PhaseUtils.get_equipment_phases(first_tank)

    r_pu, x_pu = self._extract_tank_impedance(first_tank, z_base)

    fields = dict.fromkeys(("raa", "rbb", "rcc"), r_pu)
    fields.update(dict.fromkeys(("xaa", "xbb", "xcc"), x_pu))
    fields.update(phases=phases, v_ln_base=v_ln_base, z_base=z_base)
    data.update(fields)

240 

def _extract_tank_impedance(
    self, tank: cim.TransformerTank, z_base: float
) -> tuple[float, float]:
    """Look up a tank's r/x from several places and convert the result.

    Sources are consulted in order, each one only while r or x is still
    exactly 0.0:

    1. the tank ends (``_extract_tank_end_impedance``);
    2. attributes on the tank itself (``_extract_tank_direct_impedance``);
    3. ``tank.TransformerTankInfo`` (``_extract_tank_info_impedance``).

    Args:
        tank: Transformer tank to inspect.
        z_base: Impedance base handed on to ``_convert_to_per_unit``.

    Returns:
        ``(r, x)`` as returned by ``_convert_to_per_unit``.
    """
    r_ohms, x_ohms = 0.0, 0.0

    # Strategy 1: tank ends.
    tank_ends = getattr(tank, "TransformerTankEnds", None)
    if tank_ends:
        r_ohms, x_ohms = self._extract_tank_end_impedance(tank_ends)

    # Strategies 2 and 3 run only when the ends did not give a complete
    # positive pair.
    if not (r_ohms > 0.0 and x_ohms > 0.0):
        # Strategy 2: attributes directly on the tank.
        if r_ohms == 0.0 or x_ohms == 0.0:
            r_ohms, x_ohms = self._extract_tank_direct_impedance(
                tank, r_ohms, x_ohms
            )

        # Strategy 3: TransformerTankInfo, if the tank has that attribute.
        still_missing = r_ohms == 0.0 or x_ohms == 0.0
        if still_missing and hasattr(tank, "TransformerTankInfo"):
            r_ohms, x_ohms = self._extract_tank_info_impedance(
                tank.TransformerTankInfo, r_ohms, x_ohms
            )

    return self._convert_to_per_unit(r_ohms, x_ohms, z_base, tank)

265 

266 def _extract_tank_end_impedance(self, tank_ends) -> tuple[float, float]: 

267 """Extract impedance from tank ends.""" 

268 r_ohms = 0.0 

269 x_ohms = 0.0 

270 

271 try: 

272 for tank_end in tank_ends: 

273 # Check for mesh impedance 

274 if ( 

275 hasattr(tank_end, "FromMeshImpedance") 

276 and tank_end.FromMeshImpedance 

277 ): 

278 for mesh_imp in tank_end.FromMeshImpedance: 

279 if mesh_imp.r and r_ohms == 0.0: 

280 r_ohms = float(mesh_imp.r) 

281 if mesh_imp.x and x_ohms == 0.0: 

282 x_ohms = float(mesh_imp.x) 

283 

284 # Check for star impedance 

285 if ( 

286 hasattr(tank_end, "StarImpedance") 

287 and tank_end.StarImpedance 

288 and r_ohms == 0.0 

289 ): 

290 star_imp = tank_end.StarImpedance 

291 if star_imp.r: 

292 r_ohms = float(star_imp.r) 

293 if star_imp.x: 

294 x_ohms = float(star_imp.x) 

295 

296 # Check for direct impedance on tank end 

297 if hasattr(tank_end, "r") and tank_end.r and r_ohms == 0.0: 

298 r_ohms = float(tank_end.r) 

299 if hasattr(tank_end, "x") and tank_end.x: 

300 x_ohms = float(tank_end.x) 

301 

302 if r_ohms > 0.0 and x_ohms > 0.0: 

303 break 

304 except (TypeError, AttributeError, ValueError) as e: 

305 _log.debug(f"Error extracting tank end impedance: {e}") 

306 

307 return r_ohms, x_ohms 

308 

309 def _extract_tank_direct_impedance( 

310 self, tank, r_ohms: float, x_ohms: float 

311 ) -> tuple[float, float]: 

312 """Extract impedance directly from tank attributes.""" 

313 try: 

314 tank_r_attrs = ["r", "resistance", "r1", "positiveSequenceResistance"] 

315 tank_x_attrs = ["x", "reactance", "x1", "positiveSequenceReactance"] 

316 

317 if r_ohms == 0.0: 

318 for attr in tank_r_attrs: 

319 if hasattr(tank, attr) and getattr(tank, attr) is not None: 

320 r_ohms = float(getattr(tank, attr)) 

321 break 

322 

323 if x_ohms == 0.0: 

324 for attr in tank_x_attrs: 

325 if hasattr(tank, attr) and getattr(tank, attr) is not None: 

326 x_ohms = float(getattr(tank, attr)) 

327 break 

328 except (ValueError, TypeError) as e: 

329 _log.debug(f"Error extracting tank impedance attributes: {e}") 

330 

331 return r_ohms, x_ohms 

332 

333 def _extract_tank_info_impedance( 

334 self, tank_info, r_ohms: float, x_ohms: float 

335 ) -> tuple[float, float]: 

336 """Extract impedance from tank info.""" 

337 try: 

338 if tank_info: 

339 if hasattr(tank_info, "r") and tank_info.r and r_ohms == 0.0: 

340 r_ohms = float(tank_info.r) 

341 if hasattr(tank_info, "x") and tank_info.x and x_ohms == 0.0: 

342 x_ohms = float(tank_info.x) 

343 except (ValueError, TypeError, AttributeError) as e: 

344 _log.debug(f"Error extracting tank info impedance: {e}") 

345 

346 return r_ohms, x_ohms 

347 

348 def _convert_to_per_unit( 

349 self, r_ohms: float, x_ohms: float, z_base: float, tank 

350 ) -> tuple[float, float]: 

351 """Convert ohmic values to per-unit.""" 

352 r_pu = r_ohms / z_base if z_base > 0 and r_ohms > 0 else 0.0 

353 x_pu = x_ohms / z_base if z_base > 0 and x_ohms > 0 else 0.0 

354 

355 # If still zero, use typical transformer values 

356 if r_pu == 0.0 and x_pu == 0.0: 

357 r_pu = 0.01 # 1% resistance 

358 x_pu = 0.05 # 5% reactance 

359 tank_name = tank.name if hasattr(tank, "name") else "unknown" 

360 _log.info( 

361 f"Using default impedance values for transformer tank {tank_name}" 

362 ) 

363 

364 return r_pu, x_pu 

365 

366 def _get_tank_voltage_base(self, tank) -> float: 

367 """Get voltage base from transformer tank.""" 

368 if not (hasattr(tank, "TransformerTankEnds") and tank.TransformerTankEnds): 

369 return 0.0 

370 

371 try: 

372 for tank_end in tank.TransformerTankEnds: 

373 if ( 

374 hasattr(tank_end, "BaseVoltage") 

375 and tank_end.BaseVoltage 

376 and hasattr(tank_end.BaseVoltage, "nominalVoltage") 

377 and tank_end.BaseVoltage.nominalVoltage 

378 ): 

379 v_base = float(tank_end.BaseVoltage.nominalVoltage) 

380 return v_base / np.sqrt(3) 

381 

382 # Also check for ratedU on tank end 

383 if hasattr(tank_end, "ratedU") and tank_end.ratedU: 

384 v_base = float(tank_end.ratedU) 

385 return v_base / np.sqrt(3) 

386 except (ValueError, TypeError, AttributeError): 

387 pass 

388 

389 return 0.0 

390 

391 def _set_default_transformer_impedance(self, data: dict, v_ln_base: float = 0.0): 

392 """Set default transformer impedance values.""" 

393 z_base = v_ln_base**2 / self.s_base 

394 r_pu = 0.01 # 1% 

395 x_pu = 0.05 # 5% 

396 

397 data.update( 

398 { 

399 "raa": r_pu, 

400 "rbb": r_pu, 

401 "rcc": r_pu, 

402 "xaa": x_pu, 

403 "xbb": x_pu, 

404 "xcc": x_pu, 

405 "phases": "abc", 

406 "v_ln_base": v_ln_base, 

407 "z_base": z_base, 

408 } 

409 )