Coverage for src/distopf/cim_importer/processors/regulator_processor.py: 74%

247 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1import numpy as np 

2from distopf.cim_importer.processors.base_processor import BaseProcessor 

3from distopf.cim_importer.utils import PhaseUtils 

4import cimgraph.data_profile.cimhub_2023 as cim 

5 

6 

7class RegulatorProcessor(BaseProcessor): 

8 """Processor for voltage regulators.""" 

9 

def process_branch(self, network) -> list[dict]:
    """Collect the branch rows contributed by voltage regulators.

    Two passes are made over ``network``: first every PowerTransformer that
    ``is_regulator`` accepts, then every TransformerTank that
    ``_is_regulator_tank`` accepts and whose mRID was not already seen as a
    tank of a regulator PowerTransformer.

    Returns:
        One branch dict per regulator, transformer-based entries first.
    """
    branch_rows = []
    seen_tank_ids = set()

    # Pass 1: regulators modelled as PowerTransformer objects.
    for transformer in network.list_by_class(cim.PowerTransformer):
        if not self.is_regulator(transformer):
            continue
        branch_rows.append(self._create_regulator_branch_entry(transformer))
        # Remember the tanks so pass 2 does not emit them a second time.
        seen_tank_ids.update(tank.mRID for tank in transformer.TransformerTanks)

    # Pass 2: tanks that were not covered by a regulator transformer above.
    for tank in network.list_by_class(cim.TransformerTank):
        if tank.mRID in seen_tank_ids:
            continue
        if self._is_regulator_tank(tank):
            branch_rows.append(self._create_regulator_branch_entry_from_tank(tank))

    return branch_rows

28 

def process(self, network) -> list[dict]:
    """Collect the regulator rows (tap/ratio data, reg_data.csv).

    Mirrors ``process_branch``: PowerTransformer regulators are extracted
    first, then any regulator TransformerTank whose mRID was not already
    seen as a tank of one of those transformers.

    Returns:
        One regulator-data dict per regulator, transformer-based entries first.
    """
    reg_rows = []
    seen_tank_ids = set()

    # Pass 1: regulators modelled as PowerTransformer objects.
    for transformer in network.list_by_class(cim.PowerTransformer):
        if not self.is_regulator(transformer):
            continue
        reg_rows.append(self._extract_regulator_data(transformer))
        # Remember the tanks so pass 2 does not emit them a second time.
        seen_tank_ids.update(tank.mRID for tank in transformer.TransformerTanks)

    # Pass 2: tanks that were not covered by a regulator transformer above.
    for tank in network.list_by_class(cim.TransformerTank):
        if tank.mRID in seen_tank_ids:
            continue
        if self._is_regulator_tank(tank):
            reg_rows.append(self._extract_regulator_data_from_tank(tank))

    return reg_rows

47 

def is_regulator(self, xfmr) -> bool:
    """Return True when ``xfmr`` carries a RatioTapChanger anywhere.

    The transformer's PowerTransformerEnd objects are inspected first; only
    if none of them has a truthy ``RatioTapChanger`` are its tanks checked
    through ``_is_regulator_tank``.
    """
    end_has_tap_changer = any(
        getattr(end, "RatioTapChanger", None) for end in xfmr.PowerTransformerEnd
    )
    if end_has_tap_changer:
        return True

    return any(self._is_regulator_tank(tank) for tank in xfmr.TransformerTanks)

61 

62 def _is_regulator_tank(self, tank) -> bool: 

63 """Check if a TransformerTank is a regulator.""" 

64 if hasattr(tank, "TransformerTankEnds") and tank.TransformerTankEnds: 

65 for tank_end in tank.TransformerTankEnds: 

66 if hasattr(tank_end, "RatioTapChanger") and tank_end.RatioTapChanger: 

67 return True 

68 return False 

69 

