Coverage for src/distopf/pyomo_models/constraints.py: 79%

172 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1""" 

2Constraint functions for DistOPF Pyomo models. 

3 

4Each function takes a Pyomo ConcreteModel and data, and adds constraints to the model. 

5Functions are designed to work with models created by create_lindist_model(). 

6""" 

7 

8import pyomo.environ as pyo # ty: ignore 

9from distopf.pyomo_models.lindist import ControlVariable 

10from distopf.pyomo_models.protocol import LindistModelProtocol 

11from numpy import sqrt 

12 

13sqrt2 = sqrt(2) 

14sqrt3 = sqrt(3) 

15 

16 

17def add_p_flow_constraints(m: LindistModelProtocol) -> None: 

18 """ 

19 Add LinDistFlow power balance constraints. 

20 Active power: P_ij = sum(P_jk) + p_L - p_D 

21 """ 

22 

23 def p_balance_rule(m: LindistModelProtocol, _id, ph, t): 

24 load = m.p_load[_id, ph, t] 

25 generation = m.p_gen[_id, ph, t] if (_id, ph, t) in m.p_gen else 0 

26 p_bat = m.p_bat[_id, ph, t] if (_id, ph, t) in m.p_bat else 0 

27 incoming_flow = m.p_flow[_id, ph, t] 

28 outgoing_flows = sum( 

29 m.p_flow[to_bus, ph, t] 

30 for to_bus in m.to_bus_map[_id] 

31 if (to_bus, ph) in m.branch_phase_set 

32 ) 

33 return incoming_flow == outgoing_flows + load - generation - p_bat 

34 

35 m.power_balance_p = pyo.Constraint( 

36 m.branch_phase_set, m.time_set, rule=p_balance_rule 

37 ) 

38 

39 

40def add_q_flow_constraints(m: LindistModelProtocol) -> None: 

41 """ 

42 Add LinDistFlow power balance constraints. 

43 Reactive power: Q_ij = sum(Q_jk) + q_L - q_D - q_C 

44 """ 

45 

46 def q_balanced_rule(m: LindistModelProtocol, _id, ph, t): 

47 load = m.q_load[_id, ph, t] 

48 generation = m.q_gen[_id, ph, t] if (_id, ph, t) in m.q_gen else 0 

49 q_bat = m.q_bat[_id, ph, t] if (_id, ph, t) in m.q_bat else 0 

50 capacitor = m.q_cap[_id, ph, t] if (_id, ph, t) in m.q_cap else 0 

51 incoming_flow = m.q_flow[_id, ph, t] 

52 outgoing_flows = sum( 

53 m.q_flow[to_bus, ph, t] 

54 for to_bus in m.to_bus_map[_id] 

55 if (to_bus, ph) in m.branch_phase_set 

56 ) 

57 return incoming_flow == outgoing_flows + load - generation - capacitor - q_bat 

58 

59 m.power_balance_q = pyo.Constraint( 

60 m.branch_phase_set, m.time_set, rule=q_balanced_rule 

61 ) 

62 

63 

64def add_voltage_drop_constraints(m: LindistModelProtocol) -> None: 

65 """ 

66 Add voltage drop constraints. 

67 Excludes regulator branches so they can be handled by `add_regulator_constraints`. 

68 

69 v_j = v_i - sum_q 2*Re[S_ij^pq * (z_ij^pq)*] 

70 Simplified for LinDistFlow: v_j = v_i - 2*(r*P + x*Q) 

71 """ 

72 

73 def voltage_drop_rule(m: LindistModelProtocol, _id, ph, t): 

74 if (_id, ph, t) in m.v2_reg: 

75 return pyo.Constraint.Skip 

76 # here, "a" represents the current phase, 

77 # b an c represent next and previous phase 

78 a = ph 

79 _i = "abc".index(a) 

80 b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b" 

81 c = "abc"[(_i - 1) % 3] # prev phase. If phase is "a" then "c" 

82 aa = "".join(sorted(a + a)) 

83 

84 voltage_drop = ( 

85 2 * m.r[_id, aa] * m.p_flow[_id, ph, t] 

86 + 2 * m.x[_id, aa] * m.q_flow[_id, ph, t] 

87 ) 

