Coverage for src/distopf/matrix_models/solvers.py: 37%

119 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from collections.abc import Callable 

2from time import perf_counter 

3 

4import pyomo.environ as pe 

5import cvxpy as cp 

6import numpy as np 

7from scipy.optimize import OptimizeResult, linprog 

8from scipy.sparse import csr_array 

9from distopf import LinDistModelCapMI 

10from distopf.matrix_models.base import LinDistBase 

11 

12 

13def cvxpy_solve( 

14 model: LinDistBase, 

15 obj_func: Callable, 

16 **kwargs, 

17) -> OptimizeResult: 

18 """ 

19 Solve a convex optimization problem using cvxpy. 

20 Parameters 

21 ---------- 

22 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

23 obj_func : handle to the objective function 

24 kwargs : 

25 

26 Returns 

27 ------- 

28 result: scipy.optimize.OptimizeResult 

29 

30 """ 

31 m = model 

32 tic = perf_counter() 

33 solver = kwargs.get("solver", cp.CLARABEL) 

34 x0 = kwargs.get("x0", None) 

35 if x0 is None: 

36 lin_res = lp_solve(m, np.zeros(m.n_x)) 

37 if not lin_res.success: 

38 raise ValueError(lin_res.message) 

39 x0 = lin_res.x.copy() 

40 x = cp.Variable(shape=(m.n_x,), name="x", value=x0) 

41 g = [m.a_eq @ x - m.b_eq.flatten() == 0] 

42 # lb = [x[i] >= m.bounds[i][0] for i in range(m.n_x)] 

43 # ub = [x[i] <= m.bounds[i][1] for i in range(m.n_x)] 

44 lb = [x >= m.x_min] 

45 ub = [x <= m.x_max] 

46 g_inequality = [] 

47 if m.a_ub is not None and m.b_ub is not None: 

48 if m.a_ub.shape[0] != 0 and m.a_ub.shape[1] != 0: 

49 g_inequality = [m.a_ub @ x - m.b_ub <= 0] 

50 expression = obj_func(m, x, **kwargs) 

51 prob = cp.Problem(cp.Minimize(expression), g + g_inequality + ub + lb) 

52 prob.solve(verbose=False, solver=solver) 

53 

54 x_res = x.value 

55 result = OptimizeResult( 

56 fun=prob.value, 

57 success=(prob.status == "optimal"), 

58 message=prob.status, 

59 x=x_res, 

60 nit=prob.solver_stats.num_iters, 

61 runtime=perf_counter() - tic, 

62 ) 

63 return result 

64 

65 

66def cvxpy_mi_solve( 

67 model: LinDistModelCapMI, 

68 obj_func: Callable, 

69 **kwargs, 

70) -> OptimizeResult: 

71 """ 

72 Solve a convex optimization problem using cvxpy. 

73 Parameters 

74 ---------- 

75 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

76 obj_func : handle to the objective function 

77 kwargs : 

78 

79 Returns 

80 ------- 

81 result: scipy.optimize.OptimizeResult 

82 

83 """ 

84 m = model 

85 tic = perf_counter() 

86 solver = kwargs.get("solver") 

87 x0 = kwargs.get("x0", None) 

88 if x0 is None: 

89 lin_res = lp_solve(m, np.zeros(m.n_x)) 

90 if not lin_res.success: 

91 raise ValueError(lin_res.message) 

92 x0 = lin_res.x.copy() 

93 x = cp.Variable(shape=(m.n_x,), name="x", value=x0) 

94 n_u = len(m.cap_buses["a"]) + len(m.cap_buses["b"]) + len(m.cap_buses["c"]) 

95 u_c = cp.Variable(shape=(n_u,), name="u_c", value=np.ones(n_u), boolean=True) 

96 u_idxs = np.r_[m.uc_map["a"], m.uc_map["b"], m.uc_map["c"]] 

97 gu = [x[u_idxs] == u_c] 

98 g_ineq = [csr_array(m.a_ineq) @ x - m.b_ineq.flatten() <= 0] 

99 g = [csr_array(m.a_eq) @ x - m.b_eq.flatten() == 0] 

100 lb = [x[i] >= m.bounds[i][0] for i in range(m.n_x)] 

101 ub = [x[i] <= m.bounds[i][1] for i in range(m.n_x)] 

102 

103 error_percent = kwargs.get("error_percent", np.zeros(3)) 

104 target = kwargs.get("target", None) 

105 expression = obj_func(m, x, target=target, error_percent=error_percent) 

106 prob = cp.Problem(cp.Minimize(expression), g + ub + lb + gu + g_ineq) 

107 prob.solve(verbose=False, solver=solver) 

108 

109 x_res = x.value 

110 result = OptimizeResult( 

111 fun=prob.value, 

112 success=(prob.status == "optimal"), 

113 message=prob.status, 

114 x=x_res, 

115 nit=prob.solver_stats.num_iters, 

116 runtime=perf_counter() - tic, 

117 ) 

118 return result 

119 

120 

121def pf(model: LinDistBase) -> OptimizeResult: 

122 c = np.zeros(model.n_x) 

123 tic = perf_counter() 

124 res = linprog(c, A_eq=csr_array(model.a_eq), b_eq=model.b_eq.flatten()) 

125 if not res.success: 

126 raise ValueError(res.message) 