def _create_regulator_branch_entry(self, xfmr) -> dict:
    """Build the branch row for a PowerTransformer-based regulator.

    Starting from ``_create_base_branch_dict()``, the row receives the
    transformer name, the first two terminal bus names as from/to, the
    voltage and impedance bases, the detected phases and finally the
    per-phase impedance entries written by ``_process_regulator_impedance``.
    """
    row = self._create_base_branch_dict()

    bus_names = [term.ConnectivityNode.name for term in xfmr.Terminals]
    from_bus = bus_names[0] if len(bus_names) > 0 else None
    to_bus = bus_names[1] if len(bus_names) > 1 else None

    row.update(
        {
            "name": xfmr.name,
            "from_name": from_bus,
            "to_name": to_bus,
            "type": "regulator",
            "phases": "abc",  # placeholder, replaced once phases are detected
        }
    )

    # Only the line-to-neutral voltage base and impedance base are stored.
    _, v_ln_base, z_base = self._get_regulator_voltage_base(xfmr)
    row.update({"v_ln_base": v_ln_base, "z_base": z_base})

    row["phases"] = self._get_regulator_phases(xfmr)

    self._process_regulator_impedance(xfmr, row, z_base)

    return row

98 

def _get_regulator_phases(self, xfmr) -> str:
    """Return the phase string of a regulator, derived from its tank ends.

    Every tank end with a truthy ``orderedPhases`` is translated through
    ``PhaseUtils.get_phase_str``; the non-empty results are de-duplicated,
    sorted and concatenated. When no tank end yields a phase, the result of
    ``PhaseUtils.get_equipment_phases(xfmr)`` is returned instead.
    """
    found = set()

    for tank in xfmr.TransformerTanks:
        for end in tank.TransformerTankEnds:
            ordered = getattr(end, "orderedPhases", None)
            if not ordered:
                continue
            letter = PhaseUtils.get_phase_str(ordered)
            if letter:
                found.add(letter)

    if found:
        return "".join(sorted(found))

    # Nothing usable on the tank ends: defer to the generic detection.
    return PhaseUtils.get_equipment_phases(xfmr)

116 

def _create_regulator_branch_entry_from_tank(self, tank) -> dict:
    """Build the branch row for a standalone regulator TransformerTank.

    The from/to buses are the connectivity nodes of the first two tank ends
    that have a truthy ``Terminal``. Voltage bases, phases and impedances
    come from the tank-specific helpers.
    """
    row = self._create_base_branch_dict()

    # Terminals are reached through the tank ends; ends without one are skipped.
    end_terminals = [end.Terminal for end in tank.TransformerTankEnds if end.Terminal]
    bus_names = [term.ConnectivityNode.name for term in end_terminals]
    from_bus = bus_names[0] if len(bus_names) > 0 else None
    to_bus = bus_names[1] if len(bus_names) > 1 else None

    row.update(
        {
            "name": tank.name,
            "from_name": from_bus,
            "to_name": to_bus,
            "type": "regulator",
            "phases": "abc",  # placeholder, replaced once phases are detected
        }
    )

    # Only the line-to-neutral voltage base and impedance base are stored.
    _, v_ln_base, z_base = self._get_tank_voltage_base(tank)
    row.update({"v_ln_base": v_ln_base, "z_base": z_base})

    row["phases"] = self._get_tank_phases(tank)

    self._process_regulator_tank_impedance(tank, row, z_base)

    return row

150 

def _get_tank_phases(self, tank) -> str:
    """Return the phase string of a transformer tank.

    Each tank end with a truthy ``orderedPhases`` is translated through
    ``PhaseUtils.get_phase_str``; the non-empty results are de-duplicated,
    sorted and concatenated. When no end yields a phase, the result of
    ``PhaseUtils.get_equipment_phases(tank)`` is returned instead.
    """
    found = set()

    for end in tank.TransformerTankEnds:
        ordered = getattr(end, "orderedPhases", None)
        if not ordered:
            continue
        letter = PhaseUtils.get_phase_str(ordered)
        if letter:
            found.add(letter)

    if found:
        return "".join(sorted(found))

    # Nothing usable on the tank ends: defer to the generic detection.
    return PhaseUtils.get_equipment_phases(tank)

166 