88 if (_id, b) in m.branch_phase_set: 

89 ab = "".join(sorted(a + b)) 

90 voltage_drop += (-m.r[_id, ab] + sqrt3 * m.x[_id, ab]) * m.p_flow[_id, b, t] 

91 voltage_drop += (-m.x[_id, ab] - sqrt3 * m.r[_id, ab]) * m.q_flow[_id, b, t] 

92 if (_id, c) in m.branch_phase_set: 

93 ac = "".join(sorted(a + c)) 

94 voltage_drop += (-m.r[_id, ac] - sqrt3 * m.x[_id, ac]) * m.p_flow[_id, c, t] 

95 voltage_drop += (-m.x[_id, ac] + sqrt3 * m.r[_id, ac]) * m.q_flow[_id, c, t] 

96 

97 return m.v2[_id, ph, t] == m.v2[m.from_bus_map[_id], ph, t] - voltage_drop 

98 

99 m.voltage_drop = pyo.Constraint( 

100 m.branch_phase_set, m.time_set, rule=voltage_drop_rule 

101 ) 

102 

103 

104def add_regulator_constraints(m: LindistModelProtocol) -> None: 

105 """ 

106 vj = v_reg - 2r*pij - 2x*qij 

107 v_reg = vi*reg_ratio^2 

108 """ 

109 

110 def regulator_v_drop(m: LindistModelProtocol, _id, ph, t): 

111 raa = m.r[_id, ph + ph] 

112 xaa = m.x[_id, ph + ph] 

113 voltage_drop = 2 * raa * m.p_flow[_id, ph, t] + 2 * xaa * m.q_flow[_id, ph, t] 

114 return m.v2[_id, ph, t] == m.v2_reg[_id, ph, t] - voltage_drop 

115 

116 def regulator_rule(m: LindistModelProtocol, _id, ph, t): 

117 return ( 

118 m.v2_reg[_id, ph, t] 

119 == m.v2[m.from_bus_map[_id], ph, t] * m.reg_ratio[_id, ph] ** 2 

120 ) 

121 

122 m.regulator_voltage_drop = pyo.Constraint( 

123 m.reg_phase_set, m.time_set, rule=regulator_v_drop 

124 ) 

125 m.regulator_ratio = pyo.Constraint(m.reg_phase_set, m.time_set, rule=regulator_rule) 

126 

127 

128def add_cvr_load_constraints(m: LindistModelProtocol) -> None: 

129 """ 

130 Add voltage-dependent load constraints. 

131 p_L = p_0 + CVR_p * (p_0/2) * (v - 1) 

132 q_L = q_0 + CVR_q * (q_0/2) * (v - 1) 

133 """ 

134 

135 def cvr_p_rule(m: LindistModelProtocol, _id, ph, t): 

136 p_nom = m.p_load_nom[_id, ph, t] 

137 cvr_p = m.cvr_p[_id, ph] 

138 return m.p_load[_id, ph, t] == p_nom + cvr_p * p_nom / 2 * ( 

139 m.v2[_id, ph, t] - 1 

140 ) 

141 

142 def cvr_q_rule(m: LindistModelProtocol, _id, ph, t): 

143 q_nom = m.q_load_nom[_id, ph, t] 

144 cvr_q = m.cvr_q[_id, ph] 

145 return m.q_load[_id, ph, t] == q_nom + cvr_q * q_nom / 2 * ( 

146 m.v2[_id, ph, t] - 1 

147 ) 

148 

149 m.cvr_p_load = pyo.Constraint(m.bus_phase_set, m.time_set, rule=cvr_p_rule) 

150 m.cvr_q_load = pyo.Constraint(m.bus_phase_set, m.time_set, rule=cvr_q_rule) 

151 

152 

153def add_generator_constant_p_constraints(m: LindistModelProtocol) -> None: 

154 m.constant_p_gen = pyo.Constraint( 

155 m.gen_phase_set, 

156 m.time_set, 

157 rule=lambda m, _id, ph, t: m.p_gen[_id, ph, t] == m.p_gen_nom[_id, ph, t], 

158 ) 

159 

160 

161def add_generator_constant_q_constraints(m: LindistModelProtocol) -> None: 

