Coverage for src/distopf/pyomo_models/pyomo_lindist.py: 94%
107 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-08 11:30 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-08 11:30 -0700
1import pyomo.environ as pyo
2from typing import Optional, Tuple, Dict, List
3import pandas as pd
4from dataclasses import dataclass
5import distopf as opf
6from numpy import sqrt, ones_like
7from distopf.utils.utils import (
8 handle_branch_input,
9 handle_bus_input,
10 handle_gen_input,
11 handle_cap_input,
12 handle_reg_input,
13)
16@dataclass
17class Case:
18 branch_data: Optional[pd.DataFrame] = None
19 bus_data: Optional[pd.DataFrame] = None
20 gen_data: Optional[pd.DataFrame] = None
21 cap_data: Optional[pd.DataFrame] = None
22 reg_data: Optional[pd.DataFrame] = None
25def _create_phase_tuples(df: pd.DataFrame, id_col: str = "id") -> List[Tuple[str, str]]:
26 """Create (id, phase) tuples from dataframe with phases column"""
27 result = []
28 for _, row in df.iterrows():
29 result.extend([(row[id_col], phase) for phase in row.phases])
30 return result
33def _create_sets(
34 m: pyo.ConcreteModel,
35 bus: pd.DataFrame,
36 branch: pd.DataFrame,
37 gen: pd.DataFrame,
38 cap: pd.DataFrame,
39) -> None:
40 """Create all Pyomo sets"""
41 m.bus_set = pyo.Set(initialize=bus.id.tolist())
42 m.phase_set = pyo.Set(initialize=["a", "b", "c"])
43 m.swing_bus_set = pyo.Set(initialize=bus[bus.bus_type == "SWING"].id.tolist())
44 m.branch_set = pyo.Set(initialize=bus[bus.bus_type != "SWING"].id.tolist())
45 m.phase_pair_set = pyo.Set(initialize=["aa", "ab", "ac", "bb", "bc", "cc"])
47 m.bus_phase_set = pyo.Set(initialize=_create_phase_tuples(bus), dimen=2)
48 m.branch_phase_set = pyo.Set(initialize=_create_phase_tuples(branch, "tb"), dimen=2)
49 m.gen_phase_set = pyo.Set(
50 initialize=_create_phase_tuples(gen) if len(gen) > 0 else [], dimen=2
51 )
52 m.cap_phase_set = pyo.Set(
53 initialize=_create_phase_tuples(cap) if len(cap) > 0 else [], dimen=2
54 )
57def _create_parameters(m: pyo.ConcreteModel, branch: pd.DataFrame) -> None:
58 """Create resistance and reactance parameters"""
59 r_data, x_data = {}, {}
61 for _, row in branch.iterrows():
62 for phase_pair in m.phase_pair_set:
63 r_col, x_col = f"r{phase_pair}", f"x{phase_pair}"
65 if r_col in branch.columns and x_col in branch.columns:
66 r_data[(phase_pair, row.tb)] = row[r_col]
67 x_data[(phase_pair, row.tb)] = row[x_col]
69 m.r = pyo.Param(m.phase_pair_set, m.branch_set, initialize=r_data, default=0.0)
70 m.x = pyo.Param(m.phase_pair_set, m.branch_set, initialize=x_data, default=0.0)
73def _create_variables(m: pyo.ConcreteModel) -> None:
74 """Create all variables without bounds"""
75 m.v = pyo.Var(m.bus_phase_set) # Voltage magnitude squared
76 m.p_flow = pyo.Var(m.branch_phase_set)
77 m.q_flow = pyo.Var(m.branch_phase_set)
78 m.p_gen = pyo.Var(m.gen_phase_set)
79 m.q_gen = pyo.Var(m.gen_phase_set)
80 m.q_cap = pyo.Var(m.cap_phase_set)
83def add_voltage_bounds(m: pyo.ConcreteModel, bus: pd.DataFrame) -> None:
84 """Add voltage bounds (for voltage magnitude squared)"""
85 for bus_id, phase in m.bus_phase_set:
86 bus_row = bus[bus.id == bus_id].iloc[0]
87 v_min = getattr(bus_row, "v_min", 0.95)
88 v_max = getattr(bus_row, "v_max", 1.05)
90 # Apply bounds to voltage squared
91 m.v[bus_id, phase].setlb(v_min**2)
92 m.v[bus_id, phase].setub(v_max**2)
95def add_generator_bounds(m: pyo.ConcreteModel, gen: pd.DataFrame) -> None:
96 """Add generator bounds following the original base.py logic"""
97 if len(gen) == 0:
98 return
100 for phase in "abc":
101 # Get generators that have this phase
102 gen_phase_buses = [bus_id for bus_id, p in m.gen_phase_set if p == phase]
103 if not gen_phase_buses:
104 continue
106 # Create lookup dictionaries for generator data by bus_id
107 gen_lookup = {}
108 for _, row in gen.iterrows():
109 gen_lookup[row.id] = row
111 # Get phase-specific arrays for generators with this phase
112 s_rated_dict = {}
113 p_out_dict = {}
114 q_max_manual_dict = {}
115 q_min_manual_dict = {}
117 for bus_id in gen_phase_buses:
118 if bus_id in gen_lookup:
119 row = gen_lookup[bus_id]
120 s_rated_dict[bus_id] = getattr(row, f"s{phase}_max", 0)
121 p_out_dict[bus_id] = getattr(row, f"p{phase}", 0)
122 q_max_manual_dict[bus_id] = getattr(row, f"q{phase}_max", 100e3)
123 q_min_manual_dict[bus_id] = getattr(row, f"q{phase}_min", -100e3)
125 for bus_id in gen_phase_buses:
126 if bus_id not in gen_lookup:
127 continue
129 gen_row = gen_lookup[bus_id]
130 mode = getattr(gen_row, "control_variable", "")
132 s_rated = s_rated_dict[bus_id]
133 p_out = p_out_dict[bus_id]
134 q_max_manual = q_max_manual_dict[bus_id]
135 q_min_manual = q_min_manual_dict[bus_id]
137 # Active power bounds
138 m.p_gen[bus_id, phase].setlb(0)
139 m.p_gen[bus_id, phase].setub(p_out)
141 # Reactive power bounds
142 if mode == opf.CONSTANT_P: # mode == "Q"
143 # Use capability curve limits
144 q_max_cap = sqrt(max(0, s_rated**2 - p_out**2))
145 q_min_cap = -q_max_cap
146 q_lower = max(q_min_cap, q_min_manual)
147 q_upper = min(q_max_cap, q_max_manual)
148 else: # mode is "P", "PQ", or ""
149 # Use apparent power limits
150 q_lower = max(-s_rated, q_min_manual)
151 q_upper = min(s_rated, q_max_manual)
153 m.q_gen[bus_id, phase].setlb(q_lower)
154 m.q_gen[bus_id, phase].setub(q_upper)
157def create_lindist_model(case: Case) -> pyo.ConcreteModel:
158 """
159 Factory function to create a Pyomo ConcreteModel for linear distribution system optimization.
161 Parameters
162 ----------
163 case : Case
164 Dataclass containing network data frames
166 Returns
167 -------
168 pyo.ConcreteModel
169 Configured Pyomo model with sets, variables, and parameters
170 """
171 # Load and validate data frames
172 branch = handle_branch_input(case.branch_data)
173 bus = handle_bus_input(case.bus_data)
174 gen = handle_gen_input(case.gen_data)
175 cap = handle_cap_input(case.cap_data)
176 reg = handle_reg_input(case.reg_data)
178 m = pyo.ConcreteModel()
179 _create_sets(m, bus, branch, gen, cap)
180 _create_parameters(m, branch)
181 _create_variables(m)
183 add_voltage_bounds(m, bus)
184 add_generator_bounds(m, gen)
186 return m