Coverage for src/distopf/cim_importer/cim_to_csv_converter.py: 90%

231 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1# cim_converter/cim_to_csv_converter.py 

2import logging 

3from pathlib import Path 

4import pandas as pd 

5import networkx as nx 

6from cimgraph.models import FeederModel 

7import cimgraph.utils as utils 

8from distopf.cim_importer.processors import ( 

9 LineProcessor, 

10 SwitchProcessor, 

11 TransformerProcessor, 

12 RegulatorProcessor, 

13 CapacitorProcessor, 

14 GeneratorProcessor, 

15 BusProcessor, 

16) 

17from distopf.cim_importer.validators import TopologyValidator 

18 

# Module-level logger, named after this module so callers can tune its level.
_log = logging.getLogger(name=__name__)

20 

21 

def load_cim_model(
    cim_file: str | Path, s_base: float = 1e6
) -> dict[str, pd.DataFrame]:
    """Load a CIM XML feeder and return its converted tables.

    Convenience wrapper that builds a ``CIMToCSVConverter`` for ``cim_file``
    and runs ``convert`` with topology validation enabled.

    Args:
        cim_file: Path to the CIM XML file.
        s_base: Base power forwarded to the converter.

    Returns:
        The dict of DataFrames produced by ``CIMToCSVConverter.convert``.
    """
    return CIMToCSVConverter(cim_file, s_base=s_base).convert(validate=True)

27 

28 

29class CIMToCSVConverter: 

30 """Main converter class that orchestrates the conversion process.""" 

31 

32 def __init__(self, cim_file: str | Path, s_base: float = 1e6): 

33 self.cim_file = Path(cim_file) 

34 self.s_base = s_base 

35 self.network = None 

36 self.processors = { 

37 "line": LineProcessor(s_base), 

38 "switch": SwitchProcessor(s_base), 

39 "xfm": TransformerProcessor(s_base), 

40 "reg": RegulatorProcessor(s_base), 

41 "cap": CapacitorProcessor(s_base), 

42 "gen": GeneratorProcessor(s_base), 

43 "bus": BusProcessor(s_base), 

44 } 

45 self.validator = TopologyValidator() 

46 

47 def load_network(self): 

48 """Load CIM network from file.""" 

49 import os 

50 

51 os.environ["CIMG_CIM_PROFILE"] = "cimhub_2023" 

52 from cimgraph.databases import XMLFile 

53 import cimgraph.data_profile.cimhub_2023 as cim 

54 

55 file = XMLFile(filename=self.cim_file) 

56 self.network = FeederModel(container=cim.Feeder(), connection=file) 

57 utils.get_all_data(self.network) 

58 _log.info("Network loaded successfully") 

59 

60 def convert(self, validate: bool = True) -> dict: 

61 """Main conversion method.""" 

62 if self.network is None: 

63 self.load_network() 

64 assert self.network is not None 

65 _log.info("Starting conversion process...") 

66 

67 # Process all components into dataframes 

68 data = dict( 

69 bus_data=pd.DataFrame(self.processors["bus"].process(self.network)), 

70 branch_data=self._process_branch_data(), 

71 gen_data=pd.DataFrame(self.processors["gen"].process(self.network)), 

72 cap_data=pd.DataFrame(self.processors["cap"].process(self.network)), 

73 reg_data=pd.DataFrame(self.processors["reg"].process(self.network)), 

74 ) 

75 

76 validation_result = None 

77 if validate: 

78 validation_result = self.validator.validate_tree_topology( 

79 data["branch_data"] 

80 ) 

81 if validation_result and not validation_result["valid"]: 

82 _log.warning( 

83 f"Topology validation failed: {validation_result['issues']}" 

84 ) 

85 if validation_result and len(validation_result["warnings"]) > 0: 

86 _log.warning(f"Topology warnings: {validation_result['warnings']}") 

87 

88 data = self._link_dataframes(data) 

89 if len(data["gen_data"]) > 0: 

90 data["gen_data"] = self._correct_generator_phases( 

91 data["bus_data"], data["gen_data"] 

92 ) 

93 data["gen_data"] = self._aggregate_generators(data["gen_data"]) 

94 data["bus_data"] = self._convert_secondary_loads(data["bus_data"]) 

95 

96 return data 

97 

98 def _process_branch_data(self) -> pd.DataFrame: 

99 """Process all branch data (lines, switches, transformers).""" 

100 _log.info("Processing branch data...") 

101 all_data = [] 

102 all_data.extend(self.processors["line"].process_branch(self.network)) 

