Coverage for src/distopf/matrix_models/solvers.py: 37%
119 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from collections.abc import Callable
2from time import perf_counter
4import pyomo.environ as pe
5import cvxpy as cp
6import numpy as np
7from scipy.optimize import OptimizeResult, linprog
8from scipy.sparse import csr_array
9from distopf import LinDistModelCapMI
10from distopf.matrix_models.base import LinDistBase
13def cvxpy_solve(
14 model: LinDistBase,
15 obj_func: Callable,
16 **kwargs,
17) -> OptimizeResult:
18 """
19 Solve a convex optimization problem using cvxpy.
20 Parameters
21 ----------
22 model : LinDistModel, or LinDistModelP, or LinDistModelQ
23 obj_func : handle to the objective function
24 kwargs :
26 Returns
27 -------
28 result: scipy.optimize.OptimizeResult
30 """
31 m = model
32 tic = perf_counter()
33 solver = kwargs.get("solver", cp.CLARABEL)
34 x0 = kwargs.get("x0", None)
35 if x0 is None:
36 lin_res = lp_solve(m, np.zeros(m.n_x))
37 if not lin_res.success:
38 raise ValueError(lin_res.message)
39 x0 = lin_res.x.copy()
40 x = cp.Variable(shape=(m.n_x,), name="x", value=x0)
41 g = [m.a_eq @ x - m.b_eq.flatten() == 0]
42 # lb = [x[i] >= m.bounds[i][0] for i in range(m.n_x)]
43 # ub = [x[i] <= m.bounds[i][1] for i in range(m.n_x)]
44 lb = [x >= m.x_min]
45 ub = [x <= m.x_max]
46 g_inequality = []
47 if m.a_ub is not None and m.b_ub is not None:
48 if m.a_ub.shape[0] != 0 and m.a_ub.shape[1] != 0:
49 g_inequality = [m.a_ub @ x - m.b_ub <= 0]
50 expression = obj_func(m, x, **kwargs)
51 prob = cp.Problem(cp.Minimize(expression), g + g_inequality + ub + lb)
52 prob.solve(verbose=False, solver=solver)
54 x_res = x.value
55 result = OptimizeResult(
56 fun=prob.value,
57 success=(prob.status == "optimal"),
58 message=prob.status,
59 x=x_res,
60 nit=prob.solver_stats.num_iters,
61 runtime=perf_counter() - tic,
62 )
63 return result
66def cvxpy_mi_solve(
67 model: LinDistModelCapMI,
68 obj_func: Callable,
69 **kwargs,
70) -> OptimizeResult:
71 """
72 Solve a convex optimization problem using cvxpy.
73 Parameters
74 ----------
75 model : LinDistModel, or LinDistModelP, or LinDistModelQ
76 obj_func : handle to the objective function
77 kwargs :
79 Returns
80 -------
81 result: scipy.optimize.OptimizeResult
83 """
84 m = model
85 tic = perf_counter()
86 solver = kwargs.get("solver")
87 x0 = kwargs.get("x0", None)
88 if x0 is None:
89 lin_res = lp_solve(m, np.zeros(m.n_x))
90 if not lin_res.success:
91 raise ValueError(lin_res.message)
92 x0 = lin_res.x.copy()
93 x = cp.Variable(shape=(m.n_x,), name="x", value=x0)
94 n_u = len(m.cap_buses["a"]) + len(m.cap_buses["b"]) + len(m.cap_buses["c"])
95 u_c = cp.Variable(shape=(n_u,), name="u_c", value=np.ones(n_u), boolean=True)
96 u_idxs = np.r_[m.uc_map["a"], m.uc_map["b"], m.uc_map["c"]]
97 gu = [x[u_idxs] == u_c]
98 g_ineq = [csr_array(m.a_ineq) @ x - m.b_ineq.flatten() <= 0]
99 g = [csr_array(m.a_eq) @ x - m.b_eq.flatten() == 0]
100 lb = [x[i] >= m.bounds[i][0] for i in range(m.n_x)]
101 ub = [x[i] <= m.bounds[i][1] for i in range(m.n_x)]
103 error_percent = kwargs.get("error_percent", np.zeros(3))
104 target = kwargs.get("target", None)
105 expression = obj_func(m, x, target=target, error_percent=error_percent)
106 prob = cp.Problem(cp.Minimize(expression), g + ub + lb + gu + g_ineq)
107 prob.solve(verbose=False, solver=solver)
109 x_res = x.value
110 result = OptimizeResult(
111 fun=prob.value,
112 success=(prob.status == "optimal"),
113 message=prob.status,
114 x=x_res,
115 nit=prob.solver_stats.num_iters,
116 runtime=perf_counter() - tic,
117 )
118 return result
121def pf(model: LinDistBase) -> OptimizeResult:
122 c = np.zeros(model.n_x)
123 tic = perf_counter()
124 res = linprog(c, A_eq=csr_array(model.a_eq), b_eq=model.b_eq.flatten())
125 if not res.success:
126 raise ValueError(res.message)
127 runtime = perf_counter() - tic
128 res["runtime"] = runtime
129 return res
132def lp_solve(
133 model: LinDistBase,
134 c: np.ndarray | Callable = None,
135 **kwargs,
136) -> OptimizeResult:
137 """
138 Solve a linear program using scipy.optimize.linprog and having the objective function:
139 Min c^T x
140 Parameters
141 ----------
142 model : LinDistModel
143 c : 1-D array
144 The coefficients of the linear objective function to be minimized.
145 Returns
146 -------
147 result : OptimizeResult
148 A :class:`scipy.optimize.OptimizeResult` consisting of the fields
149 below. Note that the return types of the fields may depend on whether
150 the optimization was successful, therefore it is recommended to check
151 `OptimizeResult.status` before relying on the other fields:
153 x : 1-D array
154 The values of the decision variables that minimizes the
155 objective function while satisfying the constraints.
156 fun : float
157 The optimal value of the objective function ``c @ x``.
158 slack : 1-D array
159 The (nominally positive) values of the slack variables,
160 ``b_ub - A_ub @ x``.
161 con : 1-D array
162 The (nominally zero) residuals of the equality constraints,
163 ``b_eq - A_eq @ x``.
164 success : bool
165 ``True`` when the algorithm succeeds in finding an optimal
166 solution.
167 status : int
168 An integer representing the exit status of the algorithm.
170 ``0`` : Optimization terminated successfully.
172 ``1`` : Iteration limit reached.
174 ``2`` : Problem appears to be infeasible.
176 ``3`` : Problem appears to be unbounded.
178 ``4`` : Numerical difficulties encountered.
180 nit : int
181 The total number of iterations performed in all phases.
182 message : str
183 A string descriptor of the exit status of the algorithm.
184 """
185 if isinstance(c, Callable):
186 c = c(model)
187 if c is None:
188 c = np.zeros(model.n_x)
189 tic = perf_counter()
190 res = linprog(
191 c,
192 A_eq=csr_array(model.a_eq),
193 b_eq=model.b_eq.flatten(),
194 A_ub=model.a_ub,
195 b_ub=model.b_ub,
196 bounds=model.bounds,
197 )
198 if not res.success:
199 raise ValueError(res.message)
200 runtime = perf_counter() - tic
201 res["runtime"] = runtime
202 return res
205def pyomo_solve(
206 model: LinDistBase,
207 obj_func: Callable,
208 **kwargs,
209) -> OptimizeResult:
210 m = model
211 tic = perf_counter()
212 solver = kwargs.get("solver", "ipopt")
213 x0 = kwargs.get("x0", None)
214 if x0 is None:
215 lin_res = lp_solve(m, np.zeros(m.n_x))
216 if not lin_res.success:
217 raise ValueError(lin_res.message)
218 x0 = lin_res.x.copy()
220 cm = pe.ConcreteModel()
221 cm.n_xk = pe.RangeSet(0, model.n_x - 1)
222 cm.xk = pe.Var(cm.n_xk, initialize=x0)
223 cm.constraints = pe.ConstraintList()
224 for i in range(model.n_x):
225 cm.constraints.add(cm.xk[i] <= model.x_max[i])
226 cm.constraints.add(cm.xk[i] >= model.x_min[i])
228 def equality_rule(_cm, i):
229 if model.a_eq[[i], :].nnz > 0:
230 return model.b_eq[i] == sum(
231 _cm.xk[j] * model.a_eq[i, j]
232 for j in range(model.n_x)
233 if model.a_eq[i, j]
234 )
235 return pe.Constraint.Skip
237 def inequality_rule(_cm, i):
238 if model.a_ub[[i], :].nnz > 0:
239 return model.b_ub[i] >= sum(
240 _cm.xk[j] * model.a_ub[i, j]
241 for j in range(model.n_x)
242 if model.a_ub[i, j]
243 )
244 return pe.Constraint.Skip
246 cm.equality = pe.Constraint(cm.n_xk, rule=equality_rule)
247 if model.a_ub.shape[0] != 0:
248 cm.ineq_set = pe.RangeSet(0, model.a_ub.shape[0] - 1)
249 cm.inequality = pe.Constraint(cm.ineq_set, rule=inequality_rule)
250 cm.objective = pe.Objective(expr=obj_func(model, cm.xk, **kwargs))
251 opt = pe.SolverFactory(solver)
252 opt.solve(cm)
254 x_dict = cm.xk.extract_values()
255 x_res = np.zeros(len(x_dict))
256 for key, value in x_dict.items():
257 x_res[key] = value
259 result = OptimizeResult(
260 fun=float(pe.value(cm.objective)),
261 # success=(prob.status == "optimal"),
262 # message=prob.status,
263 x=x_res,
264 # nit=prob.solver_stats.num_iters,
265 runtime=perf_counter() - tic,
266 )
267 return result