Coverage for src/distopf/pyomo_models/pyomo_lindist.py: 94%

107 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-08 11:30 -0700

1import pyomo.environ as pyo 

2from typing import Optional, Tuple, Dict, List 

3import pandas as pd 

4from dataclasses import dataclass 

5import distopf as opf 

6from numpy import sqrt, ones_like 

7from distopf.utils.utils import ( 

8 handle_branch_input, 

9 handle_bus_input, 

10 handle_gen_input, 

11 handle_cap_input, 

12 handle_reg_input, 

13) 

14 

15 

16@dataclass 

17class Case: 

18 branch_data: Optional[pd.DataFrame] = None 

19 bus_data: Optional[pd.DataFrame] = None 

20 gen_data: Optional[pd.DataFrame] = None 

21 cap_data: Optional[pd.DataFrame] = None 

22 reg_data: Optional[pd.DataFrame] = None 

23 

24 

25def _create_phase_tuples(df: pd.DataFrame, id_col: str = "id") -> List[Tuple[str, str]]: 

26 """Create (id, phase) tuples from dataframe with phases column""" 

27 result = [] 

28 for _, row in df.iterrows(): 

29 result.extend([(row[id_col], phase) for phase in row.phases]) 

30 return result 

31 

32 

33def _create_sets( 

34 m: pyo.ConcreteModel, 

35 bus: pd.DataFrame, 

36 branch: pd.DataFrame, 

37 gen: pd.DataFrame, 

38 cap: pd.DataFrame, 

39) -> None: 

40 """Create all Pyomo sets""" 

41 m.bus_set = pyo.Set(initialize=bus.id.tolist()) 

42 m.phase_set = pyo.Set(initialize=["a", "b", "c"]) 

43 m.swing_bus_set = pyo.Set(initialize=bus[bus.bus_type == "SWING"].id.tolist()) 

44 m.branch_set = pyo.Set(initialize=bus[bus.bus_type != "SWING"].id.tolist()) 

45 m.phase_pair_set = pyo.Set(initialize=["aa", "ab", "ac", "bb", "bc", "cc"]) 

46 

47 m.bus_phase_set = pyo.Set(initialize=_create_phase_tuples(bus), dimen=2) 

48 m.branch_phase_set = pyo.Set(initialize=_create_phase_tuples(branch, "tb"), dimen=2) 

49 m.gen_phase_set = pyo.Set( 

50 initialize=_create_phase_tuples(gen) if len(gen) > 0 else [], dimen=2 

51 ) 

52 m.cap_phase_set = pyo.Set( 

53 initialize=_create_phase_tuples(cap) if len(cap) > 0 else [], dimen=2 

54 ) 

55 

56 

57def _create_parameters(m: pyo.ConcreteModel, branch: pd.DataFrame) -> None: 

58 """Create resistance and reactance parameters""" 

59 r_data, x_data = {}, {} 

60 

61 for _, row in branch.iterrows(): 

62 for phase_pair in m.phase_pair_set: 

63 r_col, x_col = f"r{phase_pair}", f"x{phase_pair}" 

64 

65 if r_col in branch.columns and x_col in branch.columns: 

66 r_data[(phase_pair, row.tb)] = row[r_col] 

67 x_data[(phase_pair, row.tb)] = row[x_col] 

68 

69 m.r = pyo.Param(m.phase_pair_set, m.branch_set, initialize=r_data, default=0.0) 

70 m.x = pyo.Param(m.phase_pair_set, m.branch_set, initialize=x_data, default=0.0) 

71 

72 

73def _create_variables(m: pyo.ConcreteModel) -> None: 

74 """Create all variables without bounds""" 

75 m.v = pyo.Var(m.bus_phase_set) # Voltage magnitude squared 

76 m.p_flow = pyo.Var(m.branch_phase_set) 

77 m.q_flow = pyo.Var(m.branch_phase_set) 

78 m.p_gen = pyo.Var(m.gen_phase_set) 

79 m.q_gen = pyo.Var(m.gen_phase_set) 

80 m.q_cap = pyo.Var(m.cap_phase_set) 

81 

82 

83def add_voltage_bounds(m: pyo.ConcreteModel, bus: pd.DataFrame) -> None: 

84 """Add voltage bounds (for voltage magnitude squared)""" 