def _extract_regulator_data(self, xfmr: cim.PowerTransformer) -> dict:
    """Build the tap/ratio row for a PowerTransformer-based regulator.

    The row starts with unity ratios and zero taps on all three phases.
    Tap changers found on tank ends update the phase(s) reported by
    ``PhaseUtils.get_phase_str`` for that end; tap changers found on
    PowerTransformerEnd objects update every equipment phase. If at least
    one tank-end tap changer contributed, ``"phases"`` is replaced by the
    sorted set of those phases.
    """
    bus_names = [term.ConnectivityNode.name for term in xfmr.Terminals]
    equipment_phases = PhaseUtils.get_equipment_phases(xfmr)

    reg_data = {
        "name": xfmr.name,
        "fb": None,
        "tb": None,
        "from_name": bus_names[0] if len(bus_names) > 0 else None,
        "to_name": bus_names[1] if len(bus_names) > 1 else None,
        "ratio_a": 1.0,
        "ratio_b": 1.0,
        "ratio_c": 1.0,
        "phases": equipment_phases,
        "tap_a": 0.0,
        "tap_b": 0.0,
        "tap_c": 0.0,
    }

    tapped_phases = set()

    # Per-phase tap changers attached to the tank ends.
    for tank in xfmr.TransformerTanks:
        for end in tank.TransformerTankEnds:
            changer = getattr(end, "RatioTapChanger", None)
            if changer is None:
                continue
            letter = PhaseUtils.get_phase_str(end.orderedPhases)
            if not letter:
                continue
            tapped_phases.add(letter)
            reg_data = self._extract_tap_changer_data(
                changer, reg_data, tap_phases=letter
            )

    # Tap changers attached to the transformer ends apply to all equipment phases.
    # NOTE(review): this repeats the lookup done at the top of the method;
    # it is kept so the sequence of calls is unchanged.
    equipment_phases = PhaseUtils.get_equipment_phases(xfmr)
    for end in xfmr.PowerTransformerEnd:
        changer = getattr(end, "RatioTapChanger", None)
        if changer is None:
            continue
        reg_data = self._extract_tap_changer_data(
            changer, reg_data, tap_phases=equipment_phases
        )

    if tapped_phases:
        reg_data["phases"] = "".join(sorted(tapped_phases))
    return reg_data

216 

217 def _extract_tap_changer_data( 

218 self, tap_changer: cim.RatioTapChanger, reg_data: dict, tap_phases: str = "abc" 

219 ): 

220 # Get tap position 

221 if hasattr(tap_changer, "step") and tap_changer.step is not None: 

222 for phase_letter in tap_phases: 

223 reg_data[f"tap_{phase_letter}"] = float(tap_changer.step) 

224 # Calculate ratio 

225 if ( 

226 hasattr(tap_changer, "stepVoltageIncrement") 

227 and tap_changer.stepVoltageIncrement is not None 

228 ): 

229 step_increment = float(tap_changer.stepVoltageIncrement) / 100.0 

230 current_step = reg_data[f"tap_{phase_letter}"] 

231 ratio = 1.0 + (current_step * step_increment) 

232 for phase_letter in tap_phases: 

233 reg_data[f"ratio_{phase_letter}"] = ratio 

234 return reg_data 

235 

236 def _extract_regulator_data_from_tank(self, tank) -> dict: 

237 """Extract regulator data from standalone tank.""" 

238 # Similar to _extract_regulator_data but for single tank 

239 terminals = [] 

240 for tank_end in tank.TransformerTankEnds: 

241 if tank_end.Terminal: 

242 terminals.append(tank_end.Terminal) 

243 

244 buses = [terminal.ConnectivityNode.name for terminal in terminals] 

245 

246 reg_data = { 

247 "name": tank.name, 

248 "fb": None, 

249 "tb": None, 

250 "from_name": buses[0] if len(buses) > 0 else None, 

251 "to_name": buses[1] if len(buses) > 1 else None, 

252 "ratio_a": 1.0, 

253 "ratio_b": 1.0, 

254 "ratio_c": 1.0, 

255 "phases": "", 

256 "tap_a": 0.0, 

257 "tap_b": 0.0, 

258 "tap_c": 0.0, 

259 } 

260 

261 regulator_phases = set() 

262 

263 for tank_end in tank.TransformerTankEnds: 

264 if not hasattr(tank_end, "RatioTapChanger") or not tank_end.RatioTapChanger: 