162 m.constant_q_gen = pyo.Constraint( 

163 m.gen_phase_set, 

164 m.time_set, 

165 rule=lambda m, _id, ph, t: m.q_gen[_id, ph, t] == m.q_gen_nom[_id, ph, t], 

166 ) 

167 

168 

169def add_generator_constant_p_constraints_q_control(m: LindistModelProtocol) -> None: 

170 def _rule(m: LindistModelProtocol, _id, ph, t): 

171 if m.gen_control_type[_id, ph] in [ControlVariable.P, ControlVariable.PQ]: 

172 return pyo.Constraint.Skip 

173 return m.p_gen[_id, ph, t] == m.p_gen_nom[_id, ph, t] 

174 

175 m.constant_p_gen = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_rule) 

176 

177 

178def add_generator_constant_q_constraints_p_control(m: LindistModelProtocol) -> None: 

179 def _rule(m: LindistModelProtocol, _id, ph, t): 

180 if m.gen_control_type[_id, ph] in [ControlVariable.Q, ControlVariable.PQ]: 

181 return pyo.Constraint.Skip 

182 return m.q_gen[_id, ph, t] == m.q_gen_nom[_id, ph, t] 

183 

184 m.constant_q_gen = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_rule) 

185 

186 

187def add_octagonal_inverter_constraints_pq_control(m: LindistModelProtocol) -> None: 

188 """ 

189 Add octagonal inverter constraints (equation 2.14). 

190 

191 Linear approximation of circular curve using 8 constraints. 

192 Only applied to generators with control_variable=="PQ". 

193 

194 c = sqrt(2) - 1 

195 c * p_gen + 1 * q_gen <= s_rated 

196 1 * p_gen + c * q_gen <= s_rated 

197 1 * p_gen - c * q_gen <= s_rated 

198 c * p_gen - 1 * q_gen <= s_rated 

199 """ 

200 c = sqrt2 - 1 # ≈ 0.4142 

201 

202 # If the P-Q Plane was on a clock: 

203 # Line from 12:00 to 1:30. Or 90 to 45 deg. 

204 def _1(m: LindistModelProtocol, _id, ph, t): 

205 if m.gen_control_type[_id, ph] != ControlVariable.PQ: 

206 return pyo.Constraint.Skip 

207 return c * m.p_gen[_id, ph, t] + 1 * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph] 

208 

209 # Line from 1:30 to 3:00 on a clock. Or 45 to 0 deg. 

210 def _2(m: LindistModelProtocol, _id, ph, t): 

211 if m.gen_control_type[_id, ph] != ControlVariable.PQ: 

212 return pyo.Constraint.Skip 

213 return 1 * m.p_gen[_id, ph, t] + c * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph] 

214 

215 # Line from 3:00 to 4:30 on a clock. Or 0 to -45 deg. 

216 def _3(m: LindistModelProtocol, _id, ph, t): 

217 if m.gen_control_type[_id, ph] != ControlVariable.PQ: 

218 return pyo.Constraint.Skip 

219 return 1 * m.p_gen[_id, ph, t] - c * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph] 

220 

221 # Line from 4:30 to 6:00 on a clock. Or -45 to -90 deg. 

222 def _4(m: LindistModelProtocol, _id, ph, t): 

223 if m.gen_control_type[_id, ph] != ControlVariable.PQ: 

224 return pyo.Constraint.Skip 

225 return c * m.p_gen[_id, ph, t] - 1 * m.q_gen[_id, ph, t] <= m.s_rated[_id, ph] 

226 

227 # Add all octagonal constraints 

228 m.gen_octagon_1 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_1) 

229 m.gen_octagon_2 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_2) 

230 m.gen_octagon_3 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_3) 

231 m.gen_octagon_4 = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_4) 

232 

233 

234def add_circular_generator_constraints_pq_control(m: LindistModelProtocol) -> None: 

235 """ 

236 Add circular generator constraints. 

237 

238 Uses the exact circular constraint: p_gen² + q_gen² ≤ s_rated² 

239 Only applied to generators with control_variable=="PQ". 

240 """ 

241 

242 def _circle(m: LindistModelProtocol, _id, ph, t): 