85 for bus_id, phase in m.bus_phase_set: 

86 bus_row = bus[bus.id == bus_id].iloc[0] 

87 v_min = getattr(bus_row, "v_min", 0.95) 

88 v_max = getattr(bus_row, "v_max", 1.05) 

89 

90 # Apply bounds to voltage squared 

91 m.v[bus_id, phase].setlb(v_min**2) 

92 m.v[bus_id, phase].setub(v_max**2) 

93 

94 

95def add_generator_bounds(m: pyo.ConcreteModel, gen: pd.DataFrame) -> None: 

96 """Add generator bounds following the original base.py logic""" 

97 if len(gen) == 0: 

98 return 

99 

100 for phase in "abc": 

101 # Get generators that have this phase 

102 gen_phase_buses = [bus_id for bus_id, p in m.gen_phase_set if p == phase] 

103 if not gen_phase_buses: 

104 continue 

105 

106 # Create lookup dictionaries for generator data by bus_id 

107 gen_lookup = {} 

108 for _, row in gen.iterrows(): 

109 gen_lookup[row.id] = row 

110 

111 # Get phase-specific arrays for generators with this phase 

112 s_rated_dict = {} 

113 p_out_dict = {} 

114 q_max_manual_dict = {} 

115 q_min_manual_dict = {} 

116 

117 for bus_id in gen_phase_buses: 

118 if bus_id in gen_lookup: 

119 row = gen_lookup[bus_id] 

120 s_rated_dict[bus_id] = getattr(row, f"s{phase}_max", 0) 

121 p_out_dict[bus_id] = getattr(row, f"p{phase}", 0) 

122 q_max_manual_dict[bus_id] = getattr(row, f"q{phase}_max", 100e3) 

123 q_min_manual_dict[bus_id] = getattr(row, f"q{phase}_min", -100e3) 

124 

125 for bus_id in gen_phase_buses: 

126 if bus_id not in gen_lookup: 

127 continue 

128 

129 gen_row = gen_lookup[bus_id] 

130 mode = getattr(gen_row, "control_variable", "") 

131 

132 s_rated = s_rated_dict[bus_id] 

133 p_out = p_out_dict[bus_id] 

134 q_max_manual = q_max_manual_dict[bus_id] 

135 q_min_manual = q_min_manual_dict[bus_id] 

136 

137 # Active power bounds 

138 m.p_gen[bus_id, phase].setlb(0) 

139 m.p_gen[bus_id, phase].setub(p_out) 

140 

141 # Reactive power bounds 

142 if mode == opf.CONSTANT_P: # mode == "Q" 

143 # Use capability curve limits 

144 q_max_cap = sqrt(max(0, s_rated**2 - p_out**2)) 

145 q_min_cap = -q_max_cap 

146 q_lower = max(q_min_cap, q_min_manual) 

147 q_upper = min(q_max_cap, q_max_manual) 

148 else: # mode is "P", "PQ", or "" 

149 # Use apparent power limits 

150 q_lower = max(-s_rated, q_min_manual) 

151 q_upper = min(s_rated, q_max_manual) 

152 

153 m.q_gen[bus_id, phase].setlb(q_lower) 

154 m.q_gen[bus_id, phase].setub(q_upper) 

155 

156 

157def create_lindist_model(case: Case) -> pyo.ConcreteModel: 

158 """ 

159 Factory function to create a Pyomo ConcreteModel for linear distribution system optimization. 

160 

161 Parameters 

162 ---------- 

163 case : Case 

164 Dataclass containing network data frames 

165 

166 Returns 

167 ------- 

168 pyo.ConcreteModel 

169 Configured Pyomo model with sets, variables, and parameters 

170 """ 

171 # Load and validate data frames 

172 branch = handle_branch_input(case.branch_data) 

173 bus = handle_bus_input(case.bus_data) 

174 gen = handle_gen_input(case.gen_data) 

175 cap = handle_cap_input(case.cap_data) 

176 reg = handle_reg_input(case.reg_data) 

177 

178 m = pyo.ConcreteModel() 

179 _create_sets(m, bus, branch, gen, cap) 

180 _create_parameters(m, branch) 

181 _create_variables(m) 

182 

183 add_voltage_bounds(m, bus) 

184 add_generator_bounds(m, gen) 

185 

186 return m