Coverage for src/distopf/pyomo_models/constraints.py: 79%
172 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1"""
2Constraint functions for DistOPF Pyomo models.
4Each function takes a Pyomo ConcreteModel and data, and adds constraints to the model.
5Functions are designed to work with models created by create_lindist_model().
6"""
8import pyomo.environ as pyo # ty: ignore
9from distopf.pyomo_models.lindist import ControlVariable
10from distopf.pyomo_models.protocol import LindistModelProtocol
11from numpy import sqrt
13sqrt2 = sqrt(2)
14sqrt3 = sqrt(3)
17def add_p_flow_constraints(m: LindistModelProtocol) -> None:
18 """
19 Add LinDistFlow power balance constraints.
20 Active power: P_ij = sum(P_jk) + p_L - p_D
21 """
23 def p_balance_rule(m: LindistModelProtocol, _id, ph, t):
24 load = m.p_load[_id, ph, t]
25 generation = m.p_gen[_id, ph, t] if (_id, ph, t) in m.p_gen else 0
26 p_bat = m.p_bat[_id, ph, t] if (_id, ph, t) in m.p_bat else 0
27 incoming_flow = m.p_flow[_id, ph, t]
28 outgoing_flows = sum(
29 m.p_flow[to_bus, ph, t]
30 for to_bus in m.to_bus_map[_id]
31 if (to_bus, ph) in m.branch_phase_set
32 )
33 return incoming_flow == outgoing_flows + load - generation - p_bat
35 m.power_balance_p = pyo.Constraint(
36 m.branch_phase_set, m.time_set, rule=p_balance_rule
37 )
40def add_q_flow_constraints(m: LindistModelProtocol) -> None:
41 """
42 Add LinDistFlow power balance constraints.
43 Reactive power: Q_ij = sum(Q_jk) + q_L - q_D - q_C
44 """
46 def q_balanced_rule(m: LindistModelProtocol, _id, ph, t):
47 load = m.q_load[_id, ph, t]
48 generation = m.q_gen[_id, ph, t] if (_id, ph, t) in m.q_gen else 0
49 q_bat = m.q_bat[_id, ph, t] if (_id, ph, t) in m.q_bat else 0
50 capacitor = m.q_cap[_id, ph, t] if (_id, ph, t) in m.q_cap else 0
51 incoming_flow = m.q_flow[_id, ph, t]
52 outgoing_flows = sum(
53 m.q_flow[to_bus, ph, t]
54 for to_bus in m.to_bus_map[_id]
55 if (to_bus, ph) in m.branch_phase_set
56 )
57 return incoming_flow == outgoing_flows + load - generation - capacitor - q_bat
59 m.power_balance_q = pyo.Constraint(
60 m.branch_phase_set, m.time_set, rule=q_balanced_rule
61 )
64def add_voltage_drop_constraints(m: LindistModelProtocol) -> None:
65 """
66 Add voltage drop constraints.
67 Excludes regulator branches so they can be handled by `add_regulator_constraints`.
69 v_j = v_i - sum_q 2*Re[S_ij^pq * (z_ij^pq)*]
70 Simplified for LinDistFlow: v_j = v_i - 2*(r*P + x*Q)
71 """
73 def voltage_drop_rule(m: LindistModelProtocol, _id, ph, t):
74 if (_id, ph, t) in m.v2_reg:
75 return pyo.Constraint.Skip
76 # here, "a" represents the current phase,
77 # b an c represent next and previous phase
78 a = ph
79 _i = "abc".index(a)
80 b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b"
81 c = "abc"[(_i - 1) % 3] # prev phase. If phase is "a" then "c"
82 aa = "".join(sorted(a + a))
84 voltage_drop = (
85 2 * m.r[_id, aa] * m.p_flow[_id, ph, t]
86 + 2 * m.x[_id, aa] * m.q_flow[_id, ph, t]
87 )
88 if (_id, b) in m.branch_phase_set:
89 ab = "".join(sorted(a + b))
90 voltage_drop += (-m.r[_id, ab] + sqrt3 * m.x[_id, ab]) * m.p_flow[_id, b, t]
91 voltage_drop += (-m.x[_id, ab] - sqrt3 * m.r[_id, ab]) * m.q_flow[_id, b, t]
92 if (_id, c) in m.branch_phase_set:
93 ac = "".join(sorted(a + c))
94 voltage_drop += (-m.r[_id, ac] - sqrt3 * m.x[_id, ac]) * m.p_flow[_id, c, t]
95 voltage_drop += (-m.x[_id, ac] + sqrt3 * m.r[_id, ac]) * m.q_flow[_id, c, t]
97 return m.v2[_id, ph, t] == m.v2[m.from_bus_map[_id], ph, t] - voltage_drop
99 m.voltage_drop = pyo.Constraint(
100 m.branch_phase_set, m.time_set, rule=voltage_drop_rule
101 )
104def add_regulator_constraints(m: LindistModelProtocol) -> None:
105 """
106 vj = v_reg - 2r*pij - 2x*qij
107 v_reg = vi*reg_ratio^2
108 """
110 def regulator_v_drop(m: LindistModelProtocol, _id, ph, t):
111 raa = m.r[_id, ph + ph]
112 xaa = m.x[_id, ph + ph]
113 voltage_drop = 2 * raa * m.p_flow[_id, ph, t] + 2 * xaa * m.q_flow[_id, ph, t]
114 return m.v2[_id, ph, t] == m.v2_reg[_id, ph, t] - voltage_drop
116 def regulator_rule(m: LindistModelProtocol, _id, ph, t):
117 return (
118 m.v2_reg[_id, ph, t]
119 == m.v2[m.from_bus_map[_id], ph, t] * m.reg_ratio[_id, ph] ** 2
120 )
122 m.regulator_voltage_drop = pyo.Constraint(
123 m.reg_phase_set, m.time_set, rule=regulator_v_drop
124 )
125 m.regulator_ratio = pyo.Constraint(m.reg_phase_set, m.time_set, rule=regulator_rule)
128def add_cvr_load_constraints(m: LindistModelProtocol) -> None:
129 """
130 Add voltage-dependent load constraints.
131 p_L = p_0 + CVR_p * (p_0/2) * (v - 1)
132 q_L = q_0 + CVR_q * (q_0/2) * (v - 1)
133 """
135 def cvr_p_rule(m: LindistModelProtocol, _id, ph, t):
136 p_nom = m.p_load_nom[_id, ph, t]
137 cvr_p = m.cvr_p[_id, ph]
138 return m.p_load[_id, ph, t] == p_nom + cvr_p * p_nom / 2 * (
139 m.v2[_id, ph, t] - 1
140 )
142 def cvr_q_rule(m: LindistModelProtocol, _id, ph, t):
143 q_nom = m.q_load_nom[_id, ph, t]
144 cvr_q = m.cvr_q[_id, ph]
145 return m.q_load[_id, ph, t] == q_nom + cvr_q * q_nom / 2 * (
146 m.v2[_id, ph, t] - 1
147 )
149 m.cvr_p_load = pyo.Constraint(m.bus_phase_set, m.time_set, rule=cvr_p_rule)
150 m.cvr_q_load = pyo.Constraint(m.bus_phase_set, m.time_set, rule=cvr_q_rule)
153def add_generator_constant_p_constraints(m: LindistModelProtocol) -> None:
154 m.constant_p_gen = pyo.Constraint(
155 m.gen_phase_set,
156 m.time_set,
157 rule=lambda m, _id, ph, t: m.p_gen[_id, ph, t] == m.p_gen_nom[_id, ph, t],
158 )
161def add_generator_constant_q_constraints(m: LindistModelProtocol) -> None:
162 m.constant_q_gen = pyo.Constraint(
163 m.gen_phase_set,
164 m.time_set,
165 rule=lambda m, _id, ph, t: m.q_gen[_id, ph, t] == m.q_gen_nom[_id, ph, t],
166 )
169def add_generator_constant_p_constraints_q_control(m: LindistModelProtocol) -> None:
170 def _rule(m: LindistModelProtocol, _id, ph, t):
171 if m.gen_control_type[_id, ph] in [ControlVariable.P, ControlVariable.PQ]:
172 return pyo.Constraint.Skip
173 return m.p_gen[_id, ph, t] == m.p_gen_nom[_id, ph, t]
175 m.constant_p_gen = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_rule)
178def add_generator_constant_q_constraints_p_control(m: LindistModelProtocol) -> None:
179 def _rule(m: LindistModelProtocol, _id, ph, t):
180 if m.gen_control_type[_id, ph] in [ControlVariable.Q, ControlVariable.PQ]:
181 return pyo.Constraint.Skip
182 return m.q_gen[_id, ph, t] == m.q_gen_nom[_id, ph, t]
184 m.constant_q_gen = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_rule)
187def add_octagonal_inverter_constraints_pq_control(m: LindistModelProtocol) -> None:
188 """
189 Add octagonal inverter constraints (equation 2.14).
191 Linear approximation of circular curve using 8 constraints.
192 Only applied to generators with control_variable=="PQ".
194 c = sqrt(2) - 1
195 c * p_gen + 1 * q_gen <= s_rated
196 1 * p_gen + c * q_gen <= s_rated
197 1 * p_gen - c * q_gen <= s_rated
198 c * p_gen - 1 * q_gen <= s_rated
199 """
200 c = sqrt2 - 1 # ≈ 0.4142
202 # If the P-Q Plane was on a clock:
203 # Line from 12:00 to 1:30. Or 90 to 45 deg.
204 def _1(m: LindistModelProtocol, _id, ph, t):
205 if m.gen_control_type[_id, ph] != ControlVariable.PQ:
206 return pyo.Constraint.Skip
207 return c * m.p_gen[_id, ph, t] + 1 * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph]
209 # Line from 1:30 to 3:00 on a clock. Or 45 to 0 deg.
210 def _2(m: LindistModelProtocol, _id, ph, t):
211 if m.gen_control_type[_id, ph] != ControlVariable.PQ:
212 return pyo.Constraint.Skip
213 return 1 * m.p_gen[_id, ph, t] + c * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph]
215 # Line from 3:00 to 4:30 on a clock. Or 0 to -45 deg.
216 def _3(m: LindistModelProtocol, _id, ph, t):
217 if m.gen_control_type[_id, ph] != ControlVariable.PQ:
218 return pyo.Constraint.Skip
219 return 1 * m.p_gen[_id, ph, t] - c * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph]
221 # Line from 4:30 to 6:00 on a clock. Or -45 to -90 deg.
222 def _4(m: LindistModelProtocol, _id, ph, t):
223 if m.gen_control_type[_id, ph] != ControlVariable.PQ:
224 return pyo.Constraint.Skip
225 return c * m.p_gen[_id, ph, t] - 1 * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph]
227 # Add all octagonal constraints
228 m.gen_octagon_1 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_1)
229 m.gen_octagon_2 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_2)
230 m.gen_octagon_3 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_3)
231 m.gen_octagon_4 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_4)
234def add_circular_generator_constraints_pq_control(m: LindistModelProtocol) -> None:
235 """
236 Add circular generator constraints.
238 Uses the exact circular constraint: p_gen² + q_gen² ≤ s_rated²
239 Only applied to generators with control_variable=="PQ".
240 """
242 def _circle(m: LindistModelProtocol, _id, ph, t):
243 if m.gen_control_type[_id, ph] != ControlVariable.PQ.value:
244 return pyo.Constraint.Skip
245 return (
246 m.p_gen[_id, ph, t] ** 2 + m.q_gen[_id, ph, t] ** 2
247 <= m.s_rated[_id, ph] ** 2
248 )
250 m.gen_circle_constraint = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_circle)
253def add_capacitor_constraints(m: LindistModelProtocol) -> None:
254 """
255 Add capacitor constraints.
256 q_C = q_rated * v^2
257 """
259 def capacitor_rule(m: LindistModelProtocol, _id, ph, t):
260 return m.q_cap[_id, ph, t] == m.q_cap_nom[_id, ph] * m.v2[_id, ph, t]
262 m.capacitor_injection = pyo.Constraint(
263 m.cap_phase_set, m.time_set, rule=capacitor_rule
264 )
267def add_swing_bus_constraints(m: LindistModelProtocol) -> None:
268 """
269 Add swing bus voltage constraints.
271 Sets voltage at swing bus to specified values.
272 """
274 def swing_voltage_rule(m: LindistModelProtocol, _id, ph, t):
275 """Fix swing bus voltages"""
276 if _id not in m.swing_bus_set:
277 return pyo.Constraint.Skip
278 return m.v2[_id, ph, t] == m.v_swing[_id, ph, t] ** 2
280 m.swing_voltage = pyo.Constraint(
281 m.swing_phase_set, m.time_set, rule=swing_voltage_rule
282 )
285def add_voltage_limits(m: LindistModelProtocol) -> None:
286 """Add voltage bounds (for voltage magnitude squared)"""
288 def voltage_limits(m: LindistModelProtocol, _id, ph, t):
289 return (m.v_min[_id, ph] ** 2, m.v2[_id, ph, t], m.v_max[_id, ph] ** 2)
291 m.voltage_limits = pyo.Constraint(m.bus_phase_set, m.time_set, rule=voltage_limits)
294def add_generator_limits(m: LindistModelProtocol) -> None:
295 """Add generator bounds following the original base.py logic"""
297 def p_gen_bounds(m: LindistModelProtocol, _id, ph, t):
298 return (
299 0,
300 m.p_gen[_id, ph, t],
301 min(m.p_gen_nom[_id, ph, t], m.s_rated[_id, ph]),
302 )
304 def q_gen_bounds(m: LindistModelProtocol, _id, ph, t):
305 if m.gen_control_type[_id, ph] == ControlVariable.Q:
306 q_max = sqrt(max(0, m.s_rated[_id, ph] ** 2 - m.p_gen_nom[_id, ph, t] ** 2))
307 return (
308 max(-q_max, m.q_gen_min[_id, ph]),
309 m.q_gen[_id, ph, t],
310 min(q_max, m.q_gen_max[_id, ph]),
311 )
312 return (
313 max(-m.s_rated[_id, ph], m.q_gen_min[_id, ph]),
314 m.q_gen[_id, ph, t],
315 min(m.s_rated[_id, ph], m.q_gen_max[_id, ph]),
316 )
318 m.p_gen_limits = pyo.Constraint(m.gen_phase_set, m.time_set, rule=p_gen_bounds)
319 m.q_gen_limits = pyo.Constraint(m.gen_phase_set, m.time_set, rule=q_gen_bounds)
322# ============ Battery Constraints =====================================================
323# ======================================================================================
326def add_battery_power_limits(m: LindistModelProtocol) -> None:
327 def _d(m: LindistModelProtocol, _id, ph, t):
328 return (0, m.p_discharge[_id, t], m.s_bat_rated[_id, ph])
330 def _c(m: LindistModelProtocol, _id, ph, t):
331 return (0, m.p_charge[_id, t], m.s_bat_rated[_id, ph])
333 m.battery_discharging_limits = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_d)
334 m.battery_charging_limits = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_c)
337def add_battery_soc_limits(m: LindistModelProtocol) -> None:
338 def battery_soc_limits(m: LindistModelProtocol, _id, t):
339 return (m.soc_min[_id], m.soc[_id, t], m.soc_max[_id])
341 m.battery_soc_limits = pyo.Constraint(
342 m.bat_set, m.time_set, rule=battery_soc_limits
343 )
346def add_battery_net_p_bat_constraints(m: LindistModelProtocol) -> None:
347 def net_discharge(m: LindistModelProtocol, _id, t):
348 p_bat_a = m.p_bat[_id, "a", t] if m.battery_has_phase[_id, "a"] else 0
349 p_bat_b = m.p_bat[_id, "b", t] if m.battery_has_phase[_id, "b"] else 0
350 p_bat_c = m.p_bat[_id, "c", t] if m.battery_has_phase[_id, "c"] else 0
351 return p_bat_a + p_bat_b + p_bat_c == m.p_discharge[_id, t] - m.p_charge[_id, t]
353 m.net_discharge = pyo.Constraint(m.bat_phase_set, m.time_set, rule=net_discharge)
356def add_battery_net_p_bat_equal_phase_constraints(m: LindistModelProtocol) -> None:
357 def net_discharge_equal_phases(m: LindistModelProtocol, _id, ph, t):
358 n_phases = m.battery_n_phases[_id]
359 return (
360 m.p_bat[_id, ph, t]
361 == (m.p_discharge[_id, t] - m.p_charge[_id, t]) / n_phases
362 )
364 m.net_discharge = pyo.Constraint(
365 m.bat_phase_set, m.time_set, rule=net_discharge_equal_phases
366 )
369def add_battery_energy_constraints(m: LindistModelProtocol) -> None:
370 def storage(m: LindistModelProtocol, _id, t):
371 eta_d = m.discharge_efficiency[_id]
372 eta_c = m.charge_efficiency[_id]
373 if t == m.start_step:
374 soc0 = m.start_soc[_id]
375 else:
376 soc0 = m.soc[_id, t - 1]
377 return (
378 m.soc[_id, t] - soc0
379 == eta_c * m.delta_t * m.p_charge[_id, t]
380 - (1 / eta_d) * m.delta_t * m.p_discharge[_id, t]
381 )
383 m.storage = pyo.Constraint(m.bat_set, m.time_set, rule=storage)
386# def add_battery_phase_equality_constraints(m: LindistModelProtocol) -> None:
387# def _p_equal(m: LindistModelProtocol, _id, ph, t):
388# a = ph
389# _i = "abc".index(a)
390# b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b"
391# if (_id, b) not in m.bat_phase_set:
392# pyo.Constraint.Skip
393# return m.p_bat[_id, a, t] == m.p_bat[_id, b, t]
395# def _q_equal(m: LindistModelProtocol, _id, ph, t):
396# a = ph
397# _i = "abc".index(a)
398# b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b"
399# if (_id, b) not in m.bat_phase_set:
400# pyo.Constraint.Skip
401# return m.q_bat[_id, a, t] == m.q_bat[_id, b, t]
404def add_battery_constant_q_constraints_p_control(m: LindistModelProtocol) -> None:
405 def _rule(m: LindistModelProtocol, _id, ph, t):
406 if m.bat_control_type[_id] != ControlVariable.P:
407 return pyo.Constraint.Skip
408 return m.q_bat[_id, ph, t] == m.q_bat_nom[_id, ph, t]
410 m.battery_constant_q_bat = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_rule)