Coverage for src/distopf/matrix_models/multiperiod/solvers.py: 77%
52 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from time import perf_counter
2from typing import Optional, Callable
3import cvxpy as cp
4import numpy as np
5from scipy.optimize import OptimizeResult, linprog
6from scipy.sparse import csr_array
7from distopf.matrix_models.multiperiod import LinDistBaseMP
10def cvxpy_solve(
11 model: LinDistBaseMP,
12 obj_func: Callable,
13 **kwargs,
14) -> OptimizeResult:
15 """
16 Solve a convex optimization problem using cvxpy.
17 Parameters
18 ----------
19 model : LinDistModel, or LinDistModelP, or LinDistModelQ
20 obj_func : handle to the objective function
21 kwargs :
23 Returns
24 -------
25 result: scipy.optimize.OptimizeResult
27 """
28 m = model
29 tic = perf_counter()
30 solver = kwargs.get("solver", cp.CLARABEL)
31 x0 = kwargs.get("x0", None)
32 if x0 is None:
33 lin_res = lp_solve(m, np.zeros(m.n_x))
34 if not lin_res.success:
35 raise ValueError(lin_res.message)
36 x0 = lin_res.x.copy()
37 x = cp.Variable(shape=(m.n_x,), name="x", value=x0)
38 g = [csr_array(m.a_eq) @ x - m.b_eq.flatten() == 0]
39 g_inequality = []
40 if m.a_ub is not None and m.b_ub is not None:
41 if m.a_ub.shape[0] != 0 and m.a_ub.shape[1] != 0:
42 g_inequality = [m.a_ub @ x - m.b_ub <= 0]
44 lb = [x >= m.x_min]
45 ub = [x <= m.x_max]
46 expression = obj_func(m, x, **kwargs)
47 prob = cp.Problem(cp.Minimize(expression), g + g_inequality + ub + lb)
48 prob.solve(verbose=False, solver=solver)
50 x_res = x.value
51 result = OptimizeResult(
52 fun=prob.value,
53 success=(prob.status == "optimal"),
54 message=prob.status,
55 x=x_res,
56 nit=prob.solver_stats.num_iters,
57 runtime=perf_counter() - tic,
58 )
59 return result
62def lp_solve(model: LinDistBaseMP, c: Optional[np.ndarray] = None) -> OptimizeResult:
63 """
64 Solve a linear program using scipy.optimize.linprog and having the objective function:
65 Min c^T x
66 Parameters
67 ----------
68 model : LinDistModel
69 c : 1-D array
70 The coefficients of the linear objective function to be minimized.
71 Returns
72 -------
73 result : OptimizeResult
74 A :class:`scipy.optimize.OptimizeResult` consisting of the fields
75 below. Note that the return types of the fields may depend on whether
76 the optimization was successful, therefore it is recommended to check
77 `OptimizeResult.status` before relying on the other fields:
79 x : 1-D array
80 The values of the decision variables that minimizes the
81 objective function while satisfying the constraints.
82 fun : float
83 The optimal value of the objective function ``c @ x``.
84 slack : 1-D array
85 The (nominally positive) values of the slack variables,
86 ``b_ub - A_ub @ x``.
87 con : 1-D array
88 The (nominally zero) residuals of the equality constraints,
89 ``b_eq - A_eq @ x``.
90 success : bool
91 ``True`` when the algorithm succeeds in finding an optimal
92 solution.
93 status : int
94 An integer representing the exit status of the algorithm.
96 ``0`` : Optimization terminated successfully.
98 ``1`` : Iteration limit reached.
100 ``2`` : Problem appears to be infeasible.
102 ``3`` : Problem appears to be unbounded.
104 ``4`` : Numerical difficulties encountered.
106 nit : int
107 The total number of iterations performed in all phases.
108 message : str
109 A string descriptor of the exit status of the algorithm.
110 """
111 if isinstance(c, Callable): # type: ignore
112 c = c(model) # type: ignore
113 if c is None:
114 c = np.zeros(model.n_x)
115 tic = perf_counter()
116 res = linprog(
117 c,
118 A_eq=csr_array(model.a_eq),
119 b_eq=model.b_eq.flatten(),
120 A_ub=model.a_ub,
121 b_ub=model.b_ub,
122 bounds=model.bounds,
123 ) # type: ignore
124 if not res.success:
125 raise ValueError(res.message)
126 runtime = perf_counter() - tic
127 res["runtime"] = runtime
128 return res
131def pf(model) -> OptimizeResult:
132 c = np.zeros(model.n_x)
133 tic = perf_counter()
134 res = linprog(c, A_eq=csr_array(model.a_eq), b_eq=model.b_eq.flatten()) # type: ignore
135 if not res.success:
136 raise ValueError(res.message)
137 runtime = perf_counter() - tic
138 res["runtime"] = runtime
139 return res