Coverage for tests/cim_converter/integration/test_full_conversion.py: 98%

87 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1# tests/integration/test_full_conversion.py 

2from pathlib import Path 

3import pandas as pd 

4import pytest 

5from distopf.cim_importer import CIMToCSVConverter 

6 

7 

8@pytest.mark.integration 

9def test_full_conversion_writes_expected_files_and_columns(tmp_path): 

10 """ 

11 Run the converter on the repository's CIM and assert expected CSV files are produced 

12 and that required columns exist. 

13 """ 

14 repo_root = Path(__file__).resolve().parents[3] 

15 cim_path = repo_root / "tests" / "cim_converter" / "data" / "IEEE13.xml" 

16 assert cim_path.exists(), f"Expected CIM file at {cim_path}" 

17 

18 out_dir = tmp_path / "csv_out" 

19 conv = CIMToCSVConverter(cim_file=str(cim_path)) 

20 results = conv.convert(validate=True) 

21 conv.save(results, output_dir=out_dir) 

22 

23 # Basic returned structure 

24 assert isinstance(results, dict) 

25 assert "branch_data" in results and "bus_data" in results 

26 

27 # Required files 

28 bus_file = out_dir / "bus_data.csv" 

29 branch_file = out_dir / "branch_data.csv" 

30 assert bus_file.exists(), "bus_data.csv not written" 

31 assert branch_file.exists(), "branch_data.csv not written" 

32 

33 bus_df = pd.read_csv(bus_file) 

34 branch_df = pd.read_csv(branch_file) 

35 

36 # Check required columns exist (use converter helpers to be resilient to ordering) 

37 bus_cols = conv._get_bus_columns() 

38 branch_cols = conv._get_branch_columns() 

39 # The output may include extra columns; require at least the standard ones 

40 assert set(bus_cols).issubset(set(bus_df.columns)) 

41 assert set(branch_cols).issubset(set(branch_df.columns)) 

42 

43 # IDs should be present and integer-like 

44 assert "id" in bus_df.columns 

45 # no missing ids 

46 assert bus_df["id"].notna().all() 

47 # ids should be convertible to ints and should form a contiguous sequence starting at 1 

48 ids = bus_df["id"].astype(int).tolist() 

49 sorted_ids = sorted(ids) 

50 assert sorted_ids[0] == 1 

51 assert sorted_ids == list(range(1, len(sorted_ids) + 1)) 

52 

53 # Branch fb/tb should map to valid bus ids (if present) 

54 if {"fb", "tb"}.issubset(branch_df.columns): 

55 max_id = max(sorted_ids) 

56 # allow branches without mapping (NaN), but if present must be in range 

57 for col in ("fb", "tb"): 

58 if branch_df[col].notna().any(): 

59 vals = branch_df[col].dropna().astype(int) 

60 assert vals.min() >= 1 and vals.max() <= max_id 

61 

62 

63@pytest.mark.integration 

64def test_branch_and_bus_phase_strings_and_basic_invariants(tmp_path): 

65 """ 

66 Ensure phase strings are sensible and that numeric fields are non-negative 

67 where applicable. 

68 """ 

69 repo_root = Path(__file__).resolve().parents[3] 

70 cim_path = repo_root / "tests" / "cim_converter" / "data" / "IEEE13.xml" 

71 out_dir = tmp_path / "csv_out2" 

72 conv = CIMToCSVConverter(cim_file=str(cim_path)) 

73 results = conv.convert(validate=False) 

74 conv.save(results, output_dir=str(out_dir)) 

75 branch_df = results["branch_data"] 

76 bus_df = results["bus_data"] 

77 

78 # phases contain only letters a/b/c (or combinations) or are empty/null 

79 def valid_phase_str(s): 

80 if pd.isna(s): 

81 return True 

82 if not isinstance(s, str): 

83 return False 

84 # allow '', 'a', 'ab', 'abc', 'ac', etc. 

85 return all(ch in "abc" for ch in s) 

86 

87 assert branch_df["phases"].apply(valid_phase_str).all() 

88 assert bus_df["phases"].apply(valid_phase_str).all() 

89 

90 # basic numeric invariants: v_ln_base and z_base (if present) are non-negative 

91 for col in ("v_ln_base", "z_base"): 

92 if col in branch_df.columns: 

93 # ignore NaNs but check non-negative where present 

94 if branch_df[col].notna().any(): 

95 assert (branch_df.loc[branch_df[col].notna(), col] >= 0).all() 

96 

97 

98@pytest.mark.integration 

99def test_regulator_and_capacitor_output_consistency(tmp_path): 

100 """ 

101 If reg_data.csv or cap_data.csv are produced, ensure they are readable and contain 

102 expected keys and numeric tap/ratio values (when present). 

103 """ 

104 repo_root = Path(__file__).resolve().parents[3] 

105 cim_path = repo_root / "tests" / "cim_converter" / "data" / "IEEE13.xml" 

106 out_dir = tmp_path / "csv_out3" 

107 conv = CIMToCSVConverter(cim_file=str(cim_path)) 

108 results = conv.convert(validate=False) 

109 conv.save(results, output_dir=str(out_dir)) 

110 reg_file = out_dir / "reg_data.csv" 

111 cap_file = out_dir / "cap_data.csv" 

112 

113 if reg_file.exists(): 

114 reg_df = pd.read_csv(reg_file) 

115 # expected reg columns (subset) 

116 for required in ("fb", "tb", "ratio_a", "tap_a", "phases"): 

117 assert required in reg_df.columns 

118 # tap/ratio numeric where not null 

119 numeric_cols = [ 

120 c 

121 for c in ("tap_a", "tap_b", "tap_c", "ratio_a", "ratio_b", "ratio_c") 

122 if c in reg_df.columns 

123 ] 

124 if numeric_cols and not reg_df.empty: 

125 # any present numeric column values should be finite numbers or NaN 

126 import numpy as np 

127 

128 for col in numeric_cols: 

129 vals = reg_df[col].dropna().astype(float) 

130 if not vals.empty: 

131 assert np.isfinite(vals).all() 

132 

133 if cap_file.exists(): 

134 cap_df = pd.read_csv(cap_file) 

135 for required in ("id", "name", "phases"): 

136 assert required in cap_df.columns 

137 # qa/qb/qc numeric and non-negative (capacitors stored as reactive support) 

138 for qcol in ("qa", "qb", "qc"): 

139 if qcol in cap_df.columns and cap_df[qcol].notna().any(): 

140 assert (cap_df.loc[cap_df[qcol].notna(), qcol].astype(float) >= 0).all()