Coverage for tests/cim_converter/integration/test_full_conversion.py: 98%
87 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1# tests/cim_converter/integration/test_full_conversion.py
2from pathlib import Path
3import pandas as pd
4import pytest
5from distopf.cim_importer import CIMToCSVConverter
@pytest.mark.integration
def test_full_conversion_writes_expected_files_and_columns(tmp_path):
    """
    Run the converter on the repository's IEEE13 CIM file and check the CSV output.

    Verifies that ``convert`` returns a dict containing ``bus_data`` and
    ``branch_data``, that ``save`` writes ``bus_data.csv`` and
    ``branch_data.csv``, that every column returned by the converter's
    ``_get_bus_columns`` / ``_get_branch_columns`` helpers is present in the
    written files, that bus ids are exactly the integers 1..N, and that any
    branch ``fb`` / ``tb`` values are integer-valued and lie within 1..N.
    """
    # This file lives three directories below the repository root.
    repo_root = Path(__file__).resolve().parents[3]
    cim_path = repo_root / "tests" / "cim_converter" / "data" / "IEEE13.xml"
    assert cim_path.exists(), f"Expected CIM file at {cim_path}"

    out_dir = tmp_path / "csv_out"
    conv = CIMToCSVConverter(cim_file=str(cim_path))
    results = conv.convert(validate=True)
    conv.save(results, output_dir=out_dir)

    # Basic returned structure
    assert isinstance(results, dict)
    assert "branch_data" in results and "bus_data" in results

    # Required files
    bus_file = out_dir / "bus_data.csv"
    branch_file = out_dir / "branch_data.csv"
    assert bus_file.exists(), "bus_data.csv not written"
    assert branch_file.exists(), "branch_data.csv not written"

    bus_df = pd.read_csv(bus_file)
    branch_df = pd.read_csv(branch_file)

    # The output may include extra columns; require at least the ones the
    # converter helpers report, and name the missing ones on failure.
    missing_bus = set(conv._get_bus_columns()) - set(bus_df.columns)
    assert not missing_bus, f"bus_data.csv is missing columns: {missing_bus}"
    missing_branch = set(conv._get_branch_columns()) - set(branch_df.columns)
    assert not missing_branch, f"branch_data.csv is missing columns: {missing_branch}"

    # IDs must be present, non-null and integer-valued.
    assert "id" in bus_df.columns
    assert not bus_df.empty, "bus_data.csv contains no buses"
    assert bus_df["id"].notna().all()
    numeric_ids = pd.to_numeric(bus_df["id"])
    # A plain astype(int) would silently truncate 1.5 -> 1, so check
    # integrality before casting.
    assert (numeric_ids % 1 == 0).all(), "bus ids must be integer-valued"

    # IDs must form a contiguous sequence starting at 1.
    sorted_ids = sorted(numeric_ids.astype(int).tolist())
    assert sorted_ids == list(range(1, len(sorted_ids) + 1))

    # Branch fb/tb should map to valid bus ids (if the columns are present).
    if {"fb", "tb"}.issubset(branch_df.columns):
        max_id = sorted_ids[-1]
        # Allow branches without a mapping (NaN); mapped values must be valid.
        for col in ("fb", "tb"):
            mapped = branch_df[col].dropna()
            if mapped.empty:
                continue
            endpoints = pd.to_numeric(mapped)
            assert (endpoints % 1 == 0).all(), f"{col} must be integer-valued"
            assert endpoints.min() >= 1 and endpoints.max() <= max_id
@pytest.mark.integration
def test_branch_and_bus_phase_strings_and_basic_invariants(tmp_path):
    """
    Check phase labels and base quantities on the converted IEEE13 tables.

    Every ``phases`` entry in the branch and bus tables must be null or a
    string made up only of the letters a, b and c (the empty string is
    accepted). Where the branch table has ``v_ln_base`` or ``z_base``
    columns, their non-null values must be non-negative.
    """
    cim_path = Path(__file__).resolve().parents[3].joinpath(
        "tests", "cim_converter", "data", "IEEE13.xml"
    )
    converter = CIMToCSVConverter(cim_file=str(cim_path))
    tables = converter.convert(validate=False)
    # Saving is exercised here too; the assertions below use the in-memory tables.
    converter.save(tables, output_dir=str(tmp_path / "csv_out2"))

    branches = tables["branch_data"]
    buses = tables["bus_data"]

    def is_phase_label(value):
        # Null entries are acceptable; anything else must be a string
        # such as '', 'a', 'ab', 'abc' or 'ac'.
        if pd.isna(value):
            return True
        return isinstance(value, str) and all(ch in "abc" for ch in value)

    assert branches["phases"].apply(is_phase_label).all()
    assert buses["phases"].apply(is_phase_label).all()

    # Base quantities, when the column exists, must not be negative.
    for name in ("v_ln_base", "z_base"):
        if name not in branches.columns:
            continue
        present = branches[name].dropna()
        if present.empty:
            continue
        assert (present >= 0).all()
@pytest.mark.integration
def test_regulator_and_capacitor_output_consistency(tmp_path):
    """
    Validate ``reg_data.csv`` and ``cap_data.csv`` when the converter writes them.

    Each file is optional. A regulator file must be readable, carry the
    ``fb``, ``tb``, ``ratio_a``, ``tap_a`` and ``phases`` columns, and hold
    only finite numbers in whichever tap/ratio columns it has (nulls are
    ignored). A capacitor file must carry ``id``, ``name`` and ``phases``,
    and any non-null ``qa``/``qb``/``qc`` values must be non-negative.
    """
    cim_path = Path(__file__).resolve().parents[3].joinpath(
        "tests", "cim_converter", "data", "IEEE13.xml"
    )
    target_dir = tmp_path / "csv_out3"
    converter = CIMToCSVConverter(cim_file=str(cim_path))
    converter.save(converter.convert(validate=False), output_dir=str(target_dir))

    reg_path = target_dir / "reg_data.csv"
    if reg_path.exists():
        regulators = pd.read_csv(reg_path)
        # Minimum set of regulator columns.
        assert {"fb", "tb", "ratio_a", "tap_a", "phases"} <= set(regulators.columns)

        tap_ratio_cols = [
            name
            for name in ("tap_a", "tap_b", "tap_c", "ratio_a", "ratio_b", "ratio_c")
            if name in regulators.columns
        ]
        if tap_ratio_cols and not regulators.empty:
            import numpy as np

            # Non-null tap/ratio entries must parse as finite floats.
            for name in tap_ratio_cols:
                present = regulators[name].dropna().astype(float)
                if present.empty:
                    continue
                assert np.isfinite(present).all()

    cap_path = target_dir / "cap_data.csv"
    if cap_path.exists():
        capacitors = pd.read_csv(cap_path)
        assert {"id", "name", "phases"} <= set(capacitors.columns)

        # Per-phase reactive values are stored as non-negative support.
        for name in ("qa", "qb", "qc"):
            if name not in capacitors.columns:
                continue
            present = capacitors[name].dropna()
            if present.empty:
                continue
            assert (present.astype(float) >= 0).all()