Coverage for src/distopf/matrix_models/multiperiod/solvers.py: 77%

52 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from time import perf_counter 

2from typing import Optional, Callable 

3import cvxpy as cp 

4import numpy as np 

5from scipy.optimize import OptimizeResult, linprog 

6from scipy.sparse import csr_array 

7from distopf.matrix_models.multiperiod import LinDistBaseMP 

8 

9 

10def cvxpy_solve( 

11 model: LinDistBaseMP, 

12 obj_func: Callable, 

13 **kwargs, 

14) -> OptimizeResult: 

15 """ 

16 Solve a convex optimization problem using cvxpy. 

17 Parameters 

18 ---------- 

19 model : LinDistModel, or LinDistModelP, or LinDistModelQ 

20 obj_func : handle to the objective function 

21 kwargs : 

22 

23 Returns 

24 ------- 

25 result: scipy.optimize.OptimizeResult 

26 

27 """ 

28 m = model 

29 tic = perf_counter() 

30 solver = kwargs.get("solver", cp.CLARABEL) 

31 x0 = kwargs.get("x0", None) 

32 if x0 is None: 

33 lin_res = lp_solve(m, np.zeros(m.n_x)) 

34 if not lin_res.success: 

35 raise ValueError(lin_res.message) 

36 x0 = lin_res.x.copy() 

37 x = cp.Variable(shape=(m.n_x,), name="x", value=x0) 

38 g = [csr_array(m.a_eq) @ x - m.b_eq.flatten() == 0] 

39 g_inequality = [] 

40 if m.a_ub is not None and m.b_ub is not None: 

41 if m.a_ub.shape[0] != 0 and m.a_ub.shape[1] != 0: 

42 g_inequality = [m.a_ub @ x - m.b_ub <= 0] 

43 

44 lb = [x >= m.x_min] 

45 ub = [x <= m.x_max] 

46 expression = obj_func(m, x, **kwargs) 

47 prob = cp.Problem(cp.Minimize(expression), g + g_inequality + ub + lb) 

48 prob.solve(verbose=False, solver=solver) 

49 

50 x_res = x.value 

51 result = OptimizeResult( 

52 fun=prob.value, 

53 success=(prob.status == "optimal"), 

54 message=prob.status, 

55 x=x_res, 

56 nit=prob.solver_stats.num_iters, 

57 runtime=perf_counter() - tic, 

58 ) 

59 return result 

60 

61 

62def lp_solve(model: LinDistBaseMP, c: Optional[np.ndarray] = None) -> OptimizeResult: 

63 """ 

64 Solve a linear program using scipy.optimize.linprog and having the objective function: 

65 Min c^T x 

66 Parameters 

67 ---------- 

68 model : LinDistModel 

69 c : 1-D array 

70 The coefficients of the linear objective function to be minimized. 

71 Returns 

72 ------- 

73 result : OptimizeResult 

74 A :class:`scipy.optimize.OptimizeResult` consisting of the fields 

75 below. Note that the return types of the fields may depend on whether 

76 the optimization was successful, therefore it is recommended to check 

77 `OptimizeResult.status` before relying on the other fields: 

78 

79 x : 1-D array 

80 The values of the decision variables that minimizes the 

81 objective function while satisfying the constraints. 

82 fun : float 

83 The optimal value of the objective function ``c @ x``. 

84 slack : 1-D array 

85 The (nominally positive) values of the slack variables, 

86 ``b_ub - A_ub @ x``. 

87 con : 1-D array 

88 The (nominally zero) residuals of the equality constraints, 

89 ``b_eq - A_eq @ x``. 

90 success : bool 

91 ``True`` when the algorithm succeeds in finding an optimal 

92 solution. 

93 status : int 

94 An integer representing the exit status of the algorithm. 

95 

96 ``0`` : Optimization terminated successfully. 

97 

98 ``1`` : Iteration limit reached. 

99 

100 ``2`` : Problem appears to be infeasible. 

101 

102 ``3`` : Problem appears to be unbounded. 

103 

104 ``4`` : Numerical difficulties encountered. 

105 

106 nit : int 

107 The total number of iterations performed in all phases. 

108 message : str 

109 A string descriptor of the exit status of the algorithm. 

110 """ 

111 if isinstance(c, Callable): # type: ignore 

112 c = c(model) # type: ignore 

113 if c is None: 

114 c = np.zeros(model.n_x) 

115 tic = perf_counter() 

116 res = linprog( 

117 c, 

118 A_eq=csr_array(model.a_eq), 

119 b_eq=model.b_eq.flatten(), 

120 A_ub=model.a_ub, 

121 b_ub=model.b_ub, 

122 bounds=model.bounds, 

123 ) # type: ignore 

124 if not res.success: 

125 raise ValueError(res.message) 

126 runtime = perf_counter() - tic 

127 res["runtime"] = runtime 

128 return res 

129 

130 

131def pf(model) -> OptimizeResult: 

132 c = np.zeros(model.n_x) 

133 tic = perf_counter() 

134 res = linprog(c, A_eq=csr_array(model.a_eq), b_eq=model.b_eq.flatten()) # type: ignore 

135 if not res.success: 

136 raise ValueError(res.message) 

137 runtime = perf_counter() - tic 

138 res["runtime"] = runtime 

139 return res