Coverage for tests/cim_converter/integration/test_impedance_comparison.py: 82%
173 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
# tests/cim_converter/integration/test_impedance_comparison.py
2from pathlib import Path
3import numpy as np
4import pandas as pd
5import pytest
6from distopf.cim_importer import CIMToCSVConverter
9def _find_matching_row(conv_branch_df: pd.DataFrame, from_name: str, to_name: str):
10 """Find a branch row in conv_branch_df that matches from/to names (either direction)."""
11 mask = (conv_branch_df["from_name"] == from_name) & (
12 conv_branch_df["to_name"] == to_name
13 )
14 if mask.any():
15 return conv_branch_df.loc[mask].iloc[0]
16 mask2 = (conv_branch_df["from_name"] == to_name) & (
17 conv_branch_df["to_name"] == from_name
18 )
19 if mask2.any():
20 return conv_branch_df.loc[mask2].iloc[0]
21 return None
def _find_two_hop_candidate(conv_branch_df: pd.DataFrame, from_name: str, to_name: str):
    """
    Try to find an intermediate node X such that conv has edges
    (from_name <-> X) and (X <-> to_name).

    Candidate nodes are visited in order of first appearance (all ``from_name``
    values, then all ``to_name`` values), so the same input always yields the
    same candidate.

    Returns (edge1_row, edge2_row, intermediate_name) for the first candidate
    found, or None when no such intermediate node exists.
    """
    # Series.unique() keeps first-appearance order; iterating a set of strings
    # instead would make the chosen candidate depend on hash randomization.
    endpoints = pd.concat(
        [conv_branch_df["from_name"], conv_branch_df["to_name"]],
        ignore_index=True,
    ).dropna()
    for x in endpoints.unique():
        if x == from_name or x == to_name:
            continue
        e1 = _find_matching_row(conv_branch_df, from_name, x)
        if e1 is None:
            continue
        e2 = _find_matching_row(conv_branch_df, x, to_name)
        if e2 is None:
            continue
        return e1, e2, x
    return None
