Coverage for src/distopf/cim_converter/cim_to_csv_converter.py: 90%
231 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
1# cim_converter/cim_to_csv_converter.py
2import logging
3from pathlib import Path
4import pandas as pd
5import networkx as nx
6from cimgraph.models import FeederModel
7import cimgraph.utils as utils
8from distopf.cim_converter.processors import (
9 LineProcessor,
10 SwitchProcessor,
11 TransformerProcessor,
12 RegulatorProcessor,
13 CapacitorProcessor,
14 GeneratorProcessor,
15 BusProcessor,
16)
17from distopf.cim_converter.validators import TopologyValidator
# Module-level logger used by the converter for progress and warning messages.
_log: logging.Logger = logging.getLogger(__name__)
22def load_cim_model(
23 cim_file: str | Path, s_base: float = 1e6
24) -> dict[str, pd.DataFrame]:
25 converter = CIMToCSVConverter(cim_file, s_base=s_base)
26 return converter.convert(validate=True)
29class CIMToCSVConverter:
30 """Main converter class that orchestrates the conversion process."""
32 def __init__(self, cim_file: str | Path, s_base: float = 1e6):
33 self.cim_file = Path(cim_file)
34 self.s_base = s_base
35 self.network = None
36 self.processors = {
37 "line": LineProcessor(s_base),
38 "switch": SwitchProcessor(s_base),
39 "xfm": TransformerProcessor(s_base),
40 "reg": RegulatorProcessor(s_base),
41 "cap": CapacitorProcessor(s_base),
42 "gen": GeneratorProcessor(s_base),
43 "bus": BusProcessor(s_base),
44 }
45 self.validator = TopologyValidator()
47 def load_network(self):
48 """Load CIM network from file."""
49 import os
51 os.environ["CIMG_CIM_PROFILE"] = "cimhub_2023"
52 from cimgraph.databases import XMLFile
53 import cimgraph.data_profile.cimhub_2023 as cim
55 file = XMLFile(filename=self.cim_file)
56 self.network = FeederModel(container=cim.Feeder(), connection=file)
57 utils.get_all_data(self.network)
58 _log.info("Network loaded successfully")
60 def convert(self, validate: bool = True) -> dict:
61 """Main conversion method."""
62 if self.network is None:
63 self.load_network()
64 assert self.network is not None
65 _log.info("Starting conversion process...")
67 # Process all components into dataframes
68 data = dict(
69 bus_data=pd.DataFrame(self.processors["bus"].process(self.network)),
70 branch_data=self._process_branch_data(),
71 gen_data=pd.DataFrame(self.processors["gen"].process(self.network)),
72 cap_data=pd.DataFrame(self.processors["cap"].process(self.network)),
73 reg_data=pd.DataFrame(self.processors["reg"].process(self.network)),
74 )
76 validation_result = None
77 if validate:
78 validation_result = self.validator.validate_tree_topology(
79 data["branch_data"]
80 )
81 if validation_result and not validation_result["valid"]:
82 _log.warning(
83 f"Topology validation failed: {validation_result['issues']}"
84 )
85 if validation_result and len(validation_result["warnings"]) > 0:
86 _log.warning(f"Topology warnings: {validation_result['warnings']}")
88 data = self._link_dataframes(data)
89 if len(data["gen_data"]) > 0:
90 data["gen_data"] = self._correct_generator_phases(
91 data["bus_data"], data["gen_data"]
92 )
93 data["gen_data"] = self._aggregate_generators(data["gen_data"])
94 data["bus_data"] = self._convert_secondary_loads(data["bus_data"])
96 return data
98 def _process_branch_data(self) -> pd.DataFrame:
99 """Process all branch data (lines, switches, transformers)."""
100 _log.info("Processing branch data...")
101 all_data = []
102 all_data.extend(self.processors["line"].process_branch(self.network))
103 all_data.extend(self.processors["switch"].process_branch(self.network))
104 all_data.extend(self.processors["xfm"].process_branch(self.network))
105 all_data.extend(self.processors["reg"].process_branch(self.network))
106 _log.info(f"Processed {len(all_data)} branch entries")
107 return pd.DataFrame(all_data)
109 def _fix_bus_phases_from_branches(
110 self, bus_data: pd.DataFrame, branch_data: pd.DataFrame
111 ) -> pd.DataFrame:
112 """Post-process bus phases based on connected branch phases using pandas operations."""
113 if branch_data.empty:
114 return bus_data
116 # Create phase mapping from 'to' buses
117 to_phases = branch_data[["tb", "phases"]].rename(columns={"tb": "id"})
118 # For bus id 1 (swing bus), use 'from' connection phases
119 fb_1 = branch_data[branch_data["fb"] == 1][["fb", "phases"]].rename(
120 columns={"fb": "id"}
121 )
122 # Combine and merge with bus data
123 phase_updates = pd.concat([fb_1, to_phases]).drop_duplicates("id", keep="last")
124 bus_data = bus_data.merge(
125 phase_updates, on="id", how="left", suffixes=("", "_new")
126 )
127 bus_data["phases"] = bus_data["phases_new"]
128 bus_data.drop("phases_new", axis=1, inplace=True)
129 return bus_data
131 def _fix_downstream_phase_consistency(
132 self, branch_data: pd.DataFrame
133 ) -> pd.DataFrame:
134 """Fix cases where downstream branches have more phases than upstream branches."""
135 if branch_data.empty:
136 return branch_data
138 # Create upstream mapping (fb -> upstream_phases)
139 upstream_map = branch_data.set_index("tb")["phases"]
140 # Map upstream phases and apply intersection
141 branch_data["upstream_phases"] = branch_data["fb"].map(upstream_map)
142 mask = branch_data["upstream_phases"].notna()
143 branch_data.loc[mask, "phases"] = branch_data.loc[mask].apply(
144 lambda x: "".join(
145 sorted(set(x["phases"]).intersection(set(x["upstream_phases"])))
146 ),
147 axis=1,
148 )
149 branch_data.drop("upstream_phases", axis=1, inplace=True)
150 return branch_data
152 def _link_dataframes(self, data):
153 bus_df = data["bus_data"]
154 branch_df = data["branch_data"]
155 gen_df = data["gen_data"]
156 cap_df = data["cap_data"]
157 reg_df = data["reg_data"]
158 """Create DFS-based integer IDs and link all dataframes."""
159 if bus_df.empty or branch_df.empty:
160 _log.warning("Bus or branch data is empty, cannot perform linking.")
161 return bus_df, branch_df, reg_df, cap_df, gen_df
163 swing_bus_series = bus_df[bus_df["bus_type"] == "SWING"]
164 if swing_bus_series.empty:
165 raise ValueError(
166 "No SWING bus found. Cannot determine network root for DFS."
167 )
169 root_bus_name = swing_bus_series["name"].iloc[0]
170 graph = nx.Graph()
171 for _, row in branch_df.iterrows():
172 graph.add_edge(row["from_name"], row["to_name"])
174 if root_bus_name not in graph:
175 _log.warning(
176 f"Swing bus '{root_bus_name}' not found in the graph constructed from branches. "
177 "Using arbitrary node order."
178 )
179 all_nodes = list(bus_df["name"])
180 dfs_nodes = [node for node in all_nodes if node in graph]
181 if root_bus_name not in dfs_nodes:
182 dfs_nodes.insert(0, root_bus_name)
183 if root_bus_name in graph:
184 dfs_nodes = list(nx.dfs_preorder_nodes(graph, source=root_bus_name))
186 bus_id_map = {name: i + 1 for i, name in enumerate(dfs_nodes)}
187 bus_df["id"] = bus_df["name"].map(bus_id_map)
188 bus_df.dropna(subset=["id"], inplace=True)
189 bus_df["id"] = bus_df["id"].astype(int)
191 if not branch_df.empty:
192 branch_df["fb"] = branch_df["from_name"].map(bus_id_map).astype(int)
193 branch_df["tb"] = branch_df["to_name"].map(bus_id_map).astype(int)
195 if not reg_df.empty:
196 reg_df["fb"] = reg_df["from_name"].map(bus_id_map).astype(int)
197 reg_df["tb"] = reg_df["to_name"].map(bus_id_map).astype(int)
199 if not cap_df.empty:
200 cap_df["id"] = cap_df["id"].map(bus_id_map).astype(int)
202 if not gen_df.empty:
203 gen_df["id"] = gen_df["id"].map(bus_id_map).astype(int)
204 try:
205 branch_df = self._fix_downstream_phase_consistency(branch_df)
206 except pd.errors.InvalidIndexError:
207 _log.warning("Cannot fix phase consistency due to network having loops.")
208 bus_df = self._fix_bus_phases_from_branches(bus_df, branch_df)
209 bus_df = bus_df.sort_values("id")
210 data["bus_data"] = bus_df
211 data["branch_data"] = branch_df
212 data["gen_data"] = gen_df
213 data["cap_data"] = cap_df
214 data["reg_data"] = reg_df
215 return data
217 def _aggregate_generators(self, gen_df) -> pd.DataFrame:
218 if len(gen_df) == 0:
219 return gen_df
220 """Aggregate generators by bus using pandas groupby."""
221 original_count = len(gen_df)
222 # Define aggregation functions for each column
223 agg_dict = {
224 "mrid": lambda x: "|".join(x), # Combine mRIDs with pipe separator
225 "name": lambda x: f"AggGen_{x.iloc[0]}_{len(x)}units"
226 if len(x) > 1
227 else x.iloc[0],
228 "pa": "sum",
229 "pb": "sum",
230 "pc": "sum",
231 "qa": "sum",
232 "qb": "sum",
233 "qc": "sum",
234 "s_base": "sum", # Sum the individual capacities
235 "sa_max": "sum",
236 "sb_max": "sum",
237 "sc_max": "sum",
238 "phases": lambda x: "".join(
239 sorted(set("".join(x)))
240 ), # Combine unique phases
241 "qa_max": "sum",
242 "qb_max": "sum",
243 "qc_max": "sum",
244 "qa_min": "sum",
245 "qb_min": "sum",
246 "qc_min": "sum", # Note: sums minimums (typically negative)
247 "control_variable": "first", # Use first control variable
248 }
249 # Only aggregate columns that exist in the dataframe
250 available_agg_dict = {
251 col: agg_func for col, agg_func in agg_dict.items() if col in gen_df.columns
252 }
253 # Group by bus ID and aggregate
254 aggregated_gen_df = gen_df.groupby("id", as_index=False).agg(available_agg_dict)
255 _log.info(
256 f"Generator aggregation: {original_count} individual -> {len(aggregated_gen_df)} aggregated"
257 )
258 return aggregated_gen_df
260 def _get_generator_columns(self):
261 """Get standard generator data columns for ordering."""
262 return [
263 "mrid",
264 "id",
265 "name",
266 "pa",
267 "pb",
268 "pc",
269 "qa",
270 "qb",
271 "qc",
272 "s_base",
273 "sa_max",
274 "sb_max",
275 "sc_max",
276 "phases",
277 "qa_max",
278 "qb_max",
279 "qc_max",
280 "qa_min",
281 "qb_min",
282 "qc_min",
283 "ps1",
284 "ps2",
285 "qs1",
286 "qs2",
287 "ss1_max",
288 "ss2_max",
289 "qs1_max",
290 "qs2_max",
291 "qs1_min",
292 "qs2_min",
293 "control_variable",
294 ]
296 def _correct_generator_phases(self, bus_df: pd.DataFrame, gen_df: pd.DataFrame):
297 gen_df = self._convert_secondary_gens(bus_df, gen_df)
298 gen_df = self._correct_generators_without_phases(bus_df, gen_df)
299 gen_df = self._distribute_phase_parameters(gen_df)
300 return gen_df
302 def _correct_generators_without_phases(
303 self, bus_df: pd.DataFrame, gen_df: pd.DataFrame
304 ):
305 if len(gen_df) == 0:
306 return gen_df
307 mask = gen_df.phases.str.len() == 0
308 gen_df.loc[mask, "phases"] = gen_df.loc[mask, "id"].apply(
309 lambda x: bus_df.loc[bus_df.id == x, "phases"].to_list()[0]
310 )
311 num_phases = gen_df.loc[mask, "phases"].str.len()
312 mask_a = mask & gen_df.phases.str.contains("a")
313 mask_b = mask & gen_df.phases.str.contains("b")
314 mask_c = mask & gen_df.phases.str.contains("c")
315 gen_df.loc[mask_a, "pa"] = gen_df.loc[mask_a, "p"] / num_phases
316 gen_df.loc[mask_b, "pb"] = gen_df.loc[mask_b, "p"] / num_phases
317 gen_df.loc[mask_c, "pc"] = gen_df.loc[mask_c, "p"] / num_phases
318 gen_df.loc[mask_a, "qa"] = gen_df.loc[mask_a, "q"] / num_phases
319 gen_df.loc[mask_b, "qb"] = gen_df.loc[mask_b, "q"] / num_phases
320 gen_df.loc[mask_c, "qc"] = gen_df.loc[mask_c, "q"] / num_phases
321 return gen_df
323 def _distribute_phase_parameters(self, gen_df: pd.DataFrame):
324 if len(gen_df) == 0:
325 return gen_df
326 num_phases = gen_df.loc[:, "phases"].str.len()
327 mask_a = gen_df.phases.str.contains("a")
328 mask_b = gen_df.phases.str.contains("b")
329 mask_c = gen_df.phases.str.contains("c")
330 gen_df.loc[mask_a, "sa_max"] = gen_df.loc[mask_a, "s_max"] / num_phases
331 gen_df.loc[mask_b, "sb_max"] = gen_df.loc[mask_b, "s_max"] / num_phases
332 gen_df.loc[mask_c, "sc_max"] = gen_df.loc[mask_c, "s_max"] / num_phases
333 gen_df.loc[mask_a, "qa_max"] = gen_df.loc[mask_a, "q_max"] / num_phases
334 gen_df.loc[mask_b, "qb_max"] = gen_df.loc[mask_b, "q_max"] / num_phases
335 gen_df.loc[mask_c, "qc_max"] = gen_df.loc[mask_c, "q_max"] / num_phases
336 gen_df.loc[mask_a, "qa_min"] = gen_df.loc[mask_a, "q_min"] / num_phases
337 gen_df.loc[mask_b, "qb_min"] = gen_df.loc[mask_b, "q_min"] / num_phases
338 gen_df.loc[mask_c, "qc_min"] = gen_df.loc[mask_c, "q_min"] / num_phases
339 return gen_df
341 def _convert_secondary_gens(self, bus_df: pd.DataFrame, gen_df: pd.DataFrame):
342 if len(gen_df) == 0:
343 return gen_df
344 if not (
345 "ps1" in gen_df.keys()
346 and "ps2" in gen_df.keys()
347 and "qs1" in gen_df.keys()
348 and "qs2" in gen_df.keys()
349 ):
350 return gen_df
351 mask = (
352 (gen_df.ps1.abs() > 0)
353 | (gen_df.ps2.abs() > 0)
354 | (gen_df.qs1.abs() > 0)
355 | (gen_df.qs2.abs() > 0)
356 )
357 # change s1s2 to a, b, or c primary phase
358 gen_df.loc[mask, "phases"] = gen_df.loc[mask, "id"].apply(
359 lambda x: bus_df.loc[bus_df.id == x, "phases"].to_list()[0]
360 )
361 # move values from s1 s2 to primary phase columns
362 mask_a = mask & gen_df.phases.str.contains("a")
363 mask_b = mask & gen_df.phases.str.contains("b")
364 mask_c = mask & gen_df.phases.str.contains("c")
365 gen_df.loc[mask_a, "pa"] = gen_df.loc[mask_a, ["ps1", "ps2"]].sum(axis=1)
366 gen_df.loc[mask_b, "pb"] = gen_df.loc[mask_b, ["ps1", "ps2"]].sum(axis=1)
367 gen_df.loc[mask_c, "pc"] = gen_df.loc[mask_c, ["ps1", "ps2"]].sum(axis=1)
368 gen_df.loc[mask_a, "qa"] = gen_df.loc[mask_a, ["qs1", "qs2"]].sum(axis=1)
369 gen_df.loc[mask_b, "qb"] = gen_df.loc[mask_b, ["qs1", "qs2"]].sum(axis=1)
370 gen_df.loc[mask_c, "qc"] = gen_df.loc[mask_c, ["qs1", "qs2"]].sum(axis=1)
371 return gen_df
373 def _convert_secondary_loads(self, bus_df: pd.DataFrame):
374 """Transfer loads from s1, and s2 secondary phases to the appropriate phase."""
375 mask = (
376 (bus_df.pl_s1 > 0)
377 | (bus_df.pl_s2 > 0)
378 | (bus_df.ql_s1 > 0)
379 | (bus_df.ql_s2 > 0)
380 )
381 _df = bus_df.loc[mask]
382 for phase, _id, p1, p2, q1, q2 in zip(
383 _df.phases, _df.id, _df.pl_s1, _df.pl_s2, _df.ql_s1, _df.ql_s2
384 ):
385 bus_df.loc[bus_df.id == int(_id), [f"pl_{phase}"]] = p1 + p2
386 bus_df.loc[bus_df.id == int(_id), [f"ql_{phase}"]] = q1 + q2
387 return bus_df
389 def save(self, results: dict[str, pd.DataFrame], output_dir: str | Path):
390 """Save processed data to CSV files."""
391 self.output_dir = Path(output_dir)
392 self.output_dir.mkdir(parents=True, exist_ok=True)
394 bus_cols = self._get_bus_columns()
395 branch_cols = self._get_branch_columns()
397 bus_data = results["bus_data"]
398 # Save bus data
399 if bus_data.empty:
400 pd.DataFrame(columns=bus_cols).to_csv(
401 self.output_dir / "bus_data.csv", index=False
402 )
403 if not bus_data.empty:
404 bus_data.to_csv(
405 self.output_dir / "bus_data.csv", index=False, columns=bus_cols
406 )
407 _log.info(f"Saved bus data to {self.output_dir / 'bus_data.csv'}")
409 branch_data = results["branch_data"]
410 # Save branch data
411 if branch_data.empty:
412 pd.DataFrame(columns=branch_cols).to_csv(
413 self.output_dir / "branch_data.csv", index=False
414 )
415 if not branch_data.empty:
416 branch_data.to_csv(
417 self.output_dir / "branch_data.csv", index=False, columns=branch_cols
418 )
419 _log.info(f"Saved branch data to {self.output_dir / 'branch_data.csv'}")
421 regulator_data = results["reg_data"]
422 if not regulator_data.empty:
423 regulator_data.to_csv(self.output_dir / "reg_data.csv", index=False)
424 capacitor_data = results["cap_data"]
425 if not capacitor_data.empty:
426 capacitor_data.to_csv(self.output_dir / "cap_data.csv", index=False)
427 generator_data = results["gen_data"]
428 if not generator_data.empty:
429 generator_data.to_csv(self.output_dir / "gen_data.csv", index=False)
430 _log.info(f"Saved generator data to {self.output_dir / 'gen_data.csv'}")
432 def _get_branch_columns(self):
433 """Get standard branch data columns for ordering."""
434 return [
435 "name",
436 "fb",
437 "tb",
438 "from_name",
439 "to_name",
440 "raa",
441 "rab",
442 "rac",
443 "rbb",
444 "rbc",
445 "rcc",
446 "xaa",
447 "xab",
448 "xac",
449 "xbb",
450 "xbc",
451 "xcc",
452 "type",
453 "status",
454 "s_base",
455 "v_ln_base",
456 "z_base",
457 "phases",
458 "length",
459 ]
461 def _get_bus_columns(self):
462 """Get standard bus data columns for ordering."""
463 return [
464 "mrid",
465 "id",
466 "name",
467 "pl_a",
468 "ql_a",
469 "pl_b",
470 "ql_b",
471 "pl_c",
472 "ql_c",
473 "bus_type",
474 "v_a",
475 "v_b",
476 "v_c",
477 "v_ln_base",
478 "s_base",
479 "v_min",
480 "v_max",
481 "cvr_p",
482 "cvr_q",
483 "phases",
484 "latitude",
485 "longitude",
486 ]