Coverage for src/distopf/matrix_models/lindist_q_gen.py: 17%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from typing import Optional 

2from functools import cache 

3import numpy as np 

4import pandas as pd 

5import distopf as opf 

6from distopf.matrix_models.base import LinDistBase 

7from distopf.utils import get 

8 

9 

class LinDistModelQGen(LinDistBase):
    """
    LinDistFlow Model with reactive power control of generators.
    No variables for active power generation exist. This can make the
    model smaller and run faster if there are many generators and only
    reactive power control is required.

    Parameters
    ----------
    branch_data : pd.DataFrame
        DataFrame containing branch data including resistance and reactance
        values and limits.
    bus_data : pd.DataFrame
        DataFrame containing bus data such as loads, voltages, and limits.
    gen_data : pd.DataFrame
        DataFrame containing generator data.
    cap_data : pd.DataFrame
        DataFrame containing capacitor data.
    reg_data : pd.DataFrame
        DataFrame containing regulator data.

    """

    def __init__(
        self,
        branch_data: Optional[pd.DataFrame] = None,
        bus_data: Optional[pd.DataFrame] = None,
        gen_data: Optional[pd.DataFrame] = None,
        cap_data: Optional[pd.DataFrame] = None,
        reg_data: Optional[pd.DataFrame] = None,
    ):
        """Pass the input tables to the base class, then call ``build()``."""
        super().__init__(branch_data, bus_data, gen_data, cap_data, reg_data)
        self.build()

    def initialize_variable_index_pointers(self):
        """
        Assign positions in the decision vector to every variable group.

        Branch variables are laid out first, followed by the bus voltage
        (``v``), generator reactive power (``qg``), capacitor reactive power
        (``qc``) and regulator (``vx``) variables. ``self.n_x`` is threaded
        through each call and ends up as the running total returned by the
        last one. No map is created for generator active power.
        """
        self.x_maps, self.n_x = self._variable_tables(self.branch)
        self.v_map, self.n_x = self._add_device_variables(self.n_x, self.all_buses)
        self.qg_map, self.n_x = self._add_device_variables(self.n_x, self.gen_buses)
        self.qc_map, self.n_x = self._add_device_variables(self.n_x, self.cap_buses)
        self.vx_map, self.n_x = self._add_device_variables(self.n_x, self.reg_buses)

    def add_generator_limits(self, x_lim_lower, x_lim_upper):
        """
        Write lower/upper bounds for every generator reactive power variable.

        For each phase that exists, the bound implied by the apparent power
        rating is ``q_max = sqrt(s_max**2 - p**2)`` (symmetric about zero).
        It is intersected with the optional ``q{phase}_max`` /
        ``q{phase}_min`` columns of ``self.gen``; when a column is absent a
        limit of +/-100e3 is used instead.

        Parameters
        ----------
        x_lim_lower, x_lim_upper
            Bound containers indexed by variable position; modified in place.

        Returns
        -------
        tuple
            ``(x_lim_lower, x_lim_upper)`` after the updates.
        """
        # Without a generator table there is nothing to bound.
        if self.gen is None:
            return x_lim_lower, x_lim_upper
        for a in "abc":
            if not self.phase_exists(a):
                continue
            s_rated = self.gen[f"s{a}_max"]
            p_out = self.gen[f"p{a}"]
            # Clip the radicand at zero: round-off (or p above the rating)
            # would otherwise turn the bound into NaN.
            q_max = ((s_rated**2) - (p_out**2)).clip(lower=0) ** (1 / 2)
            q_min = -q_max
            # The fallbacks share q_max's index so that the label lookups
            # below address them exactly like the rating-based limits.
            q_max_manual = self.gen.get(
                f"q{a}_max", pd.Series(100e3, index=q_max.index)
            )
            q_min_manual = self.gen.get(
                f"q{a}_min", pd.Series(-100e3, index=q_max.index)
            )
            for j in self.gen_buses[a]:
                qg = self.idx("qg", j, a)
                # reactive power bounds: keep the tighter of the two limits
                x_lim_lower[qg] = max(q_min[j], q_min_manual[j])
                x_lim_upper[qg] = min(q_max[j], q_max_manual[j])
        return x_lim_lower, x_lim_upper

    # NOTE(review): functools.cache on a method includes ``self`` in the key,
    # which keeps every instance alive for the lifetime of the cache, and the
    # cached indexes are not refreshed if the maps are rebuilt. Left in place
    # so ``idx.cache_clear()`` / ``idx.cache_info()`` stay available.
    @cache
    def idx(self, var, node_j, phase):
        """
        Look up the decision-vector index (or indexes) of a variable.

        Parameters
        ----------
        var : str
            Variable name: a column of ``self.x_maps[phase]``, one of
            ``"pjk"``, ``"qjk"``, ``"v"``, ``"qg"``/``"q_gen"``,
            ``"qc"``/``"q_cap"``, ``"vx"``, or any name resolved by
            ``additional_variable_idx``.
        node_j
            Bus identifier used as the lookup key.
        phase : str
            Phase key used to select the per-phase map.

        Returns
        -------
        The stored index/indexes; the device lookups fall back to ``[]``
        when ``node_j`` has no entry.

        Raises
        ------
        ValueError
            If ``var`` is not recognized.
        """
        if var in self.x_maps[phase].columns:
            return self.branch_into_j(var, node_j, phase)
        if var in ["pjk"]:  # indexes of all branch active power out of node j
            return self.branches_out_of_j("pij", node_j, phase)
        if var in ["qjk"]:  # indexes of all branch reactive power out of node j
            return self.branches_out_of_j("qij", node_j, phase)
        if var in ["v"]:  # voltage variable at node
            return get(self.v_map[phase], node_j, [])
        if var in ["qg", "q_gen"]:  # reactive power generation at node
            return get(self.qg_map[phase], node_j, [])
        if var in ["qc", "q_cap"]:  # reactive power injection by capacitor
            return get(self.qc_map[phase], node_j, [])
        if var in ["vx"]:
            return self.vx_map[phase].get(node_j, [])
        ix = self.additional_variable_idx(var, node_j, phase)
        if ix is not None:
            return ix
        raise ValueError(f"Variable name, '{var}', not found.")

    def add_power_flow_model(self, a_eq, b_eq, j, phase):
        """
        Fill the active and reactive power balance rows for bus ``j``.

        The rows are addressed by the indexes of the incoming branch flows
        (``pij`` / ``qij``). Generator active power is not a variable in this
        model; its nominal value from ``self.gen`` is moved to the right-hand
        side of the active power row. Load terms are written only when the
        bus type is not ``opf.PQ_FREE``, and nominal loads are read only for
        ``opf.PQ_BUS`` buses (otherwise they are zero).

        Parameters
        ----------
        a_eq, b_eq
            Equality constraint matrix and right-hand side; modified in place.
        j
            Bus identifier.
        phase : str
            Phase key.

        Returns
        -------
        tuple
            ``(a_eq, b_eq)`` after the updates.
        """
        pij = self.idx("pij", j, phase)
        qij = self.idx("qij", j, phase)
        pjk = self.idx("pjk", j, phase)
        qjk = self.idx("qjk", j, phase)
        qg = self.idx("qg", j, phase)
        qc = self.idx("q_cap", j, phase)
        vj = self.idx("v", j, phase)
        # Scalar default; a tuple here would corrupt the subtraction in the
        # right-hand side below whenever no generator table is present.
        p_gen_nom = 0
        if self.gen is not None:
            p_gen_nom = get(self.gen[f"p{phase}"], j, 0)
        p_load_nom, q_load_nom = 0, 0
        if self.bus.bus_type[j] == opf.PQ_BUS:
            p_load_nom = self.bus[f"pl_{phase}"][j]
            q_load_nom = self.bus[f"ql_{phase}"][j]
        # Set P equation variable coefficients in a_eq
        a_eq[pij, pij] = 1
        a_eq[pij, pjk] = -1
        # Set Q equation variable coefficients in a_eq
        a_eq[qij, qij] = 1
        a_eq[qij, qjk] = -1
        a_eq[qij, qg] = 1
        a_eq[qij, qc] = 1
        if self.bus.bus_type[j] != opf.PQ_FREE:
            # Set Load equation variable coefficients in a_eq
            a_eq[pij, vj] = -(self.bus.cvr_p[j] / 2) * p_load_nom
            b_eq[pij] = (1 - (self.bus.cvr_p[j] / 2)) * p_load_nom - p_gen_nom
            a_eq[qij, vj] = -(self.bus.cvr_q[j] / 2) * q_load_nom
            b_eq[qij] = (1 - (self.bus.cvr_q[j] / 2)) * q_load_nom
        return a_eq, b_eq

    def add_generator_model(self, a_eq, b_eq, j, phase):
        """Return ``a_eq`` and ``b_eq`` unchanged; no generator rows are added."""
        return a_eq, b_eq

    def add_load_model(self, a_eq, b_eq, j, phase):
        """Return ``a_eq`` and ``b_eq`` unchanged; no separate load rows are added."""
        return a_eq, b_eq

    def add_capacitor_model(self, a_eq, b_eq, j, phase):
        """
        Tie the capacitor reactive power variable at bus ``j`` to its voltage.

        In the row of the capacitor variable the coefficient of ``qc`` is set
        to 1 and the coefficient of the bus voltage variable to
        ``-q_cap_nom``, where ``q_cap_nom`` is read from ``self.cap`` (zero
        when there is no capacitor table or no entry for ``j``). ``b_eq`` is
        not modified here.

        Returns
        -------
        tuple
            ``(a_eq, b_eq)`` after the updates.
        """
        q_cap_nom = 0
        if self.cap is not None:
            q_cap_nom = get(self.cap[f"q{phase}"], j, 0)
        # equation indexes
        vj = self.idx("v", j, phase)
        qc = self.idx("q_cap", j, phase)
        a_eq[qc, qc] = 1
        a_eq[qc, vj] = -q_cap_nom
        return a_eq, b_eq

    def create_inequality_constraints(self):
        """Return ``(None, None)``; this model defines no inequality constraints."""
        return None, None

    def get_p_gens(self, x):
        """
        Return generator active power per phase as a table.

        The table is created by ``get_device_variables`` from the reactive
        power map, then its ``a``/``b``/``c`` columns are overwritten with the
        ``pa``/``pb``/``pc`` columns of the generator data.
        """
        # NOTE(review): assumes the rows produced from qg_map are in the same
        # order as the generator table rows -- confirm.
        # NOTE(review): the rest of this class reads ``self.gen``; confirm the
        # base class really provides ``self.gen_data``.
        df = self.get_device_variables(x, self.qg_map)
        df.a = self.gen_data.pa.to_numpy()
        df.b = self.gen_data.pb.to_numpy()
        df.c = self.gen_data.pc.to_numpy()
        return df

    def get_apparent_power_flows(self, x):
        """
        Assemble complex branch power flows (``pij + 1j * qij``) from ``x``.

        Parameters
        ----------
        x
            Solution vector, indexed with the ``pij``/``qij`` columns of the
            per-phase branch maps.

        Returns
        -------
        pd.DataFrame
            Indexed by ``range(2, self.nb + 1)`` with columns ``fb``, ``tb``
            (both shifted by +1 relative to the stored bus indexes),
            ``from_name``, ``to_name`` and one complex column per phase.
        """
        s_df = pd.DataFrame(
            columns=["fb", "tb", "from_name", "to_name", "a", "b", "c"],
            index=range(2, self.nb + 1),
        )
        for ph in "abc":
            s_df[ph] = s_df[ph].astype(complex)
        for ph in "abc":
            x_map = self.x_maps[ph]
            fb_idxs = x_map.bi.to_numpy()
            tb_idxs = x_map.bj.to_numpy()
            fb_names = self.bus.name[fb_idxs].to_numpy()
            tb_names = self.bus.name[tb_idxs].to_numpy()
            # Each branch is stored on the row labelled by its to-bus + 1.
            rows = tb_idxs + 1
            s_df.loc[rows, "fb"] = fb_idxs + 1
            s_df.loc[rows, "tb"] = tb_idxs + 1
            s_df.loc[rows, "from_name"] = fb_names
            s_df.loc[rows, "to_name"] = tb_names
            s_df.loc[rows, ph] = x[x_map.pij] + 1j * x[x_map.qij]
        return s_df

164 x[self.x_maps[ph].pij] + 1j * x[self.x_maps[ph].qij] 

165 ) 

166 return s_df