103 all_data.extend(self.processors["switch"].process_branch(self.network)) 

104 all_data.extend(self.processors["xfm"].process_branch(self.network)) 

105 all_data.extend(self.processors["reg"].process_branch(self.network)) 

106 _log.info(f"Processed {len(all_data)} branch entries") 

107 return pd.DataFrame(all_data) 

108 

109 def _fix_bus_phases_from_branches( 

110 self, bus_data: pd.DataFrame, branch_data: pd.DataFrame 

111 ) -> pd.DataFrame: 

112 """Post-process bus phases based on connected branch phases using pandas operations.""" 

113 if branch_data.empty: 

114 return bus_data 

115 

116 # Create phase mapping from 'to' buses 

117 to_phases = branch_data[["tb", "phases"]].rename(columns={"tb": "id"}) 

118 # For bus id 1 (swing bus), use 'from' connection phases 

119 fb_1 = branch_data[branch_data["fb"] == 1][["fb", "phases"]].rename( 

120 columns={"fb": "id"} 

121 ) 

122 # Combine and merge with bus data 

123 phase_updates = pd.concat([fb_1, to_phases]).drop_duplicates("id", keep="last") 

124 bus_data = bus_data.merge( 

125 phase_updates, on="id", how="left", suffixes=("", "_new") 

126 ) 

127 bus_data["phases"] = bus_data["phases_new"] 

128 bus_data.drop("phases_new", axis=1, inplace=True) 

129 return bus_data 

130 

131 def _fix_downstream_phase_consistency( 

132 self, branch_data: pd.DataFrame 

133 ) -> pd.DataFrame: 

134 """Fix cases where downstream branches have more phases than upstream branches.""" 

135 if branch_data.empty: 

136 return branch_data 

137 

138 # Create upstream mapping (fb -> upstream_phases) 

139 upstream_map = branch_data.set_index("tb")["phases"] 

140 # Map upstream phases and apply intersection 

141 branch_data["upstream_phases"] = branch_data["fb"].map(upstream_map) 

142 mask = branch_data["upstream_phases"].notna() 

143 branch_data.loc[mask, "phases"] = branch_data.loc[mask].apply( 

144 lambda x: "".join( 

145 sorted(set(x["phases"]).intersection(set(x["upstream_phases"]))) 

146 ), 

147 axis=1, 

148 ) 

149 branch_data.drop("upstream_phases", axis=1, inplace=True) 

150 return branch_data 

151 

152 def _link_dataframes(self, data): 

153 bus_df = data["bus_data"] 

154 branch_df = data["branch_data"] 

155 gen_df = data["gen_data"] 

156 cap_df = data["cap_data"] 

157 reg_df = data["reg_data"] 

158 """Create DFS-based integer IDs and link all dataframes.""" 

159 if bus_df.empty or branch_df.empty: 

160 _log.warning("Bus or branch data is empty, cannot perform linking.") 

161 return bus_df, branch_df, reg_df, cap_df, gen_df 

162 

163 swing_bus_series = bus_df[bus_df["bus_type"] == "SWING"] 

164 if swing_bus_series.empty: 

165 raise ValueError( 

166 "No SWING bus found. Cannot determine network root for DFS." 

167 ) 

168 

169 root_bus_name = swing_bus_series["name"].iloc[0] 

170 graph = nx.Graph() 

171 for _, row in branch_df.iterrows(): 

172 graph.add_edge(row["from_name"], row["to_name"]) 

173 

174 if root_bus_name not in graph: 

175 _log.warning( 

176 f"Swing bus '{root_bus_name}' not found in the graph constructed from branches. " 

177 "Using arbitrary node order." 

178 ) 

179 all_nodes = list(bus_df["name"]) 

180 dfs_nodes = [node for node in all_nodes if node in graph] 

181 if root_bus_name not in dfs_nodes: 

182 dfs_nodes.insert(0, root_bus_name) 

183 if root_bus_name in graph: 

184 dfs_nodes = list(nx.dfs_preorder_nodes(graph, source=root_bus_name)) 

185 

186 bus_id_map = {name: i + 1 for i, name in enumerate(dfs_nodes)} 

187 bus_df["id"] = bus_df["name"].map(bus_id_map) 

188 bus_df.dropna(subset=["id"], inplace=True) 

189 bus_df["id"] = bus_df["id"].astype(int) 

190 

191 if not branch_df.empty: 

192 branch_df["fb"] = branch_df["from_name"].map(bus_id_map).astype(int) 

