Coverage for tests/cim_converter/integration/test_impedance_comparison.py: 82%

173 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1# tests/cim_converter/integration/test_impedance_comparison.py 

2from pathlib import Path 

3import numpy as np 

4import pandas as pd 

5import pytest 

6from distopf.cim_importer import CIMToCSVConverter 

7 

8 

def _find_matching_row(conv_branch_df: pd.DataFrame, from_name: str, to_name: str):
    """Return the first branch row joining the two named buses, or None.

    The forward orientation (from_name -> to_name) is tried first; the
    reversed orientation (to_name -> from_name) is only consulted when no
    forward row exists.
    """
    sources = conv_branch_df["from_name"]
    targets = conv_branch_df["to_name"]
    for head, tail in ((from_name, to_name), (to_name, from_name)):
        hits = conv_branch_df.loc[(sources == head) & (targets == tail)]
        if len(hits) > 0:
            return hits.iloc[0]
    return None

22 

23 

def _find_two_hop_candidate(conv_branch_df: pd.DataFrame, from_name: str, to_name: str):
    """
    Try to find an intermediate node X such that conv has edges
    (from_name <-> X) and (X <-> to_name), in either orientation.

    Candidate nodes are examined in order of first appearance (all
    ``from_name`` values in row order, then all ``to_name`` values), so the
    chosen intermediate is the same on every run. Iterating a ``set`` of
    strings instead would make the "first" candidate depend on per-process
    hash randomization.

    Returns (edge1_row, edge2_row, intermediate_name) for the first candidate
    found, or None.
    """
    endpoint_names = pd.concat(
        [conv_branch_df["from_name"], conv_branch_df["to_name"]],
        ignore_index=True,
    ).dropna()
    # Series.unique() keeps order of appearance, giving a stable scan order.
    for x in endpoint_names.unique():
        if x in (from_name, to_name):
            continue
        e1 = _find_matching_row(conv_branch_df, from_name, x)
        if e1 is None:
            continue
        e2 = _find_matching_row(conv_branch_df, x, to_name)
        if e2 is None:
            continue
        return e1, e2, x
    return None

43 

44 

def _phase_set(s):
    """Return the set of phase characters in *s*.

    Anything that is not a string (None, NaN, numbers, list-likes) yields an
    empty set. A single ``isinstance`` guard is used rather than
    ``pd.isna(s)``: ``pd.isna`` returns an array for list-like input, whose
    truth value is ambiguous and raises ``ValueError``, and no string is ever
    NA, so the extra check added nothing for scalar input.
    """
    if not isinstance(s, str):
        return set()
    return set(s)

51 

52 