45def _phase_set(s):
46 if pd.isna(s):
47 return set()
48 if not isinstance(s, str):
49 return set()
50 return set(s)
def _as_float(value):
    """Coerce ``value`` to float; anything float() rejects becomes NaN."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return np.nan


def _sum_edge_impedances(edges, impedance_cols):
    """Sum every impedance column over ``edges``.

    A column is NaN only when it is missing/NaN on every edge; otherwise
    missing segments contribute nothing to the sum.
    """
    summed = {}
    for col in impedance_cols:
        # Series.get() returns None for an absent column, which _as_float
        # maps to NaN.
        vals = [_as_float(e.get(col)) for e in edges]
        present = [v for v in vals if not np.isnan(v)]
        summed[col] = sum(present) if present else np.nan
    return summed


def _compare_impedances(ref_row, summed, impedance_cols, rtol=1e-3, atol=1e-3):
    """Compare reference impedances against summed converted impedances.

    Returns ``(impedance_ok, max_rel_err, details)`` where ``details`` maps
    each column to a dict with keys ``status`` (``both_nan``, ``nan_mismatch``,
    ``diff`` or ``ok``), ``ref``, ``conv`` and ``rel_err``.
    """
    impedance_ok = True
    max_rel_err = 0.0
    details = {}
    for col in impedance_cols:
        ref_num = _as_float(ref_row.get(col, np.nan))
        conv_num = _as_float(summed.get(col, np.nan))
        ref_missing = np.isnan(ref_num)
        conv_missing = np.isnan(conv_num)

        if ref_missing and conv_missing:
            status, rel_err = "both_nan", 0.0
        elif ref_missing or conv_missing:
            status, rel_err = "nan_mismatch", np.inf
        else:
            rel_err = abs((conv_num - ref_num) / max(abs(ref_num), 1e-12))
            # np.isclose(a, b) scales rtol by |b|, so the reference value must
            # be the second argument for the tolerance to be relative to it.
            if np.isclose(conv_num, ref_num, rtol=rtol, atol=atol):
                status = "ok"
            else:
                status = "diff"

        if status in ("nan_mismatch", "diff"):
            impedance_ok = False
        if rel_err > max_rel_err:
            max_rel_err = rel_err
        details[col] = {
            "status": status,
            "ref": ref_num,
            "conv": conv_num,
            "rel_err": rel_err,
        }
    return impedance_ok, max_rel_err, details


def _flatten_failure(failure):
    """Flatten one failure record into a single-level dict for the CSV report."""
    report_keys = (
        "fb",
        "tb",
        "ref_name",
        "ref_from",
        "ref_to",
        "reason",
        "mode",
        "conv_from",
        "conv_to",
        "intermediate",
        "conv_edge1_name",
        "conv_edge2_name",
        "max_rel_err",
    )
    row = {key: failure.get(key) for key in report_keys}
    # Records for unmatched branches carry no "details" entry.
    for col, info in (failure.get("details") or {}).items():
        if col == "phases":
            row.update(
                {
                    "phases_ref": info.get("ref"),
                    "phases_conv": info.get("conv"),
                    "phases_ok": info.get("ok"),
                }
            )
        else:
            row.update(
                {
                    f"{col}_status": info.get("status"),
                    f"{col}_ref": info.get("ref"),
                    f"{col}_conv": info.get("conv"),
                    f"{col}_rel_err": info.get("rel_err"),
                }
            )
    return row


@pytest.mark.integration
def test_branch_impedances_against_reference(tmp_path):
    """
    Compare impedance columns in converted branch_data to the reference
    tests/cim_converter/data/ieee13/branch_data.csv.

    For each reference branch a direct converted branch (either direction) is
    looked up first; if none exists, a two-hop path through one intermediate
    bus is tried and the two segments' impedances are summed. Phases must
    match exactly (not subset). A CSV report of failures is written to
    tmp_path, and the test fails when the match ratio drops below 0.6.
    """
    # tests/cim_converter/integration/<this file> -> repository root
    repo_root = Path(__file__).resolve().parents[3]
    data_dir = repo_root / "tests" / "cim_converter" / "data"
    ref_branch_path = data_dir / "ieee13" / "branch_data.csv"
    ref_bus_path = data_dir / "ieee13" / "bus_data.csv"
    cim_path = data_dir / "IEEE13.xml"

    assert ref_branch_path.exists(), f"Reference branch file missing: {ref_branch_path}"
    assert ref_bus_path.exists(), f"Reference bus file missing: {ref_bus_path}"
    assert cim_path.exists(), f"CIM file missing: {cim_path}"

    ref_branch_df = pd.read_csv(ref_branch_path)
    ref_bus_df = pd.read_csv(ref_bus_path)

    # Map ref id -> name
    ref_id_to_name = {int(r["id"]): r["name"] for _, r in ref_bus_df.iterrows()}

    out_dir = tmp_path / "csv_imp"
    conv = CIMToCSVConverter(cim_file=str(cim_path))
    results = conv.convert(validate=False)
    conv.save(results, output_dir=str(out_dir))
    conv_branch_df = results["branch_data"]

    impedance_cols = ["raa", "rbb", "rcc", "xaa", "xbb", "xcc"]
    matches = 0
    total = 0
    failures = []

    for _, ref_row in ref_branch_df.iterrows():
        total += 1
        fb = int(ref_row["fb"])
        tb = int(ref_row["tb"])
        from_name = ref_id_to_name.get(fb)
        to_name = ref_id_to_name.get(tb)
        row_meta = {
            "fb": fb,
            "tb": tb,
            "ref_name": ref_row.get("name", ""),
            "ref_from": from_name,
            "ref_to": to_name,
        }

        if from_name is None or to_name is None:
            failures.append({**row_meta, "reason": "missing_ref_bus_name"})
            continue

        # Try direct match first, then fall back to a two-hop path.
        used_mode = "direct"
        intermediate = None  # reset so a previous row's value never leaks in
        direct_row = _find_matching_row(conv_branch_df, from_name, to_name)
        if direct_row is not None:
            edges = [direct_row]
        else:
            candidate = _find_two_hop_candidate(conv_branch_df, from_name, to_name)
            if candidate is None:
                failures.append({**row_meta, "reason": "no_matching_conv_branch"})
                continue
            first_edge, second_edge, intermediate = candidate
            edges = [first_edge, second_edge]
            used_mode = "two_hop"

        summed = _sum_edge_impedances(edges, impedance_cols)

        # Phase handling (exact match required); for a two-hop path the
        # converted phases are the union over both segments.
        ref_phases = _phase_set(ref_row.get("phases", ""))
        conv_phases = set()
        for edge in edges:
            conv_phases |= _phase_set(edge.get("phases"))
        phase_ok = ref_phases == conv_phases

        impedance_ok, max_rel_err, details = _compare_impedances(
            ref_row, summed, impedance_cols
        )
        details["phases"] = {
            "ref": "".join(sorted(ref_phases)) if ref_phases else "",
            "conv": "".join(sorted(conv_phases)),
            "ok": phase_ok,
        }

        if impedance_ok and phase_ok:
            matches += 1
            continue

        # Reason: prefer phase mismatch as primary if it fails
        record = {
            **row_meta,
            "reason": "impedance_mismatch" if phase_ok else "phase_mismatch",
            "mode": used_mode,
            "max_rel_err": max_rel_err,
            "conv_from": edges[0].get("from_name"),
            "conv_to": edges[-1].get("to_name"),
        }
        if used_mode == "two_hop":
            record["intermediate"] = intermediate
            record["conv_edge1_name"] = edges[0].get("name")
            record["conv_edge2_name"] = edges[1].get("name")
        record["details"] = details
        failures.append(record)

    match_ratio = matches / total if total else 0.0
    min_ratio = 0.6

    # Build CSV report for failures (if any)
    failures_csv = None
    failures_df = None
    if failures:
        failures_df = pd.DataFrame([_flatten_failure(f) for f in failures])
        failures_csv = tmp_path / "impedance_mismatch_report.csv"
        failures_df.to_csv(failures_csv, index=False)

    if match_ratio < min_ratio:
        if failures_df is not None:
            worst = failures_df.sort_values(
                by=["max_rel_err"], ascending=False, na_position="last"
            ).head(10)
            preview = worst.to_string(index=False, float_format="{:.6g}".format)
        else:
            preview = "(no detailed failures captured)"
        msg_lines = [
            f"Impedance + phase match ratio {match_ratio:.3f} below threshold {min_ratio:.3f}.",
            f"Total reference branches: {total}, matched: {matches}, failures: {len(failures)}.",
            "",
            "Top mismatches (up to 10):",
            preview,
            "",
        ]
        if failures_csv is not None:
            msg_lines.append(f"Full failures CSV written to: {failures_csv}")
        msg_lines.append(
            "To reproduce full conversion run: python -m cim_converter.main OR pytest -k integration"
        )
        pytest.fail("\n".join(msg_lines))