Coverage for src/distopf/matrix_models/lindist_q_gen.py: 17%
116 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from typing import Optional
2from functools import cache
3import numpy as np
4import pandas as pd
5import distopf as opf
6from distopf.matrix_models.base import LinDistBase
7from distopf.utils import get
class LinDistModelQGen(LinDistBase):
    """
    LinDistFlow Model with reactive power control of generators.
    No variables for active power generation exist. This can make the
    model smaller and run faster if there are many generators and only
    reactive power control is required.

    Parameters
    ----------
    branch_data : pd.DataFrame
        DataFrame containing branch data including resistance and reactance values and limits.
    bus_data : pd.DataFrame
        DataFrame containing bus data such as loads, voltages, and limits.
    gen_data : pd.DataFrame
        DataFrame containing generator data.
    cap_data : pd.DataFrame
        DataFrame containing capacitor data.
    reg_data : pd.DataFrame
        DataFrame containing regulator data.

    """

    def __init__(
        self,
        branch_data: Optional[pd.DataFrame] = None,
        bus_data: Optional[pd.DataFrame] = None,
        gen_data: Optional[pd.DataFrame] = None,
        cap_data: Optional[pd.DataFrame] = None,
        reg_data: Optional[pd.DataFrame] = None,
    ):
        """Forward the input tables to the base class and build the model."""
        super().__init__(branch_data, bus_data, gen_data, cap_data, reg_data)
        self.build()

    def initialize_variable_index_pointers(self):
        """
        Create the index maps for every variable group.

        Each call threads the running variable count ``self.n_x`` through
        the next group, in this order: branch variables, bus voltages
        (``v``), generator reactive power (``qg``), capacitor reactive
        power (``qc``) and regulator variables (``vx``). No map is created
        for active power generation.
        """
        self.x_maps, self.n_x = self._variable_tables(self.branch)
        self.v_map, self.n_x = self._add_device_variables(self.n_x, self.all_buses)
        self.qg_map, self.n_x = self._add_device_variables(self.n_x, self.gen_buses)
        self.qc_map, self.n_x = self._add_device_variables(self.n_x, self.cap_buses)
        self.vx_map, self.n_x = self._add_device_variables(self.n_x, self.reg_buses)

    def add_generator_limits(self, x_lim_lower, x_lim_upper):
        """
        Write the reactive power bounds of every generator into the limits.

        For each phase the bound derived from the apparent power rating,
        ``sqrt(s_max**2 - p**2)``, is combined with the optional
        ``q{phase}_max`` / ``q{phase}_min`` columns of ``self.gen``; the
        tighter of the two is used. When a column is absent a limit of
        +/-100e3 is used in its place.

        Parameters
        ----------
        x_lim_lower, x_lim_upper :
            Lower and upper bound containers indexed by variable index.
            They are modified in place and also returned.

        Returns
        -------
        tuple
            ``(x_lim_lower, x_lim_upper)``
        """
        # Same guard as add_power_flow_model: without generator data there
        # is nothing to bound.
        if self.gen is None:
            return x_lim_lower, x_lim_upper
        for a in "abc":
            if not self.phase_exists(a):
                continue
            s_rated = self.gen[f"s{a}_max"]
            p_out = self.gen[f"p{a}"]
            # Clip the radicand at zero so that an active power set point
            # above the rating gives a zero reactive range rather than NaN.
            q_max = ((s_rated**2) - (p_out**2)).clip(lower=0) ** (1 / 2)
            q_min = -q_max
            # The fallbacks share q_min's index because they are looked up
            # below with the bus label j, not with a position.
            q_max_manual = self.gen.get(
                f"q{a}_max", pd.Series(100e3, index=q_min.index)
            )
            q_min_manual = self.gen.get(
                f"q{a}_min", pd.Series(-100e3, index=q_min.index)
            )
            for j in self.gen_buses[a]:
                qg = self.idx("qg", j, a)
                # reactive power bounds
                x_lim_lower[qg] = max(q_min[j], q_min_manual[j])
                x_lim_upper[qg] = min(q_max[j], q_max_manual[j])
        return x_lim_lower, x_lim_upper

    @cache
    def idx(self, var, node_j, phase):
        """
        Return the variable index (or indexes) of ``var`` at ``node_j``.

        NOTE(review): ``functools.cache`` on a method keys on ``self`` and
        keeps every instance alive for the lifetime of the cache; results
        also stay cached if the index maps are rebuilt. Kept unchanged
        because callers may rely on ``idx.cache_clear()`` - confirm before
        replacing it with a per-instance cache.

        Parameters
        ----------
        var : str
            Variable name. Either a column of ``self.x_maps[phase]``, one of
            ``"pjk"``, ``"qjk"``, ``"v"``, ``"qg"``/``"q_gen"``,
            ``"qc"``/``"q_cap"``, ``"vx"``, or a name resolved by
            ``self.additional_variable_idx``.
        node_j :
            Bus the variable belongs to.
        phase : str
            Phase key used to select the map.

        Returns
        -------
        The index or indexes found. The device lookups (``v``, ``qg``,
        ``qc``, ``vx``) fall back to an empty list when ``node_j`` has no
        entry.

        Raises
        ------
        ValueError
            If ``var`` is not recognised by any of the lookups.
        """
        if var in self.x_maps[phase].columns:
            return self.branch_into_j(var, node_j, phase)
        if var in ["pjk"]:  # indexes of all branch active power out of node j
            return self.branches_out_of_j("pij", node_j, phase)
        if var in ["qjk"]:  # indexes of all branch reactive power out of node j
            return self.branches_out_of_j("qij", node_j, phase)
        if var in ["v"]:  # voltage variable at node
            return get(self.v_map[phase], node_j, [])
        if var in ["qg", "q_gen"]:  # reactive power generation at node
            return get(self.qg_map[phase], node_j, [])
        if var in ["qc", "q_cap"]:  # reactive power injection by capacitor
            return get(self.qc_map[phase], node_j, [])
        if var in ["vx"]:
            return self.vx_map[phase].get(node_j, [])
        ix = self.additional_variable_idx(var, node_j, phase)
        if ix is not None:
            return ix
        raise ValueError(f"Variable name, '{var}', not found.")

    def add_power_flow_model(self, a_eq, b_eq, j, phase):
        """
        Fill the active and reactive power balance rows for bus ``j``.

        The rows are addressed by the ``pij`` and ``qij`` indexes of the
        branch into ``j``. Generator active power is not a variable in this
        model: its nominal value from ``self.gen`` is moved to the right
        hand side, while generator and capacitor reactive power enter the
        reactive row as variables.

        Parameters
        ----------
        a_eq, b_eq :
            Equality constraint matrix and right hand side; modified in
            place and returned.
        j :
            Bus whose balance is written.
        phase : str
            Phase key.

        Returns
        -------
        tuple
            ``(a_eq, b_eq)``
        """
        pij = self.idx("pij", j, phase)
        qij = self.idx("qij", j, phase)
        pjk = self.idx("pjk", j, phase)
        qjk = self.idx("qjk", j, phase)
        qg = self.idx("qg", j, phase)
        qc = self.idx("q_cap", j, phase)
        vj = self.idx("v", j, phase)
        # Scalar default; the former "0, 0" bound a tuple, which corrupted
        # b_eq[pij] whenever no generator table was present.
        p_gen_nom = 0
        if self.gen is not None:
            p_gen_nom = get(self.gen[f"p{phase}"], j, 0)
        p_load_nom, q_load_nom = 0, 0
        if self.bus.bus_type[j] == opf.PQ_BUS:
            p_load_nom = self.bus[f"pl_{phase}"][j]
            q_load_nom = self.bus[f"ql_{phase}"][j]
        # Set P equation variable coefficients in a_eq
        a_eq[pij, pij] = 1
        a_eq[pij, pjk] = -1
        # Set Q equation variable coefficients in a_eq
        a_eq[qij, qij] = 1
        a_eq[qij, qjk] = -1
        a_eq[qij, qg] = 1
        a_eq[qij, qc] = 1
        if self.bus.bus_type[j] != opf.PQ_FREE:
            # Set Load equation variable coefficients in a_eq
            a_eq[pij, vj] = -(self.bus.cvr_p[j] / 2) * p_load_nom
            b_eq[pij] = (1 - (self.bus.cvr_p[j] / 2)) * p_load_nom - p_gen_nom
            a_eq[qij, vj] = -(self.bus.cvr_q[j] / 2) * q_load_nom
            b_eq[qij] = (1 - (self.bus.cvr_q[j] / 2)) * q_load_nom
        return a_eq, b_eq

    def add_generator_model(self, a_eq, b_eq, j, phase):
        """Return ``a_eq`` and ``b_eq`` unchanged; no generator rows are added here."""
        return a_eq, b_eq

    def add_load_model(self, a_eq, b_eq, j, phase):
        """Return ``a_eq`` and ``b_eq`` unchanged; load terms are written in add_power_flow_model."""
        return a_eq, b_eq

    def add_capacitor_model(self, a_eq, b_eq, j, phase):
        """
        Add the row tying capacitor reactive power to the bus voltage variable.

        Writes ``qc - q_cap_nom * v = 0`` into the row of the capacitor
        variable, where ``q_cap_nom`` is read from ``self.cap`` (0 when no
        capacitor table or no entry for ``j`` exists).

        Returns
        -------
        tuple
            ``(a_eq, b_eq)``
        """
        q_cap_nom = 0
        if self.cap is not None:
            q_cap_nom = get(self.cap[f"q{phase}"], j, 0)
        # equation indexes
        vj = self.idx("v", j, phase)
        qc = self.idx("q_cap", j, phase)
        a_eq[qc, qc] = 1
        a_eq[qc, vj] = -q_cap_nom
        return a_eq, b_eq

    def create_inequality_constraints(self):
        """Return ``(None, None)``: this model defines no inequality constraints."""
        return None, None

    def get_p_gens(self, x):
        """
        Return the generator active power per phase.

        Active power is not a decision variable here, so the frame built
        from ``self.qg_map`` only provides the layout; its ``a``, ``b`` and
        ``c`` entries are overwritten with the ``pa``, ``pb`` and ``pc``
        columns of ``self.gen_data``.
        NOTE(review): assumes the rows of that frame line up one-to-one
        with the rows of ``self.gen_data`` - confirm against the base class.
        """
        df = self.get_device_variables(x, self.qg_map)
        df.a = self.gen_data.pa.to_numpy()
        df.b = self.gen_data.pb.to_numpy()
        df.c = self.gen_data.pc.to_numpy()
        return df

    def get_apparent_power_flows(self, x):
        """
        Assemble the complex branch power flows ``p + 1j*q`` from ``x``.

        Parameters
        ----------
        x :
            Solution vector, indexed with the ``pij`` and ``qij`` columns
            of ``self.x_maps``.

        Returns
        -------
        pd.DataFrame
            One row per label in ``2..self.nb`` with the columns ``fb``,
            ``tb``, ``from_name``, ``to_name`` and the complex columns
            ``a``, ``b``, ``c``. Rows are addressed by ``bj + 1``; ``fb``
            and ``tb`` hold ``bi + 1`` and ``bj + 1``.
        """
        s_df = pd.DataFrame(
            columns=["fb", "tb", "from_name", "to_name", "a", "b", "c"],
            index=range(2, self.nb + 1),
        )
        for ph in "abc":
            s_df[ph] = s_df[ph].astype(complex)
        for ph in "abc":
            fb_idxs = self.x_maps[ph].bi.to_numpy()
            fb_names = self.bus.name[fb_idxs].to_numpy()
            tb_idxs = self.x_maps[ph].bj.to_numpy()
            tb_names = self.bus.name[tb_idxs].to_numpy()
            # Row labels of the receiving buses, computed once per phase.
            rows = tb_idxs + 1
            s_df.loc[rows, "fb"] = fb_idxs + 1
            s_df.loc[rows, "tb"] = tb_idxs + 1
            s_df.loc[rows, "from_name"] = fb_names
            s_df.loc[rows, "to_name"] = tb_names
            s_df.loc[rows, ph] = (
                x[self.x_maps[ph].pij] + 1j * x[self.x_maps[ph].qij]
            )
        return s_df