def _as_float(value):
    """Coerce *value* to ``float``; return NaN when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # Missing cells (None), non-numeric text and out-of-range integers
        # are all treated as "no value".
        return np.nan


def _sum_edge_impedances(edges, impedance_cols):
    """Sum each impedance column over *edges*, treating the edges as series segments.

    NaN entries are skipped; a column is NaN in the result only when every
    edge lacks a usable value for it.
    """
    summed = {}
    for col in impedance_cols:
        # Series.get returns None for an absent column, which _as_float maps to NaN.
        values = [_as_float(edge.get(col)) for edge in edges]
        present = [v for v in values if not np.isnan(v)]
        summed[col] = sum(present) if present else np.nan
    return summed


def _compare_impedances(ref_row, summed, impedance_cols, rtol=1e-3, atol=1e-3):
    """Compare summed converted impedances against one reference row.

    Returns ``(all_ok, details, max_rel_err)`` where ``details`` maps each
    column to a dict with keys ``status`` ("ok", "diff", "both_nan" or
    "nan_mismatch"), ``ref``, ``conv`` and ``rel_err``.
    """
    all_ok = True
    details = {}
    max_rel_err = 0.0
    for col in impedance_cols:
        ref_num = _as_float(ref_row.get(col, np.nan))
        conv_num = _as_float(summed.get(col, np.nan))
        ref_missing = np.isnan(ref_num)
        conv_missing = np.isnan(conv_num)

        if ref_missing and conv_missing:
            status, rel_err = "both_nan", 0.0
        elif ref_missing or conv_missing:
            status, rel_err = "nan_mismatch", np.inf
            max_rel_err = np.inf
        else:
            # Guard the denominator so a zero reference does not divide by zero.
            rel_err = abs((conv_num - ref_num) / max(abs(ref_num), 1e-12))
            if rel_err > max_rel_err:
                max_rel_err = rel_err
            # np.isclose(a, b) tests |a - b| <= atol + rtol * |b|, i.e. the
            # second argument is the reference value, so the reference goes last.
            close = np.isclose(conv_num, ref_num, rtol=rtol, atol=atol)
            status = "ok" if close else "diff"

        if status in ("nan_mismatch", "diff"):
            all_ok = False
        details[col] = {
            "status": status,
            "ref": ref_num,
            "conv": conv_num,
            "rel_err": rel_err,
        }
    return all_ok, details, max_rel_err


def _flatten_failure(failure):
    """Flatten one failure record (with nested per-column details) into a single report row."""
    report_keys = (
        "fb",
        "tb",
        "ref_name",
        "ref_from",
        "ref_to",
        "reason",
        "mode",
        "conv_from",
        "conv_to",
        "intermediate",
        "conv_edge1_name",
        "conv_edge2_name",
        "max_rel_err",
    )
    row = {key: failure.get(key) for key in report_keys}
    for col, info in (failure.get("details") or {}).items():
        if col == "phases":
            row.update(
                {
                    "phases_ref": info.get("ref"),
                    "phases_conv": info.get("conv"),
                    "phases_ok": info.get("ok"),
                }
            )
        else:
            row.update(
                {
                    f"{col}_status": info.get("status"),
                    f"{col}_ref": info.get("ref"),
                    f"{col}_conv": info.get("conv"),
                    f"{col}_rel_err": info.get("rel_err"),
                }
            )
    return row


@pytest.mark.integration
def test_branch_impedances_against_reference(tmp_path):
    """
    Compare impedance columns in converted branch_data to the reference
    tests/cim_converter/data/ieee13/branch_data.csv.

    For every reference branch the converted data is searched for a direct
    branch between the same two bus names (either orientation). If none
    exists, a two-hop path through one intermediate bus is tried and the two
    segments' impedances are summed (the converter may have inserted an extra
    bus). Phases must match exactly (not as a subset); for a two-hop path the
    union of both segments' phases is compared.

    A CSV report of all failures is written to ``tmp_path``. The test fails
    when fewer than 60% of the reference branches match on both impedance
    and phases.
    """
    repo_root = Path(__file__).resolve().parents[3]
    data_dir = repo_root / "tests" / "cim_converter" / "data"
    ref_branch_path = data_dir / "ieee13" / "branch_data.csv"
    ref_bus_path = data_dir / "ieee13" / "bus_data.csv"
    cim_path = data_dir / "IEEE13.xml"

    assert ref_branch_path.exists(), f"Reference branch file missing: {ref_branch_path}"
    assert ref_bus_path.exists(), f"Reference bus file missing: {ref_bus_path}"
    assert cim_path.exists(), f"CIM file missing: {cim_path}"

    ref_branch_df = pd.read_csv(ref_branch_path)
    ref_bus_df = pd.read_csv(ref_bus_path)

    # Map reference bus id -> bus name; branches reference buses by id (fb/tb).
    ref_id_to_name = {
        int(bus_id): bus_name
        for bus_id, bus_name in zip(ref_bus_df["id"], ref_bus_df["name"])
    }

    out_dir = tmp_path / "csv_imp"
    conv = CIMToCSVConverter(cim_file=str(cim_path))
    results = conv.convert(validate=False)
    conv.save(results, output_dir=str(out_dir))
    conv_branch_df = results["branch_data"]

    impedance_cols = ["raa", "rbb", "rcc", "xaa", "xbb", "xcc"]
    total = len(ref_branch_df)
    matches = 0
    failures = []

    for _, ref_row in ref_branch_df.iterrows():
        fb = int(ref_row["fb"])
        tb = int(ref_row["tb"])
        from_name = ref_id_to_name.get(fb)
        to_name = ref_id_to_name.get(tb)
        row_meta = {
            "fb": fb,
            "tb": tb,
            "ref_name": ref_row.get("name", ""),
            "ref_from": from_name,
            "ref_to": to_name,
        }

        if from_name is None or to_name is None:
            failures.append({**row_meta, "reason": "missing_ref_bus_name"})
            continue

        # Try a direct match first, then fall back to a two-hop path.
        direct = _find_matching_row(conv_branch_df, from_name, to_name)
        intermediate = None
        if direct is not None:
            used_mode = "direct"
            edges = [direct]
        else:
            candidate = _find_two_hop_candidate(conv_branch_df, from_name, to_name)
            if candidate is None:
                failures.append({**row_meta, "reason": "no_matching_conv_branch"})
                continue
            first_edge, second_edge, intermediate = candidate
            used_mode = "two_hop"
            edges = [first_edge, second_edge]

        summed = _sum_edge_impedances(edges, impedance_cols)

        # Phase handling (exact match required); a two-hop path contributes
        # the union of its segments' phases.
        ref_phases = _phase_set(ref_row.get("phases", ""))
        conv_phases = set().union(*(_phase_set(edge.get("phases")) for edge in edges))
        phase_ok = ref_phases == conv_phases

        # Values match when |conv - ref| <= 1e-3 + 1e-3 * |ref|.
        impedance_ok, details, max_rel_err = _compare_impedances(
            ref_row, summed, impedance_cols, rtol=1e-3, atol=1e-3
        )
        details["phases"] = {
            "ref": "".join(sorted(ref_phases)),
            "conv": "".join(sorted(conv_phases)),
            "ok": phase_ok,
        }

        if impedance_ok and phase_ok:
            matches += 1
            continue

        # A phase mismatch is reported as the primary reason when both checks fail.
        record = {
            **row_meta,
            "reason": "impedance_mismatch" if phase_ok else "phase_mismatch",
            "mode": used_mode,
            "max_rel_err": max_rel_err,
            "conv_from": edges[0].get("from_name"),
            "conv_to": edges[-1].get("to_name"),
        }
        if used_mode == "two_hop":
            record["intermediate"] = intermediate
            record["conv_edge1_name"] = edges[0].get("name")
            record["conv_edge2_name"] = edges[1].get("name")
        record["details"] = details
        failures.append(record)

    match_ratio = matches / total if total else 0.0
    min_ratio = 0.6

    # Build the CSV report for failures (if any), even when the test passes.
    failures_df = None
    failures_csv = None
    if failures:
        failures_df = pd.DataFrame([_flatten_failure(f) for f in failures])
        failures_csv = tmp_path / "impedance_mismatch_report.csv"
        failures_df.to_csv(failures_csv, index=False)

    if match_ratio >= min_ratio:
        return

    if failures_df is not None:
        worst = failures_df.sort_values(
            by=["max_rel_err"], ascending=False, na_position="last"
        ).head(10)
        preview = worst.to_string(index=False, float_format="{:.6g}".format)
    else:
        preview = "(no detailed failures captured)"

    msg_lines = [
        f"Impedance + phase match ratio {match_ratio:.3f} below threshold {min_ratio:.3f}.",
        f"Total reference branches: {total}, matched: {matches}, failures: {len(failures)}.",
        "",
        "Top mismatches (up to 10):",
        preview,
        "",
    ]
    if failures_csv:
        msg_lines.append(f"Full failures CSV written to: {failures_csv}")
    msg_lines.append(
        "To reproduce full conversion run: python -m cim_converter.main OR pytest -k integration"
    )
    pytest.fail("\n".join(msg_lines))