127 runtime = perf_counter() - tic 

128 res["runtime"] = runtime 

129 return res 

130 

131 

132def lp_solve( 

133 model: LinDistBase, 

134 c: np.ndarray | Callable = None, 

135 **kwargs, 

136) -> OptimizeResult: 

137 """ 

138 Solve a linear program using scipy.optimize.linprog and having the objective function: 

139 Min c^T x 

140 Parameters 

141 ---------- 

142 model : LinDistModel 

143 c : 1-D array 

144 The coefficients of the linear objective function to be minimized. 

145 Returns 

146 ------- 

147 result : OptimizeResult 

148 A :class:`scipy.optimize.OptimizeResult` consisting of the fields 

149 below. Note that the return types of the fields may depend on whether 

150 the optimization was successful, therefore it is recommended to check 

151 `OptimizeResult.status` before relying on the other fields: 

152 

153 x : 1-D array 

154 The values of the decision variables that minimizes the 

155 objective function while satisfying the constraints. 

156 fun : float 

157 The optimal value of the objective function ``c @ x``. 

158 slack : 1-D array 

159 The (nominally positive) values of the slack variables, 

160 ``b_ub - A_ub @ x``. 

161 con : 1-D array 

162 The (nominally zero) residuals of the equality constraints, 

163 ``b_eq - A_eq @ x``. 

164 success : bool 

165 ``True`` when the algorithm succeeds in finding an optimal 

166 solution. 

167 status : int 

168 An integer representing the exit status of the algorithm. 

169 

170 ``0`` : Optimization terminated successfully. 

171 

172 ``1`` : Iteration limit reached. 

173 

174 ``2`` : Problem appears to be infeasible. 

175 

176 ``3`` : Problem appears to be unbounded. 

177 

178 ``4`` : Numerical difficulties encountered. 

179 

180 nit : int 

181 The total number of iterations performed in all phases. 

182 message : str 

183 A string descriptor of the exit status of the algorithm. 

184 """ 

185 if isinstance(c, Callable): 

186 c = c(model) 

187 if c is None: 

188 c = np.zeros(model.n_x) 

189 tic = perf_counter() 

190 res = linprog( 

191 c, 

192 A_eq=csr_array(model.a_eq), 

193 b_eq=model.b_eq.flatten(), 

194 A_ub=model.a_ub, 

195 b_ub=model.b_ub, 

196 bounds=model.bounds, 

197 ) 

198 if not res.success: 

199 raise ValueError(res.message) 

200 runtime = perf_counter() - tic 

201 res["runtime"] = runtime 

202 return res 

203 

204 

205def pyomo_solve( 

206 model: LinDistBase, 

207 obj_func: Callable, 

208 **kwargs, 

209) -> OptimizeResult: 

210 m = model 

211 tic = perf_counter() 

212 solver = kwargs.get("solver", "ipopt") 

213 x0 = kwargs.get("x0", None) 

214 if x0 is None: 

215 lin_res = lp_solve(m, np.zeros(m.n_x)) 

216 if not lin_res.success: 

217 raise ValueError(lin_res.message) 

218 x0 = lin_res.x.copy() 

219 

220 cm = pe.ConcreteModel() 

221 cm.n_xk = pe.RangeSet(0, model.n_x - 1) 

222 cm.xk = pe.Var(cm.n_xk, initialize=x0) 

223 cm.constraints = pe.ConstraintList() 

224 for i in range(model.n_x): 

225 cm.constraints.add(cm.xk[i] <= model.x_max[i]) 

226 cm.constraints.add(cm.xk[i] >= model.x_min[i]) 

227 

228 def equality_rule(_cm, i): 

229 if model.a_eq[[i], :].nnz > 0: 

230 return model.b_eq[i] == sum( 

231 _cm.xk[j] * model.a_eq[i, j] 

232 for j in range(model.n_x) 

233 if model.a_eq[i, j] 

234 ) 

235 return pe.Constraint.Skip 

236 

237 def inequality_rule(_cm, i): 

238 if model.a_ub[[i], :].nnz > 0: 

239 return model.b_ub[i] >= sum( 

240 _cm.xk[j] * model.a_ub[i, j] 

241 for j in range(model.n_x) 

242 if model.a_ub[i, j] 

243 ) 

244 return pe.Constraint.Skip 

245 

246 cm.equality = pe.Constraint(cm.n_xk, rule=equality_rule) 

247 if model.a_ub.shape[0] != 0: 

248 cm.ineq_set = pe.RangeSet(0, model.a_ub.shape[0] - 1) 

249 cm.inequality = pe.Constraint(cm.ineq_set, rule=inequality_rule) 

250 cm.objective = pe.Objective(expr=obj_func(model, cm.xk, **kwargs)) 

251 opt = pe.SolverFactory(solver) 

252 opt.solve(cm) 

253 

254 x_dict = cm.xk.extract_values() 

255 x_res = np.zeros(len(x_dict)) 

256 for key, value in x_dict.items(): 

257 x_res[key] = value 

258 

259 result = OptimizeResult( 

260 fun=float(pe.value(cm.objective)), 

261 # success=(prob.status == "optimal"), 

262 # message=prob.status, 

263 x=x_res, 

264 # nit=prob.solver_stats.num_iters, 

265 runtime=perf_counter() - tic, 

266 ) 

267 return result