Coverage for src/distopf/matrix_models/lindist_capacitor_regulator_mi.py: 20%
84 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from typing import Optional
2from collections.abc import Callable
3from time import perf_counter
4import numpy as np
5import pandas as pd
6import cvxpy as cp
7from scipy.sparse import csr_array, lil_array
8from scipy.optimize import OptimizeResult
9import distopf as opf
12class LinDistModelCapacitorRegulatorMI(opf.LinDistModelCapMI):
13 def __init__(
14 self,
15 branch_data: Optional[pd.DataFrame] = None,
16 bus_data: Optional[pd.DataFrame] = None,
17 gen_data: Optional[pd.DataFrame] = None,
18 cap_data: Optional[pd.DataFrame] = None,
19 reg_data: Optional[pd.DataFrame] = None,
20 ):
21 super().__init__(
22 branch_data=branch_data,
23 bus_data=bus_data,
24 gen_data=gen_data,
25 cap_data=cap_data,
26 reg_data=reg_data,
27 )
28 self.x0 = None
29 self.xk: None | cp.Variable = None
30 self.u_reg = None
31 self.b_i = np.arange(0.9, 1.1, 0.00625)
32 self.g_reg: list = []
34 def add_regulator_model(self, a_eq, b_eq, j, a) -> tuple[lil_array, np.ndarray]:
35 return a_eq, b_eq
37 def cvxpy_regulator_mi_constraints(self):
38 n_u_reg = (
39 len(self.reg_buses["a"])
40 + len(self.reg_buses["b"])
41 + len(self.reg_buses["c"])
42 )
43 default_tap = np.zeros((max(n_u_reg, 1), 33))
44 default_tap[:, 16] = 1
45 self.u_reg = cp.Variable(
46 shape=(max(n_u_reg, 1), 33), name="u_reg", value=default_tap, boolean=True
47 )
48 g_reg = [cp.sum(self.u_reg, axis=1) == 1]
50 big_m = 1e3
51 self.b_i = np.arange(0.9, 1.1, 0.00625)
52 i_reg = 0
53 for j in self.reg.index:
54 for a in "abc":
55 if not self.phase_exists(a, j):
56 continue
58 i = self.idx("bi", j, a)[0]
59 vi = self.idx("v", i, a)
60 vj = self.idx("v", j, a)
61 for k in range(33):
62 g_reg = g_reg + [
63 self.xk[vj]
64 - self.b_i[k] ** 2 * self.xk[vi]
65 - big_m * (1 - self.u_reg[i_reg, k])
66 <= 0
67 ]
68 g_reg = g_reg + [
69 self.xk[vj]
70 - self.b_i[k] ** 2 * self.xk[vi]
71 + big_m * (1 - self.u_reg[i_reg, k])
72 >= 0
73 ]
74 i_reg += 1
75 return g_reg
77 def get_regulator_taps(self):
78 reg_result = self.reg.copy()
79 i_reg = 0
80 for j in self.reg.index:
81 for a in "abc":
82 if not self.phase_exists(a, j):
83 continue
84 tap_index = int(
85 np.where(np.round(self.u_reg.value[i_reg, :]).astype(bool))[0][0]
86 )
87 tap = tap_index - 16
88 ratio = self.b_i[tap_index]
89 reg_result.loc[j, f"tap_{a}"] = int(tap)
90 reg_result.loc[j, f"ratio_{a}"] = ratio
91 i_reg += 1
92 return reg_result
94 @classmethod
95 def calculate_x0(cls, branch_data, bus_data, gen_data, cap_data, reg_data):
96 bus_data = bus_data.copy()
97 bus_data.loc[:, "v_min"] = 0.0
98 bus_data.loc[:, "v_max"] = 2.0
99 gen_data = gen_data.copy()
100 gen_data.control_variable = opf.CONSTANT_PQ
101 m0 = cls(
102 branch_data=branch_data,
103 bus_data=bus_data,
104 gen_data=gen_data,
105 cap_data=cap_data,
106 reg_data=reg_data,
107 )
108 result0 = opf.lp_solve(m0, np.zeros(m0.n_x))
109 x0 = result0.x
110 return x0
112 def solve(self, obj_func: Callable, **kwargs) -> OptimizeResult:
113 self.x0 = self.calculate_x0(
114 self.branch_data, self.bus_data, self.gen_data, self.cap_data, self.reg_data
115 )
116 self.xk = cp.Variable(shape=(self.n_x,), name="x", value=self.x0)
117 self.g_reg = self.cvxpy_regulator_mi_constraints()
118 tic = perf_counter()
119 solver = kwargs.get("solver", cp.SCIP)
121 g = [csr_array(self.a_eq) @ self.xk - self.b_eq.flatten() == 0]
122 g += [self.x_max >= self.xk, self.x_min <= self.xk]
123 if self.a_ub is not None and self.b_ub is not None:
124 if self.a_ub.shape[0] > 0:
125 g += [self.a_ub @ self.xk - self.b_ub <= 0]
126 error_percent = kwargs.get("error_percent", np.zeros(3))
127 target = kwargs.get("target", None)
128 expression = obj_func(self, self.xk, target=target, error_percent=error_percent)
129 prob = cp.Problem(cp.Minimize(expression), g + self.g_reg)
130 prob.solve(solver=solver, verbose=False)
132 x_res = self.xk.value
133 result = OptimizeResult(
134 fun=prob.value,
135 success=(prob.status == "optimal"),
136 message=prob.status,
137 x=x_res,
138 nit=prob.solver_stats.num_iters,
139 runtime=perf_counter() - tic,
140 )
141 return result