243 if m.gen_control_type[_id, ph] != ControlVariable.PQ.value: 

244 return pyo.Constraint.Skip 

245 return ( 

246 m.p_gen[_id, ph, t] ** 2 + m.q_gen[_id, ph, t] ** 2 

247 <= m.s_rated[_id, ph] ** 2 

248 ) 

249 

250 m.gen_circle_constraint = pyo.Constraint(m.gen_phase_set, m.time_set, rule=_circle) 

251 

252 

253def add_capacitor_constraints(m: LindistModelProtocol) -> None: 

254 """ 

255 Add capacitor constraints. 

256 q_C = q_rated * v^2 

257 """ 

258 

259 def capacitor_rule(m: LindistModelProtocol, _id, ph, t): 

260 return m.q_cap[_id, ph, t] == m.q_cap_nom[_id, ph] * m.v2[_id, ph, t] 

261 

262 m.capacitor_injection = pyo.Constraint( 

263 m.cap_phase_set, m.time_set, rule=capacitor_rule 

264 ) 

265 

266 

267def add_swing_bus_constraints(m: LindistModelProtocol) -> None: 

268 """ 

269 Add swing bus voltage constraints. 

270 

271 Sets voltage at swing bus to specified values. 

272 """ 

273 

274 def swing_voltage_rule(m: LindistModelProtocol, _id, ph, t): 

275 """Fix swing bus voltages""" 

276 if _id not in m.swing_bus_set: 

277 return pyo.Constraint.Skip 

278 return m.v2[_id, ph, t] == m.v_swing[_id, ph, t] ** 2 

279 

280 m.swing_voltage = pyo.Constraint( 

281 m.swing_phase_set, m.time_set, rule=swing_voltage_rule 

282 ) 

283 

284 

285def add_voltage_limits(m: LindistModelProtocol) -> None: 

286 """Add voltage bounds (for voltage magnitude squared)""" 

287 

288 def voltage_limits(m: LindistModelProtocol, _id, ph, t): 

289 return (m.v_min[_id, ph] ** 2, m.v2[_id, ph, t], m.v_max[_id, ph] ** 2) 

290 

291 m.voltage_limits = pyo.Constraint(m.bus_phase_set, m.time_set, rule=voltage_limits) 

292 

293 

294def add_generator_limits(m: LindistModelProtocol) -> None: 

295 """Add generator bounds following the original base.py logic""" 

296 

297 def p_gen_bounds(m: LindistModelProtocol, _id, ph, t): 

298 return ( 

299 0, 

300 m.p_gen[_id, ph, t], 

301 min(m.p_gen_nom[_id, ph, t], m.s_rated[_id, ph]), 

302 ) 

303 

304 def q_gen_bounds(m: LindistModelProtocol, _id, ph, t): 

305 if m.gen_control_type[_id, ph] == ControlVariable.Q: 

306 q_max = sqrt(max(0, m.s_rated[_id, ph] ** 2 - m.p_gen_nom[_id, ph, t] ** 2)) 

307 return ( 

308 max(-q_max, m.q_gen_min[_id, ph]), 

309 m.q_gen[_id, ph, t], 

310 min(q_max, m.q_gen_max[_id, ph]), 

311 ) 

312 return ( 

313 max(-m.s_rated[_id, ph], m.q_gen_min[_id, ph]), 

314 m.q_gen[_id, ph, t], 

315 min(m.s_rated[_id, ph], m.q_gen_max[_id, ph]), 

316 ) 

317 

318 m.p_gen_limits = pyo.Constraint(m.gen_phase_set, m.time_set, rule=p_gen_bounds) 

319 m.q_gen_limits = pyo.Constraint(m.gen_phase_set, m.time_set, rule=q_gen_bounds) 

320 

321 

322# ============ Battery Constraints ===================================================== 

323# ====================================================================================== 

324 

325 

326def add_battery_power_limits(m: LindistModelProtocol) -> None: 

327 def _d(m: LindistModelProtocol, _id, ph, t): 

328 return (0, m.p_discharge[_id, t], m.s_bat_rated[_id, ph]) 

329 

330 def _c(m: LindistModelProtocol, _id, ph, t): 