265 continue 

266 phase_letter = PhaseUtils.get_phase_str(tank_end.orderedPhases) 

267 if not phase_letter: 

268 continue 

269 regulator_phases.add(phase_letter) 

270 tap_changer = tank_end.RatioTapChanger 

271 

272 if hasattr(tap_changer, "step") and tap_changer.step is not None: 

273 reg_data[f"tap_{phase_letter}"] = float(tap_changer.step) 

274 

275 if ( 

276 hasattr(tap_changer, "stepVoltageIncrement") 

277 and tap_changer.stepVoltageIncrement is not None 

278 ): 

279 step_increment = float(tap_changer.stepVoltageIncrement) / 100.0 

280 current_step = reg_data[f"tap_{phase_letter}"] 

281 ratio = 1.0 + (current_step * step_increment) 

282 reg_data[f"ratio_{phase_letter}"] = ratio 

283 

284 reg_data["phases"] = "".join(sorted(regulator_phases)) 

285 return reg_data 

286 

287 def _get_regulator_voltage_base(self, xfmr): 

288 """Get voltage base from regulator TransformerTankEnd.""" 

289 for tank in xfmr.TransformerTanks: 

290 for tank_end in tank.TransformerTankEnds: 

291 if not hasattr(tank_end, "BaseVoltage") or not tank_end.BaseVoltage: 

292 continue 

293 if ( 

294 not hasattr(tank_end.BaseVoltage, "nominalVoltage") 

295 or tank_end.BaseVoltage.nominalVoltage is None 

296 ): 

297 continue 

298 v_base = float(tank_end.BaseVoltage.nominalVoltage) 

299 v_ln_base = v_base / np.sqrt(3) 

300 z_base = v_ln_base**2 / self.s_base 

301 return v_base, v_ln_base, z_base 

302 

303 for pte in xfmr.PowerTransformerEnd: 

304 if not hasattr(pte, "ratedU") or pte.ratedU is None: 

305 continue 

306 v_base = pte.ratedU 

307 v_ln_base = v_base / np.sqrt(3) 

308 z_base = v_ln_base**2 / self.s_base 

309 return v_base, v_ln_base, z_base 

310 

311 raise ValueError(f"Could not determine voltage base for regulator {xfmr.name}") 

312 

313 def _get_tank_voltage_base(self, tank): 

314 """Get voltage base from tank ends.""" 

315 for tank_end in tank.TransformerTankEnds: 

316 if not hasattr(tank_end, "BaseVoltage") or not tank_end.BaseVoltage: 

317 continue 

318 if not hasattr(tank_end.BaseVoltage, "nominalVoltage"): 

319 continue 

320 v_base = float(tank_end.BaseVoltage.nominalVoltage) 

321 v_ln_base = v_base / np.sqrt(3) 

322 z_base = v_ln_base**2 / self.s_base 

323 return v_base, v_ln_base, z_base 

324 raise ValueError(f"Could not determine voltage base for tank {tank.name}") 

325 

326 def _process_regulator_impedance(self, xfmr, data: dict, z_base: float): 

327 """Process impedance for regulator transformer.""" 

328 r_pu, x_pu = 0.0, 0.0 

329 

330 # Strategy 1: Check PowerTransformerEnd for impedance 

331 for pte in xfmr.PowerTransformerEnd: 

332 if hasattr(pte, "FromMeshImpedance") and pte.FromMeshImpedance: 

333 for mesh_imp in pte.FromMeshImpedance: 

334 if mesh_imp.r and mesh_imp.x: 

335 r_pu = float(mesh_imp.r) / z_base 

336 x_pu = float(mesh_imp.x) / z_base 

337 break 

338 elif hasattr(pte, "StarImpedance") and pte.StarImpedance: 

339 star_imp = pte.StarImpedance 

340 if star_imp.r and star_imp.x: 

341 r_pu = float(star_imp.r) / z_base 

342 x_pu = float(star_imp.x) / z_base 

343 break 

344 elif hasattr(pte, "r") and pte.r: 

345 r_pu = float(pte.r) / z_base 

346 x_pu = float(pte.x) / z_base if pte.x else 0.0 