193 branch_df["tb"] = branch_df["to_name"].map(bus_id_map).astype(int) 

194 

195 if not reg_df.empty: 

196 reg_df["fb"] = reg_df["from_name"].map(bus_id_map).astype(int) 

197 reg_df["tb"] = reg_df["to_name"].map(bus_id_map).astype(int) 

198 

199 if not cap_df.empty: 

200 cap_df["id"] = cap_df["id"].map(bus_id_map).astype(int) 

201 

202 if not gen_df.empty: 

203 gen_df["id"] = gen_df["id"].map(bus_id_map).astype(int) 

204 try: 

205 branch_df = self._fix_downstream_phase_consistency(branch_df) 

206 except pd.errors.InvalidIndexError: 

207 _log.warning("Cannot fix phase consistency due to network having loops.") 

208 bus_df = self._fix_bus_phases_from_branches(bus_df, branch_df) 

209 bus_df = bus_df.sort_values("id") 

210 data["bus_data"] = bus_df 

211 data["branch_data"] = branch_df 

212 data["gen_data"] = gen_df 

213 data["cap_data"] = cap_df 

214 data["reg_data"] = reg_df 

215 return data 

216 

217 def _aggregate_generators(self, gen_df) -> pd.DataFrame: 

218 if len(gen_df) == 0: 

219 return gen_df 

220 """Aggregate generators by bus using pandas groupby.""" 

221 original_count = len(gen_df) 

222 # Define aggregation functions for each column 

223 agg_dict = { 

224 "mrid": lambda x: "|".join(x), # Combine mRIDs with pipe separator 

225 "name": lambda x: f"AggGen_{x.iloc[0]}_{len(x)}units" 

226 if len(x) > 1 

227 else x.iloc[0], 

228 "pa": "sum", 

229 "pb": "sum", 

230 "pc": "sum", 

231 "qa": "sum", 

232 "qb": "sum", 

233 "qc": "sum", 

234 "s_base": "sum", # Sum the individual capacities 

235 "sa_max": "sum", 

236 "sb_max": "sum", 

237 "sc_max": "sum", 

238 "phases": lambda x: "".join( 

239 sorted(set("".join(x))) 

240 ), # Combine unique phases 

241 "qa_max": "sum", 

242 "qb_max": "sum", 

243 "qc_max": "sum", 

244 "qa_min": "sum", 

245 "qb_min": "sum", 

246 "qc_min": "sum", # Note: sums minimums (typically negative) 

247 "control_variable": "first", # Use first control variable 

248 } 

249 # Only aggregate columns that exist in the dataframe 

250 available_agg_dict = { 

251 col: agg_func for col, agg_func in agg_dict.items() if col in gen_df.columns 

252 } 

253 # Group by bus ID and aggregate 

254 aggregated_gen_df = gen_df.groupby("id", as_index=False).agg(available_agg_dict) 

255 _log.info( 

256 f"Generator aggregation: {original_count} individual -> {len(aggregated_gen_df)} aggregated" 

257 ) 

258 return aggregated_gen_df 

259 

260 def _get_generator_columns(self): 

261 """Get standard generator data columns for ordering.""" 

262 return [ 

263 "mrid", 

264 "id", 

265 "name", 

266 "pa", 

267 "pb", 

268 "pc", 

269 "qa", 

270 "qb", 

271 "qc", 

272 "s_base", 

273 "sa_max", 

274 "sb_max", 

275 "sc_max", 

276 "phases", 

277 "qa_max", 

278 "qb_max", 

279 "qc_max", 

280 "qa_min", 

281 "qb_min", 

282 "qc_min", 

283 "ps1", 

284 "ps2", 

285 "qs1", 

286 "qs2", 

287 "ss1_max", 

288 "ss2_max", 

289 "qs1_max", 

290 "qs2_max", 

291 "qs1_min", 

292 "qs2_min", 

293 "control_variable", 

294 ] 

295 

296 def _correct_generator_phases(self, bus_df: pd.DataFrame, gen_df: pd.DataFrame): 

297 gen_df = self._convert_secondary_gens(bus_df, gen_df) 

298 gen_df = self._correct_generators_without_phases(bus_df, gen_df) 

299 gen_df = self._distribute_phase_parameters(gen_df) 

300 return gen_df 

301 

302 def _correct_generators_without_phases( 

303 self, bus_df: pd.DataFrame, gen_df: pd.DataFrame 

304 ): 

305 if len(gen_df) == 0: 

306 return gen_df 

307 mask = gen_df.phases.str.len() == 0 