331 return (0, m.p_charge[_id, t], m.s_bat_rated[_id, ph]) 

332 

333 m.battery_discharging_limits = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_d) 

334 m.battery_charging_limits = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_c) 

335 

336 

337def add_battery_soc_limits(m: LindistModelProtocol) -> None: 

338 def battery_soc_limits(m: LindistModelProtocol, _id, t): 

339 return (m.soc_min[_id], m.soc[_id, t], m.soc_max[_id]) 

340 

341 m.battery_soc_limits = pyo.Constraint( 

342 m.bat_set, m.time_set, rule=battery_soc_limits 

343 ) 

344 

345 

346def add_battery_net_p_bat_constraints(m: LindistModelProtocol) -> None: 

347 def net_discharge(m: LindistModelProtocol, _id, t): 

348 p_bat_a = m.p_bat[_id, "a", t] if m.battery_has_phase[_id, "a"] else 0 

349 p_bat_b = m.p_bat[_id, "b", t] if m.battery_has_phase[_id, "b"] else 0 

350 p_bat_c = m.p_bat[_id, "c", t] if m.battery_has_phase[_id, "c"] else 0 

351 return p_bat_a + p_bat_b + p_bat_c == m.p_discharge[_id, t] - m.p_charge[_id, t] 

352 

353 m.net_discharge = pyo.Constraint(m.bat_phase_set, m.time_set, rule=net_discharge) 

354 

355 

356def add_battery_net_p_bat_equal_phase_constraints(m: LindistModelProtocol) -> None: 

357 def net_discharge_equal_phases(m: LindistModelProtocol, _id, ph, t): 

358 n_phases = m.battery_n_phases[_id] 

359 return ( 

360 m.p_bat[_id, ph, t] 

361 == (m.p_discharge[_id, t] - m.p_charge[_id, t]) / n_phases 

362 ) 

363 

364 m.net_discharge = pyo.Constraint( 

365 m.bat_phase_set, m.time_set, rule=net_discharge_equal_phases 

366 ) 

367 

368 

369def add_battery_energy_constraints(m: LindistModelProtocol) -> None: 

370 def storage(m: LindistModelProtocol, _id, t): 

371 eta_d = m.discharge_efficiency[_id] 

372 eta_c = m.charge_efficiency[_id] 

373 if t == m.start_step: 

374 soc0 = m.start_soc[_id] 

375 else: 

376 soc0 = m.soc[_id, t - 1] 

377 return ( 

378 m.soc[_id, t] - soc0 

379 == eta_c * m.delta_t * m.p_charge[_id, t] 

380 - (1 / eta_d) * m.delta_t * m.p_discharge[_id, t] 

381 ) 

382 

383 m.storage = pyo.Constraint(m.bat_set, m.time_set, rule=storage) 

384 

385 

386# def add_battery_phase_equality_constraints(m: LindistModelProtocol) -> None: 

387# def _p_equal(m: LindistModelProtocol, _id, ph, t): 

388# a = ph 

389# _i = "abc".index(a) 

390# b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b" 

391# if (_id, b) not in m.bat_phase_set: 

392# pyo.Constraint.Skip 

393# return m.p_bat[_id, a, t] == m.p_bat[_id, b, t] 

394 

395# def _q_equal(m: LindistModelProtocol, _id, ph, t): 

396# a = ph 

397# _i = "abc".index(a) 

398# b = "abc"[(_i + 1) % 3] # next phase. If phase is "a" then "b" 

399# if (_id, b) not in m.bat_phase_set: 

400# pyo.Constraint.Skip 

401# return m.q_bat[_id, a, t] == m.q_bat[_id, b, t] 

402 

403 

404def add_battery_constant_q_constraints_p_control(m: LindistModelProtocol) -> None: 

405 def _rule(m: LindistModelProtocol, _id, ph, t): 

406 if m.bat_control_type[_id] != ControlVariable.P: 

407 return pyo.Constraint.Skip 

408 return m.q_bat[_id, ph, t] == m.q_bat_nom[_id, ph, t] 

409 

410 m.battery_constant_q_bat = pyo.Constraint(m.bat_phase_set, m.time_set, rule=_rule)