Coverage for src/distopf/matrix_models/base.py: 73%
458 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from functools import cache
2from typing import Optional, Tuple
3import networkx as nx
4import numpy as np
5import pandas as pd
6from numpy import sqrt, zeros
7from scipy.sparse import csr_array, lil_array # type: ignore
8import distopf as opf
9from distopf.utils import (
10 handle_branch_input,
11 handle_bus_input,
12 handle_gen_input,
13 handle_cap_input,
14 handle_reg_input,
15 get,
16)
19class BaseModel:
20 """
21 LinDistFlow Model base class.
23 Parameters
24 ----------
25 branch_data : pd.DataFrame
26 DataFrame containing branch data including resistance and reactance values and limits.
27 bus_data : pd.DataFrame
28 DataFrame containing bus data such as loads, voltages, and limits.
29 gen_data : pd.DataFrame
30 DataFrame containing generator data.
31 cap_data : pd.DataFrame
32 DataFrame containing capacitor data.
33 reg_data : pd.DataFrame
34 DataFrame containing regulator data.
36 """
38 def __init__(
39 self,
40 branch_data: Optional[pd.DataFrame] = None,
41 bus_data: Optional[pd.DataFrame] = None,
42 gen_data: Optional[pd.DataFrame] = None,
43 cap_data: Optional[pd.DataFrame] = None,
44 reg_data: Optional[pd.DataFrame] = None,
45 ):
46 # ~~~~~~~~~~~~~~~~~~~~ Load Data Frames ~~~~~~~~~~~~~~~~~~~~
47 self.branch = handle_branch_input(branch_data)
48 self.bus = handle_bus_input(bus_data)
49 self.gen = handle_gen_input(gen_data)
50 self.cap = handle_cap_input(cap_data)
51 self.reg = handle_reg_input(reg_data)
52 self.branch_data = self.branch
53 self.bus_data = self.bus
54 self.gen_data = self.gen
55 self.cap_data = self.cap
56 self.reg_data = self.reg
58 # ~~~~~~~~~~~~~~~~~~~~ prepare data ~~~~~~~~~~~~~~~~~~~~
59 self.nb = len(self.bus.id)
60 self.r, self.x = self._init_rx(self.branch)
61 self.swing_bus = self.bus.loc[self.bus.bus_type == "SWING"].index[0]
62 self.all_buses = {
63 "a": self.bus.loc[self.bus.phases.str.contains("a")].index.to_numpy(),
64 "b": self.bus.loc[self.bus.phases.str.contains("b")].index.to_numpy(),
65 "c": self.bus.loc[self.bus.phases.str.contains("c")].index.to_numpy(),
66 }
67 self.load_buses = {
68 "a": self.all_buses["a"][np.where(self.all_buses["a"] != self.swing_bus)],
69 "b": self.all_buses["b"][np.where(self.all_buses["b"] != self.swing_bus)],
70 "c": self.all_buses["c"][np.where(self.all_buses["c"] != self.swing_bus)],
71 }
72 self.gen_buses = dict(a=np.array([]), b=np.array([]), c=np.array([]))
73 if self.gen.shape[0] > 0:
74 self.gen_buses = {
75 "a": self.gen.loc[self.gen.phases.str.contains("a")].index.to_numpy(),
76 "b": self.gen.loc[self.gen.phases.str.contains("b")].index.to_numpy(),
77 "c": self.gen.loc[self.gen.phases.str.contains("c")].index.to_numpy(),
78 }
79 self.n_gens = (
80 len(self.gen_buses["a"])
81 + len(self.gen_buses["b"])
82 + len(self.gen_buses["c"])
83 )
84 self.cap_buses = dict(a=np.array([]), b=np.array([]), c=np.array([]))
85 if self.cap.shape[0] > 0:
86 self.cap_buses = {
87 "a": self.cap.loc[self.cap.phases.str.contains("a")].index.to_numpy(),
88 "b": self.cap.loc[self.cap.phases.str.contains("b")].index.to_numpy(),
89 "c": self.cap.loc[self.cap.phases.str.contains("c")].index.to_numpy(),
90 }
91 self.n_caps = (
92 len(self.cap_buses["a"])
93 + len(self.cap_buses["b"])
94 + len(self.cap_buses["c"])
95 )
96 self.reg_buses = dict(a=np.array([]), b=np.array([]), c=np.array([]))
97 if self.reg.shape[0] > 0:
98 self.reg_buses = {
99 "a": self.reg.loc[self.reg.phases.str.contains("a")].index.to_numpy(),
100 "b": self.reg.loc[self.reg.phases.str.contains("b")].index.to_numpy(),
101 "c": self.reg.loc[self.reg.phases.str.contains("c")].index.to_numpy(),
102 }
103 self.n_regs = (
104 len(self.reg_buses["a"])
105 + len(self.reg_buses["b"])
106 + len(self.reg_buses["c"])
107 )
108 # ~~ initialize index pointers ~~
109 self.n_x = 0
110 self.x_maps: dict[str, pd.DataFrame] = {}
111 self.v_map: dict[str, pd.Series] = {}
112 self.pg_map: dict[str, pd.Series] = {}
113 self.qg_map: dict[str, pd.Series] = {}
114 self.qc_map: dict[str, pd.Series] = {}
115 self.pl_map: dict[str, pd.Series] = {}
116 self.ql_map: dict[str, pd.Series] = {}
117 # ~~~~~~~~~~~~~~~~~~~~ initialize Aeq and beq ~~~~~~~~~~~~~~~~~~~~
118 self.a_eq, self.b_eq = csr_array([0]), zeros(0)
119 self.a_ub, self.b_ub = csr_array([0]), zeros(0)
120 self.bounds: np.ndarray = zeros(0)
121 self.bounds_tuple: list[tuple] = []
122 self.x_min: np.ndarray = zeros(0)
123 self.x_max: np.ndarray = zeros(0)
125 @staticmethod
126 def _init_rx(branch):
127 """
128 Initializes resistance (`r`) and reactance (`x`) data for network branches
129 using sparse matrix representations.
131 Parameters
132 ----------
133 branch : pd.DataFrame
134 DataFrame containing branch information.
136 Returns
137 -------
138 r, x : dict
139 Dictionaries of csr_arrays for resistance and reactance matrices indexed
140 by phase pairs (e.g., 'aa', 'ab', ...).
141 """
142 row = np.array(np.r_[branch.fb, branch.tb], dtype=int) - 1
143 col = np.array(np.r_[branch.tb, branch.fb], dtype=int) - 1
144 r = {
145 "aa": csr_array((np.r_[branch.raa, branch.raa], (row, col))),
146 "ab": csr_array((np.r_[branch.rab, branch.rab], (row, col))),
147 "ac": csr_array((np.r_[branch.rac, branch.rac], (row, col))),
148 "bb": csr_array((np.r_[branch.rbb, branch.rbb], (row, col))),
149 "bc": csr_array((np.r_[branch.rbc, branch.rbc], (row, col))),
150 "cc": csr_array((np.r_[branch.rcc, branch.rcc], (row, col))),
151 }
152 x = {
153 "aa": csr_array((np.r_[branch.xaa, branch.xaa], (row, col))),
154 "ab": csr_array((np.r_[branch.xab, branch.xab], (row, col))),
155 "ac": csr_array((np.r_[branch.xac, branch.xac], (row, col))),
156 "bb": csr_array((np.r_[branch.xbb, branch.xbb], (row, col))),
157 "bc": csr_array((np.r_[branch.xbc, branch.xbc], (row, col))),
158 "cc": csr_array((np.r_[branch.xcc, branch.xcc], (row, col))),
159 }
160 return r, x
163class LinDistBase(BaseModel):
164 """
165 LinDistFlow Model base class for linear power flow modeling.
167 This class represents a linearized distribution model used for calculating
168 power flows, voltages, and other system properties in a distribution network
169 using the linearized branch-flow formulation from [1]. The model is composed of several power system components
170 such as buses, branches, generators, capacitors, and regulators.
172 Parameters
173 ----------
174 branch_data : pd.DataFrame
175 DataFrame containing branch data including resistance and reactance values and limits.
176 bus_data : pd.DataFrame
177 DataFrame containing bus data such as loads, voltages, and limits.
178 gen_data : pd.DataFrame
179 DataFrame containing generator data.
180 cap_data : pd.DataFrame
181 DataFrame containing capacitor data.
182 reg_data : pd.DataFrame
183 DataFrame containing regulator data.
185 References
186 ----------
187 [1] R. R. Jha, A. Dubey, C.-C. Liu, and K. P. Schneider,
188 “Bi-Level Volt-VAR Optimization to Coordinate Smart Inverters
189 With Voltage Control Devices,”
190 IEEE Trans. Power Syst., vol. 34, no. 3, pp. 1801–1813,
191 May 2019, doi: 10.1109/TPWRS.2018.2890613.
193 Examples
194 --------
195 This example demonstrates how to set up and solve a linear distribution flow model
196 using a provided case, and visualize the results.
198 >>> import distopf as opf
199 >>> # Prepare the case data
200 >>> case = opf.DistOPFCase(data_path="ieee123_30der")
201 >>> # Initialize the LinDistModel
202 >>> model = LinDistModel(
203 ... branch_data=case.branch_data,
204 ... bus_data=case.bus_data,
205 ... gen_data=case.gen_data,
206 ... cap_data=case.cap_data,
207 ... reg_data=case.reg_data,
208 ... )
209 >>> # Solve the model using the specified objective function
210 >>> result = opf.lp_solve(model, opf.gradient_load_min(model))
211 >>> # Extract and plot results
212 >>> v = model.get_voltages(result.x)
213 >>> s = model.get_apparent_power_flows(result.x)
214 >>> p_gens = model.get_p_gens(result.x)
215 >>> q_gens = model.get_q_gens(result.x)
216 >>> # Visualize network and power flows
217 >>> opf.plot_network(model, v=v, s=s, p_gen=p_gens, q_gen=q_gens).show()
218 >>> opf.plot_voltages(v).show()
219 >>> opf.plot_power_flows(s).show()
220 >>> opf.plot_gens(p_gens, q_gens).show()
221 """
223 def initialize_variable_index_pointers(self):
224 self.x_maps, self.n_x = self._variable_tables(self.branch)
225 self.v_map, self.n_x = self._add_device_variables(self.n_x, self.all_buses)
226 self.pg_map, self.n_x = self._add_device_variables(self.n_x, self.gen_buses)
227 self.qg_map, self.n_x = self._add_device_variables(self.n_x, self.gen_buses)
228 self.qc_map, self.n_x = self._add_device_variables(self.n_x, self.cap_buses)
229 self.vx_map, self.n_x = self._add_device_variables(self.n_x, self.reg_buses)
231 def build(self):
232 self.initialize_variable_index_pointers()
233 self.a_eq, self.b_eq = self.create_model()
234 self.a_ub, self.b_ub = self.create_inequality_constraints()
235 self.bounds = self.init_bounds()
236 self.bounds_tuple = list(map(tuple, self.bounds))
237 self.x_min = self.bounds[:, 0]
238 self.x_max = self.bounds[:, 1]
240 @staticmethod
241 def _variable_tables(branch, n_x=0) -> Tuple[dict[str, pd.DataFrame], int]:
242 """
243 Constructs tables to map branch power variables to indices within the
244 optimization variable vector.
246 Parameters
247 ----------
248 branch : pd.DataFrame
249 DataFrame containing branch information.
250 n_x : int
251 Initial variable index; used to offset additional device variables.
253 Returns
254 -------
255 x_maps : dict
256 Dictionary of DataFrames mapping branch indices to variable indices.
257 n_x : int
258 Updated variable index after accounting for new variables.
259 """
260 x_maps = {}
261 for a in "abc":
262 indices = branch.phases.str.contains(a)
263 lines = branch.loc[indices, ["fb", "tb"]].values.astype(int) - 1
264 n_lines = len(lines)
265 df = pd.DataFrame(columns=["bi", "bj", "pij", "qij"], index=range(n_lines))
266 if n_lines == 0:
267 x_maps[a] = df.astype(int)
268 continue
269 g: nx.Graph = nx.Graph()
270 g.add_edges_from(lines)
271 i_root = list(set(lines[:, 0]) - set(lines[:, 1]))[
272 0
273 ] # root node is only node with no from-bus
274 edges = np.array(list(nx.dfs_edges(g, source=i_root)))
275 df["bi"] = edges[:, 0]
276 df["bj"] = edges[:, 1]
277 df["pij"] = np.array([i for i in range(n_x, n_x + n_lines)])
278 n_x = n_x + n_lines
279 df["qij"] = np.array([i for i in range(n_x, n_x + n_lines)])
280 n_x = n_x + n_lines
281 x_maps[a] = df.astype(int)
282 return x_maps, n_x
284 @staticmethod
285 def _add_device_variables(
286 n_x: int, device_buses: dict
287 ) -> Tuple[dict[str, pd.Series], int]:
288 """
289 Adds device-related variables (e.g., voltage, power load) to the
290 optimization problem, indexed by the phase.
292 Parameters
293 ----------
294 n_x : int
295 Starting offset for variable indices.
296 device_buses : dict
297 Dictionary containing bus indices for devices, categorized by phase.
299 Returns
300 -------
301 device_maps : dict
302 Mapping of variable indices to bus indices for each device.
303 n_x : int
304 Updated offset after adding device variables.
305 """
306 n_a = len(device_buses["a"])
307 n_b = len(device_buses["b"])
308 n_c = len(device_buses["c"])
309 device_maps = {
310 "a": pd.Series(range(n_x, n_x + n_a), index=device_buses["a"]),
311 "b": pd.Series(range(n_x + n_a, n_x + n_a + n_b), index=device_buses["b"]),
312 "c": pd.Series(
313 range(n_x + n_a + n_b, n_x + n_a + n_b + n_c), index=device_buses["c"]
314 ),
315 }
316 n_x = n_x + n_a + n_b + n_c
317 return device_maps, n_x
319 def init_bounds(self) -> np.ndarray:
320 """
321 Initializes the variable bounds for the optimization problem.
323 Returns
324 -------
325 bounds : np.ndarray
326 Array of upper and lower bounds for each variable.
327 """
328 default = 100e3 # Default for unbounded variables.
329 # ~~~~~~~~~~ x limits ~~~~~~~~~~
330 x_lim_lower = np.ones(self.n_x) * -default
331 x_lim_upper = np.ones(self.n_x) * default
332 x_lim_lower, x_lim_upper = self.add_voltage_limits(x_lim_lower, x_lim_upper)
333 x_lim_lower, x_lim_upper = self.add_generator_limits(x_lim_lower, x_lim_upper)
334 x_lim_lower, x_lim_upper = self.user_added_limits(x_lim_lower, x_lim_upper)
335 bounds = np.c_[x_lim_lower, x_lim_upper]
336 # bounds = [(l, u) for (l, u) in zip(x_lim_lower, x_lim_upper)]
337 return bounds
339 def user_added_limits(
340 self, x_lim_lower, x_lim_upper
341 ) -> Tuple[np.ndarray, np.ndarray]:
342 """
343 User added limits function. Override this function to add custom variable limits.
344 Parameters
345 ----------
346 x_lim_lower :
347 x_lim_upper :
349 Returns
350 -------
351 x_lim_lower : lower limits for x-vector
352 x_lim_upper : upper limits for x-vector
354 Examples
355 --------
356 ```python
357 p_lim = 10
358 q_lim = 10
359 for a in "abc":
360 if not self.phase_exists(a):
361 continue
362 x_lim_lower[self.x_maps[a].pij] = -p_lim
363 x_lim_upper[self.x_maps[a].pij] = p_lim
364 x_lim_lower[self.x_maps[a].qij] = -q_lim
365 x_lim_upper[self.x_maps[a].qij] = q_lim
366 ```
367 """
368 return x_lim_lower, x_lim_upper
370 def add_voltage_limits(
371 self, x_lim_lower, x_lim_upper
372 ) -> Tuple[np.ndarray, np.ndarray]:
373 for a in "abc":
374 if not self.phase_exists(a):
375 continue
376 # ~~ v limits ~~:
377 x_lim_upper[self.v_map[a]] = self.bus.loc[self.v_map[a].index, "v_max"] ** 2
378 x_lim_lower[self.v_map[a]] = self.bus.loc[self.v_map[a].index, "v_min"] ** 2
379 return x_lim_lower, x_lim_upper
381 def add_generator_limits(
382 self, x_lim_lower, x_lim_upper
383 ) -> Tuple[np.ndarray, np.ndarray]:
384 for a in "abc":
385 if not self.phase_exists(a):
386 continue
387 s_rated = self.gen[f"s{a}_max"]
388 p_out = self.gen[f"p{a}"]
389 q_max = ((s_rated**2) - (p_out**2)) ** (1 / 2)
390 q_min = -q_max
391 q_max_manual = self.gen.get(f"q{a}_max", np.ones_like(q_min) * 100e3)
392 q_min_manual = self.gen.get(f"q{a}_min", np.ones_like(q_min) * -100e3)
393 for j in self.gen_buses[a]:
394 mode = self.gen.loc[j, "control_variable"]
395 pg = self.idx("pg", j, a)
396 qg = self.idx("qg", j, a)
397 # active power bounds
398 x_lim_lower[pg] = 0
399 x_lim_upper[pg] = p_out[j]
400 # reactive power bounds
401 if mode == opf.CONSTANT_P:
402 x_lim_lower[qg] = max(q_min[j], q_min_manual[j])
403 x_lim_upper[qg] = min(q_max[j], q_max_manual[j])
404 if mode != opf.CONSTANT_P:
405 # reactive power bounds
406 x_lim_lower[qg] = max(-s_rated[j], q_min_manual[j])
407 x_lim_upper[qg] = min(s_rated[j], q_max_manual[j])
408 return x_lim_lower, x_lim_upper
410 @cache
411 def branch_into_j(self, var, j, phase):
412 idx = self.x_maps[phase].loc[self.x_maps[phase].bj == j, var].to_numpy()
413 return idx[~np.isnan(idx)].astype(int)
415 @cache
416 def branches_out_of_j(self, var, j, phase):
417 idx = self.x_maps[phase].loc[self.x_maps[phase].bi == j, var].to_numpy()
418 return idx[~np.isnan(idx)].astype(int)
420 @cache
421 def idx(self, var, node_j, phase):
422 if var in self.x_maps[phase].columns:
423 return self.branch_into_j(var, node_j, phase)
424 if var in ["pjk"]: # indexes of all branch active power out of node j
425 return self.branches_out_of_j("pij", node_j, phase)
426 if var in ["qjk"]: # indexes of all branch reactive power out of node j
427 return self.branches_out_of_j("qij", node_j, phase)
428 if var in ["v"]: # active power generation at node
429 return self.v_map[phase].get(node_j, [])
430 if var in ["pg", "p_gen"]: # active power generation at node
431 return self.pg_map[phase].get(node_j, [])
432 if var in ["qg", "q_gen"]: # reactive power generation at node
433 return self.qg_map[phase].get(node_j, [])
434 if var in ["qc", "q_cap"]: # reactive power injection by capacitor
435 return self.qc_map[phase].get(node_j, [])
436 if var in ["vx"]:
437 return self.vx_map[phase].get(node_j, [])
438 ix = self.additional_variable_idx(var, node_j, phase)
439 if ix is not None:
440 return ix
441 raise ValueError(f"Variable name, '{var}', not found.")
443 def additional_variable_idx(self, var, node_j, phase):
444 """
445 User added index function. Override this function to add custom variables. Return None if `var` is not found.
446 Parameters
447 ----------
448 var : name of variable
449 node_j : node index (0 based; bus.id - 1)
450 phase : "a", "b", or "c"
452 Returns
453 -------
454 ix : index or list of indices of variable within x-vector or None if `var` is not found.
455 """
456 return None
458 @cache
459 def phase_exists(self, phase, index: Optional[int] = None) -> bool:
460 if index is None:
461 return self.x_maps[phase].shape[0] > 0
462 return len(self.idx("bj", index, phase)) > 0
464 def create_model(self) -> Tuple[csr_array, np.ndarray]:
465 """
466 Constructs the equality constraint matrices for the linear optimization
467 problem based on power flow equations.
468 a_eq*x == b_eq
470 Returns
471 -------
472 a_eq : csr_array
473 Sparse array representing the equality constraint matrix.
474 b_eq : np.ndarray
475 Array representing the equality constraint vector.
476 """
477 # ########## Aeq and Beq Formation ###########
478 n_rows = self.n_x
479 n_cols = self.n_x
480 # Aeq has the same number of rows as equations with a column for each x
481 a_eq = lil_array((n_rows, n_cols))
482 b_eq = zeros(n_rows)
483 for j in range(1, self.nb):
484 for ph in ["abc", "bca", "cab"]:
485 a, b, c = ph[0], ph[1], ph[2]
486 if not self.phase_exists(a, j):
487 continue
488 a_eq, b_eq = self.add_power_flow_model(a_eq, b_eq, j, a)
489 a_eq, b_eq = self.add_voltage_drop_model(a_eq, b_eq, j, a, b, c)
490 a_eq, b_eq = self.add_swing_voltage_model(a_eq, b_eq, j, a)
491 a_eq, b_eq = self.add_regulator_model(a_eq, b_eq, j, a)
492 a_eq, b_eq = self.add_load_model(a_eq, b_eq, j, a)
493 a_eq, b_eq = self.add_generator_model(a_eq, b_eq, j, a)
494 a_eq, b_eq = self.add_capacitor_model(a_eq, b_eq, j, a)
495 return csr_array(a_eq), b_eq
497 def add_power_flow_model(
498 self, a_eq: lil_array, b_eq, j, phase
499 ) -> Tuple[lil_array, np.ndarray]:
500 pij = self.idx("pij", j, phase)
501 qij = self.idx("qij", j, phase)
502 pjk = self.idx("pjk", j, phase)
503 qjk = self.idx("qjk", j, phase)
504 # Set P equation variable coefficients in a_eq
505 a_eq[pij, pij] = 1
506 a_eq[pij, pjk] = -1
507 # Set Q equation variable coefficients in a_eq
508 a_eq[qij, qij] = 1
509 a_eq[qij, qjk] = -1
510 return a_eq, b_eq
512 def add_voltage_drop_model(
513 self, a_eq: lil_array, b_eq, j, a, b, c
514 ) -> Tuple[lil_array, np.ndarray]:
515 if self.reg is not None:
516 if j in self.reg.tb:
517 return a_eq, b_eq
518 r, x = self.r, self.x
519 aa = "".join(sorted(a + a))
520 # if ph=='cab', then a+b=='ca'. Sort so ab=='ac'
521 ab = "".join(sorted(a + b))
522 ac = "".join(sorted(a + c))
523 i = self.idx("bi", j, a)[0] # get the upstream node, i, on branch from i to j
524 pij = self.idx("pij", j, a)
525 qij = self.idx("qij", j, a)
526 pijb = self.idx("pij", j, b)
527 qijb = self.idx("qij", j, b)
528 pijc = self.idx("pij", j, c)
529 qijc = self.idx("qij", j, c)
530 vi = self.idx("v", i, a)
531 vj = self.idx("v", j, a)
532 # Set V equation variable coefficients in a_eq and constants in b_eq
533 a_eq[vj, vj] = 1
534 a_eq[vj, vi] = -1
535 a_eq[vj, pij] = 2 * r[aa][i, j]
536 a_eq[vj, qij] = 2 * x[aa][i, j]
537 if self.phase_exists(b, j):
538 a_eq[vj, pijb] = -r[ab][i, j] + sqrt(3) * x[ab][i, j]
539 a_eq[vj, qijb] = -x[ab][i, j] - sqrt(3) * r[ab][i, j]
540 if self.phase_exists(c, j):
541 a_eq[vj, pijc] = -r[ac][i, j] - sqrt(3) * x[ac][i, j]
542 a_eq[vj, qijc] = -x[ac][i, j] + sqrt(3) * r[ac][i, j]
543 return a_eq, b_eq
545 def add_regulator_model(
546 self, a_eq: lil_array, b_eq, j, a
547 ) -> Tuple[lil_array, np.ndarray]:
548 if self.reg is None:
549 return a_eq, b_eq
550 if j not in self.reg.tb:
551 return a_eq, b_eq
552 i = self.idx("bi", j, a)[0] # get the upstream node, i, on branch from i to j
553 pij = self.idx("pij", j, a)
554 qij = self.idx("qij", j, a)
555 vi = self.idx("v", i, a)
556 vj = self.idx("v", j, a)
557 vx = self.idx("vx", j, a)
558 r, x = self.r, self.x
559 aa = "".join(sorted(a + a))
561 a_eq[vj, vj] = 1
562 a_eq[vj, vx] = -1
563 a_eq[vj, pij] = 2 * r[aa][i, j]
564 a_eq[vj, qij] = 2 * x[aa][i, j]
566 reg_ratio = get(self.reg[f"ratio_{a}"], j, 1)
567 a_eq[vx, vx] = 1
568 a_eq[vx, vi] = -1 * reg_ratio**2
569 return a_eq, b_eq
571 def add_swing_voltage_model(
572 self, a_eq: lil_array, b_eq, j, a
573 ) -> Tuple[lil_array, np.ndarray]:
574 i = self.idx("bi", j, a)[0] # get the upstream node, i, on branch from i to j
575 vi = self.idx("v", i, a)
576 # Set V equation variable coefficients in a_eq and constants in b_eq
577 if self.bus.bus_type[i] == opf.SWING_BUS: # Swing bus
578 a_eq[vi, vi] = 1
579 b_eq[vi] = self.bus.at[i, f"v_{a}"] ** 2
580 return a_eq, b_eq
582 def add_generator_model(
583 self, a_eq: lil_array, b_eq, j, a
584 ) -> Tuple[lil_array, np.ndarray]:
585 p_gen_nom, q_gen_nom = 0, 0
586 if self.gen is not None:
587 p_gen_nom = get(self.gen[f"p{a}"], j, 0)
588 q_gen_nom = get(self.gen[f"q{a}"], j, 0)
589 # equation indexes
590 pij = self.idx("pij", j, a)
591 qij = self.idx("qij", j, a)
592 pg = self.idx("pg", j, a)
593 qg = self.idx("qg", j, a)
594 # Set Generator equation variable coefficients in a_eq
595 a_eq[pij, pg] = 1
596 a_eq[qij, qg] = 1
597 if get(self.gen["control_variable"], j, 0) in [opf.CONSTANT_PQ, opf.CONSTANT_P]:
598 a_eq[pg, pg] = 1
599 b_eq[pg] = p_gen_nom
600 if get(self.gen["control_variable"], j, 0) in [opf.CONSTANT_PQ, opf.CONSTANT_Q]:
601 a_eq[qg, qg] = 1
602 b_eq[qg] = q_gen_nom
603 return a_eq, b_eq
605 def add_load_model(
606 self, a_eq: lil_array, b_eq, j, a
607 ) -> Tuple[lil_array, np.ndarray]:
608 pij = self.idx("pij", j, a)
609 qij = self.idx("qij", j, a)
610 vj = self.idx("v", j, a)
611 p_load_nom, q_load_nom = 0, 0
612 if self.bus.bus_type[j] == opf.PQ_BUS:
613 p_load_nom = self.bus[f"pl_{a}"][j]
614 q_load_nom = self.bus[f"ql_{a}"][j]
615 if self.bus.bus_type[j] != opf.PQ_FREE:
616 # Set loads model to power flow equation
617 a_eq[pij, vj] = -(self.bus.cvr_p[j] / 2) * p_load_nom
618 b_eq[pij] = (1 - (self.bus.cvr_p[j] / 2)) * p_load_nom
619 a_eq[qij, vj] = -(self.bus.cvr_q[j] / 2) * q_load_nom
620 b_eq[qij] = (1 - (self.bus.cvr_q[j] / 2)) * q_load_nom
621 return a_eq, b_eq
623 def add_capacitor_model(
624 self, a_eq: lil_array, b_eq, j, a
625 ) -> Tuple[lil_array, np.ndarray]:
626 q_cap_nom = 0
627 if self.cap is not None:
628 q_cap_nom = get(self.cap[f"q{a}"], j, 0)
629 # equation indexes
630 qij = self.idx("qij", j, a)
631 vj = self.idx("v", j, a)
632 qc = self.idx("q_cap", j, a)
633 a_eq[qij, qc] = 1 # add capacitor q variable to power flow equation
634 a_eq[qc, qc] = 1
635 a_eq[qc, vj] = -q_cap_nom
636 return a_eq, b_eq
638 def create_inequality_constraints(self) -> Tuple[csr_array, np.ndarray]:
639 """
640 Constructs the inequality constraint matrices.
641 a_ub*x <= b_ub
643 Returns
644 -------
645 a_ub, : csr_array
646 Sparse array representing the inequality constraint matrix.
647 b_ub : np.ndarray
648 Array representing the inequality constraint vector.
649 """
650 a_ub, b_ub = self.create_octagon_constraints()
651 return csr_array(a_ub), b_ub
653 def create_hexagon_constraints(self) -> Tuple[csr_array, np.ndarray]:
654 """
655 Use a hexagon to approximate the circular inequality constraint of an inverter.
656 """
657 n_inequalities = 5
658 n_rows_ineq = n_inequalities * (
659 len(np.where(self.gen.control_variable == opf.CONTROL_PQ)[0]) * 3
660 )
661 n_rows_ineq = max(n_rows_ineq, 1)
662 a_ineq = lil_array((n_rows_ineq, self.n_x))
663 b_ineq = zeros(n_rows_ineq)
664 ineq = list(range(n_inequalities)) # initialize equation indices
665 for j in self.gen.index:
666 for a in "abc":
667 if not self.phase_exists(a, j):
668 continue
669 if self.gen.loc[j, "control_variable"] != opf.CONTROL_PQ:
670 continue
671 pg = self.idx("pg", j, a)
672 qg = self.idx("qg", j, a)
673 s_rated = self.gen.at[j, f"s{a}_max"]
674 coef = sqrt(3) / 3 # ~=0.5774
675 # Right half plane. Positive P
676 # limit for small +P and large +Q
677 a_ineq[ineq[0], pg] = 0
678 a_ineq[ineq[0], qg] = 2 * coef
679 b_ineq[ineq[0]] = s_rated
680 # limit for large +P and small +Q
681 a_ineq[ineq[1], pg] = 1
682 a_ineq[ineq[1], qg] = coef
683 b_ineq[ineq[1]] = s_rated
684 # limit for large +P and small -Q
685 a_ineq[ineq[2], pg] = 1
686 a_ineq[ineq[2], qg] = -coef
687 b_ineq[ineq[2]] = s_rated
688 # limit for small +P and large -Q
689 a_ineq[ineq[3], pg] = 0
690 a_ineq[ineq[3], qg] = -2 * coef
691 b_ineq[ineq[3]] = s_rated
692 # limit to right half plane
693 a_ineq[ineq[4], pg] = -1
694 b_ineq[ineq[4]] = 0
695 # increment equation indices
696 for n_ineq in range(len(ineq)):
697 ineq[n_ineq] += len(ineq)
698 return csr_array(a_ineq), b_ineq
700 def create_octagon_constraints(self) -> Tuple[csr_array, np.ndarray]:
701 """
702 Use an octagon to approximate the circular inequality constraint of an inverter.
703 """
705 # ########## Aineq and Bineq Formation ###########
706 n_inequalities = 5
708 n_rows_ineq = n_inequalities * (
709 len(np.where(self.gen.control_variable == opf.CONTROL_PQ)[0]) * 3
710 )
711 n_rows_ineq = max(n_rows_ineq, 1)
712 a_ineq = lil_array((n_rows_ineq, self.n_x))
713 b_ineq = zeros(n_rows_ineq)
714 ineq = list(range(n_inequalities))
715 for j in self.gen.index:
716 for a in "abc":
717 if not self.phase_exists(a, j):
718 continue
719 if self.gen.loc[j, "control_variable"] != opf.CONTROL_PQ:
720 continue
721 pg = self.idx("pg", j, a)
722 qg = self.idx("qg", j, a)
723 s_rated: float = self.gen.at[j, f"s{a}_max"] # type: ignore
724 coef = sqrt(2) - 1 # ~=0.4142
725 # Right half plane. Positive P
726 # limit for small +P and large +Q
727 a_ineq[ineq[0], pg] = coef
728 a_ineq[ineq[0], qg] = 1
729 b_ineq[ineq[0]] = s_rated
730 # limit for large +P and small +Q
731 a_ineq[ineq[1], pg] = 1
732 a_ineq[ineq[1], qg] = coef
733 b_ineq[ineq[1]] = s_rated
734 # limit for large +P and small -Q
735 a_ineq[ineq[2], pg] = 1
736 a_ineq[ineq[2], qg] = -coef
737 b_ineq[ineq[2]] = s_rated
738 # limit for small +P and large -Q
739 a_ineq[ineq[3], pg] = coef
740 a_ineq[ineq[3], qg] = -1
741 b_ineq[ineq[3]] = s_rated
742 # limit to right half plane
743 a_ineq[ineq[4], pg] = -1
744 b_ineq[ineq[4]] = 0
746 for n_ineq in range(len(ineq)):
747 ineq[n_ineq] += len(ineq)
748 return csr_array(a_ineq), b_ineq
750 def parse_results(self, x, variable_name: str):
751 values = pd.DataFrame(columns=["name", "a", "b", "c"])
752 for ph in "abc":
753 for j in self.all_buses[ph]:
754 values.at[j + 1, "name"] = self.bus.at[j, "name"]
755 values.at[j + 1, ph] = x[self.idx(variable_name, j, ph)]
756 return values.sort_index()
758 def get_device_variables(self, x, variable_map):
759 if len(variable_map.keys()) == 0:
760 return pd.DataFrame(columns=["id", "name", "t", "a", "b", "c"])
761 index = np.unique(
762 np.r_[
763 variable_map["a"].index,
764 variable_map["b"].index,
765 variable_map["c"].index,
766 ]
767 )
768 bus_id = index + 1
769 df = pd.DataFrame(columns=["id", "name", "a", "b", "c"], index=bus_id)
770 df.id = bus_id
771 df.loc[bus_id, "name"] = self.bus.loc[index, "name"].to_numpy()
772 for a in "abc":
773 df.loc[variable_map[a].index + 1, a] = x[variable_map[a]]
774 return df
776 def get_voltages(self, x):
777 v_df = self.get_device_variables(x, self.v_map)
778 v_df.loc[:, ["a", "b", "c"]] = v_df.loc[:, ["a", "b", "c"]] ** 0.5
779 return v_df
781 def get_p_gens(self, x):
782 return self.get_device_variables(x, self.pg_map)
784 def get_q_gens(self, x):
785 return self.get_device_variables(x, self.qg_map)
787 def get_q_caps(self, x):
788 return self.get_device_variables(x, self.qc_map)
790 def get_apparent_power_flows(self, x):
791 s_df = pd.DataFrame(
792 columns=["fb", "tb", "from_name", "to_name", "a", "b", "c"],
793 index=range(2, self.nb + 1),
794 )
795 s_df["a"] = s_df["a"].astype(complex)
796 s_df["b"] = s_df["b"].astype(complex)
797 s_df["c"] = s_df["c"].astype(complex)
798 for ph in "abc":
799 fb_idxs = self.x_maps[ph].bi.to_numpy()
800 fb_names = self.bus.name[fb_idxs].to_numpy()
801 tb_idxs = self.x_maps[ph].bj.to_numpy()
802 tb_names = self.bus.name[tb_idxs].to_numpy()
803 s_df.loc[self.x_maps[ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1
804 s_df.loc[self.x_maps[ph].bj.to_numpy() + 1, "tb"] = tb_idxs + 1
805 s_df.loc[self.x_maps[ph].bj.to_numpy() + 1, "from_name"] = fb_names
806 s_df.loc[self.x_maps[ph].bj.to_numpy() + 1, "to_name"] = tb_names
807 s_df.loc[self.x_maps[ph].bj.to_numpy() + 1, ph] = (
808 x[self.x_maps[ph].pij] + 1j * x[self.x_maps[ph].qij]
809 )
810 return s_df
812 def get_p_flows(self, x):
813 df = pd.DataFrame(
814 columns=["fb", "tb", "from_name", "to_name", "a", "b", "c"],
815 index=range(2, self.nb + 1),
816 )
817 df["a"] = df["a"].astype(float)
818 df["b"] = df["b"].astype(float)
819 df["c"] = df["c"].astype(float)
820 for ph in "abc":
821 fb_idxs = self.x_maps[ph].bi.to_numpy()
822 fb_names = self.bus.name[fb_idxs].to_numpy()
823 tb_idxs = self.x_maps[ph].bj.to_numpy()
824 tb_names = self.bus.name[tb_idxs].to_numpy()
825 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1
826 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "id"] = tb_idxs + 1
827 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "from_name"] = fb_names
828 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "name"] = tb_names
829 df.loc[self.x_maps[ph].bj.to_numpy() + 1, ph] = x[self.x_maps[ph].pij]
830 return df
832 def get_q_flows(self, x):
833 df = pd.DataFrame(
834 columns=["fb", "tb", "from_name", "to_name", "a", "b", "c"],
835 index=range(2, self.nb + 1),
836 )
837 df["a"] = df["a"].astype(float)
838 df["b"] = df["b"].astype(float)
839 df["c"] = df["c"].astype(float)
840 for ph in "abc":
841 fb_idxs = self.x_maps[ph].bi.to_numpy()
842 fb_names = self.bus.name[fb_idxs].to_numpy()
843 tb_idxs = self.x_maps[ph].bj.to_numpy()
844 tb_names = self.bus.name[tb_idxs].to_numpy()
845 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "fb"] = fb_idxs + 1
846 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "id"] = tb_idxs + 1
847 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "from_name"] = fb_names
848 df.loc[self.x_maps[ph].bj.to_numpy() + 1, "name"] = tb_names
849 df.loc[self.x_maps[ph].bj.to_numpy() + 1, ph] = x[self.x_maps[ph].qij]
850 return df
852 # def update(
853 # self,
854 # bus_data: Optional[pd.DataFrame] = None,
855 # gen_data: Optional[pd.DataFrame] = None,
856 # cap_data: Optional[pd.DataFrame] = None,
857 # reg_data: Optional[pd.DataFrame] = None,
858 # ):
859 # # TODO: update is untested!
860 # warnings.warn("update method is untested!")
861 # if bus_data is not None:
862 # self.bus = handle_bus_input(bus_data)
863 # if gen_data is not None:
864 # self.gen = handle_gen_input(gen_data)
865 # if cap_data is not None:
866 # self.cap = handle_cap_input(cap_data)
867 # if reg_data is not None:
868 # self.reg = handle_reg_input(reg_data)
870 # a_eq = lil_array(self.a_eq)
871 # for j in range(1, self.nb):
872 # for ph in ["abc", "bca", "cab"]:
873 # a, b, c = ph[0], ph[1], ph[2]
874 # if not self.phase_exists(a, j):
875 # continue
876 # if bus_data is not None:
877 # a_eq, self.b_eq = self.add_swing_voltage_model(
878 # a_eq, self.b_eq, j, a
879 # )
880 # a_eq, self.b_eq = self.add_load_model(a_eq, self.b_eq, j, a)
881 # if gen_data is not None:
882 # a_eq, self.b_eq = self.add_generator_model(a_eq, self.b_eq, j, a)
883 # if cap_data is not None:
884 # a_eq, self.b_eq = self.add_capacitor_model(a_eq, self.b_eq, j, a)
885 # if reg_data is not None:
886 # a_eq, self.b_eq = self.add_regulator_model(a_eq, self.b_eq, j, a)
887 # self.a_eq = csr_array(a_eq)
888 # self.a_ub, self.b_ub = self.create_inequality_constraints()
889 # self.bounds = self.init_bounds()
890 # self.bounds_tuple = list(map(tuple, self.bounds))
891 # self.x_min = self.bounds[:, 0]
892 # self.x_max = self.bounds[:, 1]