308 gen_df.loc[mask, "phases"] = gen_df.loc[mask, "id"].apply( 

309 lambda x: bus_df.loc[bus_df.id == x, "phases"].to_list()[0] 

310 ) 

311 num_phases = gen_df.loc[mask, "phases"].str.len() 

312 mask_a = mask & gen_df.phases.str.contains("a") 

313 mask_b = mask & gen_df.phases.str.contains("b") 

314 mask_c = mask & gen_df.phases.str.contains("c") 

315 gen_df.loc[mask_a, "pa"] = gen_df.loc[mask_a, "p"] / num_phases 

316 gen_df.loc[mask_b, "pb"] = gen_df.loc[mask_b, "p"] / num_phases 

317 gen_df.loc[mask_c, "pc"] = gen_df.loc[mask_c, "p"] / num_phases 

318 gen_df.loc[mask_a, "qa"] = gen_df.loc[mask_a, "q"] / num_phases 

319 gen_df.loc[mask_b, "qb"] = gen_df.loc[mask_b, "q"] / num_phases 

320 gen_df.loc[mask_c, "qc"] = gen_df.loc[mask_c, "q"] / num_phases 

321 return gen_df 

322 

323 def _distribute_phase_parameters(self, gen_df: pd.DataFrame): 

324 if len(gen_df) == 0: 

325 return gen_df 

326 num_phases = gen_df.loc[:, "phases"].str.len() 

327 mask_a = gen_df.phases.str.contains("a") 

328 mask_b = gen_df.phases.str.contains("b") 

329 mask_c = gen_df.phases.str.contains("c") 

330 gen_df.loc[mask_a, "sa_max"] = gen_df.loc[mask_a, "s_max"] / num_phases 

331 gen_df.loc[mask_b, "sb_max"] = gen_df.loc[mask_b, "s_max"] / num_phases 

332 gen_df.loc[mask_c, "sc_max"] = gen_df.loc[mask_c, "s_max"] / num_phases 

333 gen_df.loc[mask_a, "qa_max"] = gen_df.loc[mask_a, "q_max"] / num_phases 

334 gen_df.loc[mask_b, "qb_max"] = gen_df.loc[mask_b, "q_max"] / num_phases 

335 gen_df.loc[mask_c, "qc_max"] = gen_df.loc[mask_c, "q_max"] / num_phases 

336 gen_df.loc[mask_a, "qa_min"] = gen_df.loc[mask_a, "q_min"] / num_phases 

337 gen_df.loc[mask_b, "qb_min"] = gen_df.loc[mask_b, "q_min"] / num_phases 

338 gen_df.loc[mask_c, "qc_min"] = gen_df.loc[mask_c, "q_min"] / num_phases 

339 return gen_df 

340 

341 def _convert_secondary_gens(self, bus_df: pd.DataFrame, gen_df: pd.DataFrame): 

342 if len(gen_df) == 0: 

343 return gen_df 

344 if not ( 

345 "ps1" in gen_df.keys() 

346 and "ps2" in gen_df.keys() 

347 and "qs1" in gen_df.keys() 

348 and "qs2" in gen_df.keys() 

349 ): 

350 return gen_df 

351 mask = ( 

352 (gen_df.ps1.abs() > 0) 

353 | (gen_df.ps2.abs() > 0) 

354 | (gen_df.qs1.abs() > 0) 

355 | (gen_df.qs2.abs() > 0) 

356 ) 

357 # change s1s2 to a, b, or c primary phase 

358 gen_df.loc[mask, "phases"] = gen_df.loc[mask, "id"].apply( 

359 lambda x: bus_df.loc[bus_df.id == x, "phases"].to_list()[0] 

360 ) 

361 # move values from s1 s2 to primary phase columns 

362 mask_a = mask & gen_df.phases.str.contains("a") 

363 mask_b = mask & gen_df.phases.str.contains("b") 

364 mask_c = mask & gen_df.phases.str.contains("c") 

365 gen_df.loc[mask_a, "pa"] = gen_df.loc[mask_a, ["ps1", "ps2"]].sum(axis=1) 

366 gen_df.loc[mask_b, "pb"] = gen_df.loc[mask_b, ["ps1", "ps2"]].sum(axis=1) 

367 gen_df.loc[mask_c, "pc"] = gen_df.loc[mask_c, ["ps1", "ps2"]].sum(axis=1) 

368 gen_df.loc[mask_a, "qa"] = gen_df.loc[mask_a, ["qs1", "qs2"]].sum(axis=1) 

