Coverage for src/distopf/matrix_models/lindist_capacitor_regulator_mi.py: 20%

84 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from typing import Optional 

2from collections.abc import Callable 

3from time import perf_counter 

4import numpy as np 

5import pandas as pd 

6import cvxpy as cp 

7from scipy.sparse import csr_array, lil_array 

8from scipy.optimize import OptimizeResult 

9import distopf as opf 

10 

11 

12class LinDistModelCapacitorRegulatorMI(opf.LinDistModelCapMI): 

13 def __init__( 

14 self, 

15 branch_data: Optional[pd.DataFrame] = None, 

16 bus_data: Optional[pd.DataFrame] = None, 

17 gen_data: Optional[pd.DataFrame] = None, 

18 cap_data: Optional[pd.DataFrame] = None, 

19 reg_data: Optional[pd.DataFrame] = None, 

20 ): 

21 super().__init__( 

22 branch_data=branch_data, 

23 bus_data=bus_data, 

24 gen_data=gen_data, 

25 cap_data=cap_data, 

26 reg_data=reg_data, 

27 ) 

28 self.x0 = None 

29 self.xk: None | cp.Variable = None 

30 self.u_reg = None 

31 self.b_i = np.arange(0.9, 1.1, 0.00625) 

32 self.g_reg: list = [] 

33 

34 def add_regulator_model(self, a_eq, b_eq, j, a) -> tuple[lil_array, np.ndarray]: 

35 return a_eq, b_eq 

36 

37 def cvxpy_regulator_mi_constraints(self): 

38 n_u_reg = ( 

39 len(self.reg_buses["a"]) 

40 + len(self.reg_buses["b"]) 

41 + len(self.reg_buses["c"]) 

42 ) 

43 default_tap = np.zeros((max(n_u_reg, 1), 33)) 

44 default_tap[:, 16] = 1 

45 self.u_reg = cp.Variable( 

46 shape=(max(n_u_reg, 1), 33), name="u_reg", value=default_tap, boolean=True 

47 ) 

48 g_reg = [cp.sum(self.u_reg, axis=1) == 1] 

49 

50 big_m = 1e3 

51 self.b_i = np.arange(0.9, 1.1, 0.00625) 

52 i_reg = 0 

53 for j in self.reg.index: 

54 for a in "abc": 

55 if not self.phase_exists(a, j): 

56 continue 

57 

58 i = self.idx("bi", j, a)[0] 

59 vi = self.idx("v", i, a) 

60 vj = self.idx("v", j, a) 

61 for k in range(33): 

62 g_reg = g_reg + [ 

63 self.xk[vj] 

64 - self.b_i[k] ** 2 * self.xk[vi] 

65 - big_m * (1 - self.u_reg[i_reg, k]) 

66 <= 0 

67 ] 

68 g_reg = g_reg + [ 

69 self.xk[vj] 

70 - self.b_i[k] ** 2 * self.xk[vi] 

71 + big_m * (1 - self.u_reg[i_reg, k]) 

72 >= 0 

73 ] 

74 i_reg += 1 

75 return g_reg 

76 

77 def get_regulator_taps(self): 

78 reg_result = self.reg.copy() 

79 i_reg = 0 

80 for j in self.reg.index: 

81 for a in "abc": 

82 if not self.phase_exists(a, j): 

83 continue 

84 tap_index = int( 

85 np.where(np.round(self.u_reg.value[i_reg, :]).astype(bool))[0][0] 

86 ) 

87 tap = tap_index - 16 

88 ratio = self.b_i[tap_index] 

89 reg_result.loc[j, f"tap_{a}"] = int(tap) 

90 reg_result.loc[j, f"ratio_{a}"] = ratio 

91 i_reg += 1 

92 return reg_result 

93 

94 @classmethod 

95 def calculate_x0(cls, branch_data, bus_data, gen_data, cap_data, reg_data): 

96 bus_data = bus_data.copy() 

97 bus_data.loc[:, "v_min"] = 0.0 

98 bus_data.loc[:, "v_max"] = 2.0 

99 gen_data = gen_data.copy() 

100 gen_data.control_variable = opf.CONSTANT_PQ 

101 m0 = cls( 

102 branch_data=branch_data, 

103 bus_data=bus_data, 

104 gen_data=gen_data, 

105 cap_data=cap_data, 

106 reg_data=reg_data, 

107 ) 

108 result0 = opf.lp_solve(m0, np.zeros(m0.n_x)) 

109 x0 = result0.x 

110 return x0 

111 

112 def solve(self, obj_func: Callable, **kwargs) -> OptimizeResult: 

113 self.x0 = self.calculate_x0( 

114 self.branch_data, self.bus_data, self.gen_data, self.cap_data, self.reg_data 

115 ) 

116 self.xk = cp.Variable(shape=(self.n_x,), name="x", value=self.x0) 

117 self.g_reg = self.cvxpy_regulator_mi_constraints() 

118 tic = perf_counter() 

119 solver = kwargs.get("solver", cp.SCIP) 

120 

121 g = [csr_array(self.a_eq) @ self.xk - self.b_eq.flatten() == 0] 

122 g += [self.x_max >= self.xk, self.x_min <= self.xk] 

123 if self.a_ub is not None and self.b_ub is not None: 

124 if self.a_ub.shape[0] > 0: 

125 g += [self.a_ub @ self.xk - self.b_ub <= 0] 

126 error_percent = kwargs.get("error_percent", np.zeros(3)) 

127 target = kwargs.get("target", None) 

128 expression = obj_func(self, self.xk, target=target, error_percent=error_percent) 

129 prob = cp.Problem(cp.Minimize(expression), g + self.g_reg) 

130 prob.solve(solver=solver, verbose=False) 

131 

132 x_res = self.xk.value 

133 result = OptimizeResult( 

134 fun=prob.value, 

135 success=(prob.status == "optimal"), 

136 message=prob.status, 

137 x=x_res, 

138 nit=prob.solver_stats.num_iters, 

139 runtime=perf_counter() - tic, 

140 ) 

141 return result