Coverage for src/distopf/utils.py: 75%

83 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1import pandas as pd 

2from typing import Optional 

3 

4 

def get(s: pd.Series, i, default=None):
    """
    Look up a label in a Series, falling back to a default when the lookup fails.

    Parameters
    ----------
    s : pd.Series
        Series to read from.
    i : label
        Index label passed to ``s.loc``.
    default : object, optional
        Value returned when the lookup raises ``KeyError``, ``ValueError`` or
        ``IndexError``.

    Returns
    -------
    object
        ``s.loc[i]`` when the lookup succeeds, otherwise ``default``.
    """
    try:
        value = s.loc[i]
    except (KeyError, ValueError, IndexError):
        return default
    return value

22 

23 

def handle_gen_input(gen_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare generator data: sort by ``id`` and index the rows by ``id - 1``.

    Parameters
    ----------
    gen_data : pd.DataFrame or None
        Generator table containing at least an ``id`` column. The frame passed
        in is left untouched.

    Returns
    -------
    pd.DataFrame
        When ``gen_data`` is None, an empty frame holding the generator
        columns. Otherwise a new frame sorted by ``id``, indexed by ``id - 1``,
        with a ``control_variable`` column (filled with ``""``) appended when
        the input did not provide one.
    """
    if gen_data is None:
        return pd.DataFrame(
            columns=[
                "id",
                "name",
                "pa",
                "pb",
                "pc",
                "qa",
                "qb",
                "qc",
                "sa_max",
                "sb_max",
                "sc_max",
                "phases",
                "qa_max",
                "qb_max",
                "qc_max",
                "qa_min",
                "qb_min",
                "qc_min",
                "control_variable",
            ]
        )
    # sort_values returns a new frame, so the default column is added to the
    # result rather than to the caller's DataFrame.
    gen = gen_data.sort_values(by="id", ignore_index=True)
    if "control_variable" not in gen.columns:
        gen["control_variable"] = ""
    gen.index = gen.id.to_numpy() - 1
    return gen

54 

55 

def handle_cap_input(cap_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare capacitor data: sort by ``id`` and index the rows by ``id - 1``.

    Parameters
    ----------
    cap_data : pd.DataFrame or None
        Capacitor table containing at least an ``id`` column.

    Returns
    -------
    pd.DataFrame
        An empty frame with the capacitor columns when ``cap_data`` is None,
        otherwise a sorted copy indexed by ``id - 1``.
    """
    if cap_data is not None:
        ordered = cap_data.sort_values(by="id", ignore_index=True)
        ordered.index = ordered.id.to_numpy() - 1
        return ordered
    empty_columns = ["id", "name", "qa", "qb", "qc", "phases"]
    return pd.DataFrame(columns=empty_columns)

71 

72 

def handle_reg_input(reg_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare regulator data and make tap/ratio columns agree per phase.

    The frame is sorted by ``tb`` and indexed by ``tb - 1``. For each phase in
    ``"abc"`` the relation ``ratio = 1 + 0.00625 * tap`` is applied:

    * if ``tap_<ph>`` exists, ``ratio_<ph>`` is (re)computed from it, so the
      tap value wins when both columns are supplied;
    * otherwise, if only ``ratio_<ph>`` exists, ``tap_<ph>`` is derived from it;
    * if neither exists, nothing is added for that phase.

    Parameters
    ----------
    reg_data : pd.DataFrame or None
        Regulator table containing at least a ``tb`` column.

    Returns
    -------
    pd.DataFrame
        An empty frame with the regulator columns when ``reg_data`` is None,
        otherwise the sorted, re-indexed copy with tap/ratio columns filled.
    """
    if reg_data is None:
        empty_columns = ["fb", "tb", "phases"]
        empty_columns += [f"tap_{ph}" for ph in "abc"]
        empty_columns += [f"ratio_{ph}" for ph in "abc"]
        return pd.DataFrame(columns=empty_columns)

    reg = reg_data.sort_values(by="tb", ignore_index=True)
    reg.index = reg.tb.to_numpy() - 1
    for ph in "abc":
        tap_col = f"tap_{ph}"
        ratio_col = f"ratio_{ph}"
        if tap_col in reg.columns:
            # Covers both "tap only" and "tap and ratio": ratio follows tap.
            reg[ratio_col] = 1 + 0.00625 * reg[tap_col]
        elif ratio_col in reg.columns:
            reg[tap_col] = (reg[ratio_col] - 1) / 0.00625
    return reg

103 

104 

def handle_branch_input(branch_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare branch data: sort by ``tb`` and drop rows whose status is ``"OPEN"``.

    Parameters
    ----------
    branch_data : pd.DataFrame or None
        Branch table containing at least ``tb`` and ``status`` columns.

    Returns
    -------
    pd.DataFrame
        Rows with ``status != "OPEN"``, ordered by ``tb``. The index is the
        row position after sorting, so it has gaps where rows were dropped.

    Raises
    ------
    ValueError
        If ``branch_data`` is None.
    """
    if branch_data is None:
        raise ValueError("Branch data must be provided.")
    ordered = branch_data.sort_values(by="tb", ignore_index=True)
    not_open = ordered.status != "OPEN"
    return pd.DataFrame(ordered.loc[not_open, :])

111 

112 

def handle_bus_input(bus_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare bus data: fill missing columns, coerce dtypes, sort and re-index.

    Missing ``load_shape`` is filled with ``"default"``; any other missing
    column listed in the type table is filled with that type's zero value
    (``0``, ``0.0``, ``""`` or ``False``). The listed columns are then cast to
    their declared types, and the frame is sorted by ``id`` and indexed by
    ``id - 1``. Columns not listed in the type table are passed through as-is.

    Parameters
    ----------
    bus_data : pd.DataFrame or None
        Bus table containing at least an ``id`` column. The frame passed in is
        left untouched.

    Returns
    -------
    pd.DataFrame
        A new, typed frame sorted by ``id`` and indexed by ``id - 1``.

    Raises
    ------
    ValueError
        If ``bus_data`` is None. Errors raised by ``DataFrame.astype`` for
        values that cannot be cast to the declared type also propagate.
    """
    if bus_data is None:
        raise ValueError("Bus data must be provided.")
    type_dict = {
        "id": int,
        "name": str,
        "pl_a": float,
        "ql_a": float,
        "pl_b": float,
        "ql_b": float,
        "pl_c": float,
        "ql_c": float,
        "bus_type": str,
        "v_a": float,
        "v_b": float,
        "v_c": float,
        "v_ln_base": float,
        "s_base": float,
        "v_min": float,
        "v_max": float,
        "cvr_p": float,
        "cvr_q": float,
        "phases": str,
        "has_gen": bool,
        "has_load": bool,
        "has_cap": bool,
        "latitude": float,
        "longitude": float,
        "load_shape": str,
    }
    # Work on a copy so the default columns are not written into the caller's
    # DataFrame.
    bus = bus_data.copy()
    if "load_shape" not in bus.columns:
        bus["load_shape"] = "default"
    for column, column_type in type_dict.items():
        if column not in bus.columns:
            bus[column] = column_type()
    # Chain the cast into the sort; previously the cast result was discarded
    # and the uncast input was sorted instead.
    bus = bus.astype(type_dict).sort_values(by="id", ignore_index=True)
    bus.index = bus.id.to_numpy() - 1
    return bus

152 

153 

def handle_schedules_input(loadshape_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare load-shape data: sort by ``time`` and index the rows by ``time``.

    Parameters
    ----------
    loadshape_data : pd.DataFrame or None
        Schedule table containing at least a ``time`` column.

    Returns
    -------
    pd.DataFrame
        An empty frame with columns ``time`` and ``M`` when
        ``loadshape_data`` is None, otherwise a sorted copy indexed by the
        values of ``time``.
    """
    if loadshape_data is not None:
        ordered = loadshape_data.sort_values(by="time", ignore_index=True)
        ordered.index = ordered.time.to_numpy()
        return ordered
    return pd.DataFrame(columns=["time", "M"])

165 

166 

def handle_pv_loadshape_input(
    pv_loadshape_data: Optional[pd.DataFrame],
) -> pd.DataFrame:
    """
    Prepare PV load-shape data: sort by ``time`` and index the rows by ``time``.

    Parameters
    ----------
    pv_loadshape_data : pd.DataFrame or None
        PV schedule table containing at least a ``time`` column.

    Returns
    -------
    pd.DataFrame
        An empty frame with columns ``time`` and ``PV`` when
        ``pv_loadshape_data`` is None, otherwise a sorted copy indexed by the
        values of ``time``.
    """
    if pv_loadshape_data is not None:
        ordered = pv_loadshape_data.sort_values(by="time", ignore_index=True)
        ordered.index = ordered.time.to_numpy()
        return ordered
    return pd.DataFrame(columns=["time", "PV"])

180 

181 

def handle_bat_input_depricated(bat_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare battery data in the older per-phase column layout.

    The frame is sorted by ``id`` and indexed by ``id - 1``. For each phase in
    ``"abc"``, a missing ``b0_<ph>`` column is filled from ``bmin_<ph>`` of the
    same phase. If that phase's ``bmin`` column is absent, ``bmin_a`` is used
    instead, so inputs that only provide ``bmin_a`` keep working.

    Parameters
    ----------
    bat_data : pd.DataFrame or None
        Battery table containing at least an ``id`` column. The frame passed
        in is left untouched.

    Returns
    -------
    pd.DataFrame
        An empty frame with the battery columns when ``bat_data`` is None,
        otherwise the sorted, re-indexed copy with ``b0_*`` columns filled.

    Raises
    ------
    KeyError
        If a ``b0_<ph>`` column must be filled and neither ``bmin_<ph>`` nor
        ``bmin_a`` is present.
    """
    if bat_data is None:
        return pd.DataFrame(
            columns=[
                "id",
                "name",
                "nc_a",
                "nc_b",
                "nc_c",
                "nd_a",
                "nd_b",
                "nd_c",
                "hmax_a",
                "hmax_b",
                "hmax_c",
                "Pb_max_a",
                "Pb_max_b",
                "Pb_max_c",
                "bmin_a",
                "bmin_b",
                "bmin_c",
                "bmax_a",
                "bmax_b",
                "bmax_c",
                "b0_a",
                "b0_b",
                "b0_c",
                "phases",
            ]
        )
    # sort_values returns a new frame, so defaults are written to the result
    # rather than to the caller's DataFrame.
    bat = bat_data.sort_values(by="id", ignore_index=True)
    for ph in "abc":
        target = f"b0_{ph}"
        if target in bat.columns:
            continue
        # Use the matching phase; b0_b and b0_c were previously always filled
        # from bmin_a.
        source = f"bmin_{ph}"
        if source not in bat.columns:
            source = "bmin_a"
        bat[target] = bat[source]
    bat.index = bat.id.to_numpy() - 1
    return bat

221 

222 

def handle_bat_input(bat_data: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Prepare battery data: sort by ``id`` and index the rows by ``id - 1``.

    Parameters
    ----------
    bat_data : pd.DataFrame or None
        Battery table containing at least an ``id`` column.

    Returns
    -------
    pd.DataFrame
        An empty frame with the battery columns when ``bat_data`` is None,
        otherwise a sorted copy indexed by ``id - 1``.
    """
    if bat_data is not None:
        ordered = bat_data.sort_values(by="id", ignore_index=True)
        ordered.index = ordered.id.to_numpy() - 1
        return ordered
    empty_columns = [
        "id",
        "name",
        "s_max",
        "phases",
        "energy_capacity",
        "min_soc",
        "max_soc",
        "start_soc",
        "charge_efficiency",
        "discharge_efficiency",
        "control_variable",
    ]
    return pd.DataFrame(columns=empty_columns)