369 gen_df.loc[mask_b, "qb"] = gen_df.loc[mask_b, ["qs1", "qs2"]].sum(axis=1) 

370 gen_df.loc[mask_c, "qc"] = gen_df.loc[mask_c, ["qs1", "qs2"]].sum(axis=1) 

371 return gen_df 

372 

373 def _convert_secondary_loads(self, bus_df: pd.DataFrame): 

374 """Transfer loads from s1, and s2 secondary phases to the appropriate phase.""" 

375 mask = ( 

376 (bus_df.pl_s1 > 0) 

377 | (bus_df.pl_s2 > 0) 

378 | (bus_df.ql_s1 > 0) 

379 | (bus_df.ql_s2 > 0) 

380 ) 

381 _df = bus_df.loc[mask] 

382 for phase, _id, p1, p2, q1, q2 in zip( 

383 _df.phases, _df.id, _df.pl_s1, _df.pl_s2, _df.ql_s1, _df.ql_s2 

384 ): 

385 bus_df.loc[bus_df.id == int(_id), [f"pl_{phase}"]] = p1 + p2 

386 bus_df.loc[bus_df.id == int(_id), [f"ql_{phase}"]] = q1 + q2 

387 return bus_df 

388 

389 def save(self, results: dict[str, pd.DataFrame], output_dir: str | Path): 

390 """Save processed data to CSV files.""" 

391 self.output_dir = Path(output_dir) 

392 self.output_dir.mkdir(parents=True, exist_ok=True) 

393 

394 bus_cols = self._get_bus_columns() 

395 branch_cols = self._get_branch_columns() 

396 

397 bus_data = results["bus_data"] 

398 # Save bus data 

399 if bus_data.empty: 

400 pd.DataFrame(columns=bus_cols).to_csv( 

401 self.output_dir / "bus_data.csv", index=False 

402 ) 

403 if not bus_data.empty: 

404 bus_data.to_csv( 

405 self.output_dir / "bus_data.csv", index=False, columns=bus_cols 

406 ) 

407 _log.info(f"Saved bus data to {self.output_dir / 'bus_data.csv'}") 

408 

409 branch_data = results["branch_data"] 

410 # Save branch data 

411 if branch_data.empty: 

412 pd.DataFrame(columns=branch_cols).to_csv( 

413 self.output_dir / "branch_data.csv", index=False 

414 ) 

415 if not branch_data.empty: 

416 branch_data.to_csv( 

417 self.output_dir / "branch_data.csv", index=False, columns=branch_cols 

418 ) 

419 _log.info(f"Saved branch data to {self.output_dir / 'branch_data.csv'}") 

420 

421 regulator_data = results["reg_data"] 

422 if not regulator_data.empty: 

423 regulator_data.to_csv(self.output_dir / "reg_data.csv", index=False) 

424 capacitor_data = results["cap_data"] 

425 if not capacitor_data.empty: 

426 capacitor_data.to_csv(self.output_dir / "cap_data.csv", index=False) 

427 generator_data = results["gen_data"] 

428 if not generator_data.empty: 

429 generator_data.to_csv(self.output_dir / "gen_data.csv", index=False) 

430 _log.info(f"Saved generator data to {self.output_dir / 'gen_data.csv'}") 

431 

432 def _get_branch_columns(self): 

433 """Get standard branch data columns for ordering.""" 

434 return [ 

435 "name", 

436 "fb", 

437 "tb", 

438 "from_name", 

439 "to_name", 

440 "raa", 

441 "rab", 

442 "rac", 

443 "rbb", 

444 "rbc", 

445 "rcc", 

446 "xaa", 

447 "xab", 

448 "xac", 

449 "xbb", 

450 "xbc", 

451 "xcc", 

452 "type", 

453 "status", 

454 "s_base", 

455 "v_ln_base", 

456 "z_base", 

457 "phases", 

458 "length", 

459 ] 

460 

461 def _get_bus_columns(self): 

462 """Get standard bus data columns for ordering.""" 

463 return [ 

464 "mrid", 

465 "id", 

466 "name", 

467 "pl_a", 

468 "ql_a", 

469 "pl_b", 

470 "ql_b", 

471 "pl_c", 

472 "ql_c", 

473 "bus_type", 

474 "v_a", 

475 "v_b", 

476 "v_c", 

477 "v_ln_base", 

478 "s_base", 

479 "v_min", 

480 "v_max", 

481 "cvr_p", 

482 "cvr_q", 

483 "phases", 

484 "latitude", 

485 "longitude", 

486 ]