347 break 

348 if r_pu > 0 or x_pu > 0: 

349 break 

350 

351 # Strategy 2: Check TransformerTanks if no PowerTransformerEnd impedance 

352 if r_pu == 0.0 and x_pu == 0.0: 

353 for tank in xfmr.TransformerTanks: 

354 tank_r, tank_x = self._extract_tank_impedance_values(tank, z_base) 

355 if tank_r > 0 or tank_x > 0: 

356 r_pu, x_pu = tank_r, tank_x 

357 break 

358 

359 # Strategy 3: Use typical regulator impedance if none found 

360 if r_pu == 0.0 and x_pu == 0.0: 

361 r_pu = 0.005 # 0.5% - typical for regulators 

362 x_pu = 0.02 # 2% - typical for regulators 

363 

364 # Apply impedance to all phases 

365 data.update( 

366 { 

367 "raa": r_pu, 

368 "rbb": r_pu, 

369 "rcc": r_pu, 

370 "xaa": x_pu, 

371 "xbb": x_pu, 

372 "xcc": x_pu, 

373 } 

374 ) 

375 

376 def _process_regulator_tank_impedance(self, tank, data: dict, z_base: float): 

377 """Process impedance for regulator tank.""" 

378 r_pu, x_pu = self._extract_tank_impedance_values(tank, z_base) 

379 

380 # Use typical regulator impedance if none found 

381 if r_pu == 0.0 and x_pu == 0.0: 

382 r_pu = 0.005 # 0.5% 

383 x_pu = 0.02 # 2% 

384 

385 data.update( 

386 { 

387 "raa": r_pu, 

388 "rbb": r_pu, 

389 "rcc": r_pu, 

390 "xaa": x_pu, 

391 "xbb": x_pu, 

392 "xcc": x_pu, 

393 } 

394 ) 

395 

396 def _extract_tank_impedance_values( 

397 self, tank, z_base: float 

398 ) -> tuple[float, float]: 

399 """Extract impedance values from tank structure.""" 

400 r_ohms, x_ohms = 0.0, 0.0 

401 

402 # Check tank ends for impedance 

403 if hasattr(tank, "TransformerTankEnds") and tank.TransformerTankEnds: 

404 for tank_end in tank.TransformerTankEnds: 

405 # Check for mesh impedance 

406 if ( 

407 hasattr(tank_end, "FromMeshImpedance") 

408 and tank_end.FromMeshImpedance 

409 ): 

410 for mesh_imp in tank_end.FromMeshImpedance: 

411 if mesh_imp.r and r_ohms == 0.0: 

412 r_ohms = float(mesh_imp.r) 

413 if mesh_imp.x and x_ohms == 0.0: 

414 x_ohms = float(mesh_imp.x) 

415 # Check for star impedance 

416 elif hasattr(tank_end, "StarImpedance") and tank_end.StarImpedance: 

417 star_imp = tank_end.StarImpedance 

418 if star_imp.r and r_ohms == 0.0: 

419 r_ohms = float(star_imp.r) 

420 if star_imp.x and x_ohms == 0.0: 

421 x_ohms = float(star_imp.x) 

422 # Check for direct impedance attributes 

423 elif hasattr(tank_end, "r") and tank_end.r and r_ohms == 0.0: 

424 r_ohms = float(tank_end.r) 

425 if hasattr(tank_end, "x") and tank_end.x: 

426 x_ohms = float(tank_end.x) 

427 

428 if r_ohms > 0 and x_ohms > 0: 

429 break 

430 

431 # Check tank itself for impedance 

432 if r_ohms == 0.0 or x_ohms == 0.0: 

433 if hasattr(tank, "r") and tank.r and r_ohms == 0.0: 

434 r_ohms = float(tank.r) 

435 if hasattr(tank, "x") and tank.x and x_ohms == 0.0: 

436 x_ohms = float(tank.x) 

437 

438 # Convert to per-unit 

439 r_pu = r_ohms / z_base if z_base > 0 and r_ohms > 0 else 0.0 

440 x_pu = x_ohms / z_base if z_base > 0 and x_ohms > 0 else 0.0 

441 

442 return r_pu, x_pu