Coverage for src/distopf/pyomo_models/lindist.py: 86%
223 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from enum import IntEnum
2import pyomo.environ as pyo
3from typing import Tuple, List
4import pandas as pd
5from distopf.importer import Case
6from distopf.pyomo_models.protocol import LindistModelProtocol
9class ControlVariable(IntEnum):
10 NONE = 0
11 Q = 1
12 P = 2
13 PQ = 3
16CONTROL_VARIABLE_MAP = {"": 0, "Q": 1, "P": 2, "PQ": 3}
19def _create_phase_tuples(df: pd.DataFrame, id_col: str = "id") -> List[Tuple[str, str]]:
20 """Create (id, phase) tuples from dataframe with phases column"""
21 result = []
22 for _, row in df.iterrows():
23 result.extend([(int(row[id_col]), str(phase)) for phase in row.phases])
24 return result
27def _create_sets(m: pyo.ConcreteModel, case: Case) -> None:
28 """Create all Pyomo sets"""
29 m.time_set = pyo.RangeSet(case.start_step, case.start_step + case.n_steps - 1)
30 m.bus_set = pyo.Set(initialize=case.bus_data.id.tolist())
31 m.swing_bus_set = pyo.Set(
32 initialize=case.bus_data[case.bus_data.bus_type == "SWING"].id.tolist()
33 )
34 m.swing_phase_set = pyo.Set(
35 initialize=_create_phase_tuples(
36 case.bus_data[case.bus_data.bus_type == "SWING"]
37 ),
38 dimen=2,
39 )
40 m.branch_set = pyo.Set(
41 initialize=case.bus_data[case.bus_data.bus_type != "SWING"].id.tolist()
42 )
43 m.phase_pair_set = pyo.Set(initialize=["aa", "ab", "ac", "bb", "bc", "cc"])
44 m.bus_phase_set = pyo.Set(initialize=_create_phase_tuples(case.bus_data), dimen=2)
45 m.branch_phase_set = pyo.Set(
46 initialize=_create_phase_tuples(case.branch_data, "tb"), dimen=2
47 )
48 m.gen_phase_set = pyo.Set(initialize=_create_phase_tuples(case.gen_data), dimen=2)
49 m.cap_phase_set = pyo.Set(initialize=_create_phase_tuples(case.cap_data), dimen=2)
50 m.reg_phase_set = pyo.Set(
51 initialize=_create_phase_tuples(case.reg_data, "tb"), dimen=2
52 )
53 m.bat_phase_set = pyo.Set(
54 initialize=_create_phase_tuples(case.bat_data, "id"), dimen=2
55 )
56 m.bat_set = pyo.Set(initialize=case.bat_data.id.tolist())
59def _create_rx_parameters(m: pyo.ConcreteModel, case: Case) -> None:
60 """Create resistance and reactance parameters"""
61 r_data, x_data = {}, {}
63 for _, row in case.branch_data.iterrows():
64 for phase_pair in m.phase_pair_set:
65 r_col, x_col = f"r{phase_pair}", f"x{phase_pair}"
67 if r_col in case.branch_data.columns and x_col in case.branch_data.columns:
68 r_data[(row.tb, phase_pair)] = row[r_col]
69 x_data[(row.tb, phase_pair)] = row[x_col]
71 m.r = pyo.Param(m.branch_set, m.phase_pair_set, initialize=r_data, default=0.0)
72 m.x = pyo.Param(m.branch_set, m.phase_pair_set, initialize=x_data, default=0.0)
75def _create_load_parameters(m: pyo.ConcreteModel, case: Case) -> None:
76 load_p_data, load_q_data = {}, {}
77 cvr_p_data, cvr_q_data = {}, {}
79 for _, row in case.bus_data.iterrows():
80 for phase in row.phases:
81 if (row.id, phase) not in m.bus_phase_set:
82 continue
84 # CVR factors for voltage-dependent loads
85 cvr_p = getattr(row, "cvr_p", 0.0)
86 cvr_q = getattr(row, "cvr_q", 0.0)
87 cvr_p_data[(row.id, phase)] = cvr_p
88 cvr_q_data[(row.id, phase)] = cvr_q
89 # Active and reactive loads
90 p_load = getattr(row, f"pl_{phase}", 0.0)
91 q_load = getattr(row, f"ql_{phase}", 0.0)
92 load_mult_p = load_mult_q = 1.0
93 load_shape = getattr(row, "load_shape", "default")
94 for t in m.time_set:
95 if load_shape in case.schedules.columns:
96 load_mult_p = load_mult_q = case.schedules.at[t, load_shape]
97 elif f"{load_shape}.{phase}.p" in case.schedules.columns:
98 load_mult_p = case.schedules.at[t, f"{load_shape}.{phase}.p"]
99 load_mult_q = case.schedules.at[t, f"{load_shape}.{phase}.q"]
100 load_p_data[(row.id, phase, t)] = p_load * load_mult_p
101 load_q_data[(row.id, phase, t)] = q_load * load_mult_q
103 m.p_load_nom = pyo.Param(
104 m.bus_phase_set,
105 m.time_set,
106 initialize=load_p_data,
107 default=0.0,
108 doc="Nominal active power load at 1.0 p.u. voltage",
109 )
110 m.q_load_nom = pyo.Param(
111 m.bus_phase_set,
112 m.time_set,
113 initialize=load_q_data,
114 default=0.0,
115 doc="Nominal reactive power load at 1.0 p.u. voltage",
116 )
117 m.cvr_p = pyo.Param(
118 m.bus_phase_set,
119 initialize=cvr_p_data,
120 default=0.0,
121 doc="CVR factor for active power loads",
122 )
123 m.cvr_q = pyo.Param(
124 m.bus_phase_set,
125 initialize=cvr_q_data,
126 default=0.0,
127 doc="CVR factor for reactive power loads",
128 )
131def _create_generator_parameters(m: pyo.ConcreteModel, case: Case) -> None:
132 p_gen_data, q_gen_data = {}, {}
133 s_rated_data, q_gen_min_data, q_gen_max_data = {}, {}, {}
134 gen_control_data = {}
136 for _, row in case.gen_data.iterrows():
137 for phase in row.phases:
138 if (row.id, phase) not in m.gen_phase_set:
139 continue
141 # Generation limits
142 s_rated = getattr(row, f"s{phase}_max", 1000.0)
143 q_max = getattr(row, f"q{phase}_max", s_rated)
144 q_min = getattr(row, f"q{phase}_min", -s_rated)
145 s_rated_data[(row.id, phase)] = s_rated
146 q_gen_min_data[(row.id, phase)] = q_min
147 q_gen_max_data[(row.id, phase)] = q_max
149 # Control variable type
150 control_var = getattr(row, "control_variable", "")
151 gen_control_data[(row.id, phase)] = CONTROL_VARIABLE_MAP[control_var]
153 # Nominal generation values
154 p_gen = getattr(row, f"p{phase}", 0.0)
155 q_gen = getattr(row, f"q{phase}", 0.0)
156 for t in m.time_set:
157 p_gen_data[(row.id, phase, t)] = p_gen
158 q_gen_data[(row.id, phase, t)] = q_gen
160 m.p_gen_nom = pyo.Param(
161 m.gen_phase_set,
162 m.time_set,
163 initialize=p_gen_data,
164 default=0.0,
165 doc="Nominal active power generation",
166 )
167 m.q_gen_nom = pyo.Param(
168 m.gen_phase_set,
169 m.time_set,
170 initialize=q_gen_data,
171 default=0.0,
172 doc="Nominal reactive power generation",
173 )
174 m.s_rated = pyo.Param(
175 m.gen_phase_set,
176 initialize=s_rated_data,
177 default=1000.0,
178 doc="Maximum apparent power rating",
179 )
180 m.q_gen_max = pyo.Param(
181 m.gen_phase_set,
182 initialize=q_gen_max_data,
183 default=1000.0,
184 doc="Maximum reactive power generation",
185 )
186 m.q_gen_min = pyo.Param(
187 m.gen_phase_set,
188 initialize=q_gen_min_data,
189 default=-1000.0,
190 doc="Minimum reactive power generation",
191 )
192 m.gen_control_type = pyo.Param(
193 m.gen_phase_set,
194 initialize=gen_control_data,
195 default=0,
196 doc="Generator control variable type",
197 )
200def _create_capacitor_parameters(m: pyo.ConcreteModel, case: Case) -> None:
201 q_cap_data = {}
202 for _, row in case.cap_data.iterrows():
203 for phase in row.phases:
204 q_cap = getattr(row, f"q{phase}", 0.0)
205 q_cap_data[(row.id, phase)] = q_cap
207 m.q_cap_nom = pyo.Param(
208 m.cap_phase_set,
209 initialize=q_cap_data,
210 default=0.0,
211 doc="Nominal capacitor reactive power at 1.0 p.u. voltage",
212 )
215def _create_regulator_parameters(m: pyo.ConcreteModel, case: Case) -> None:
216 ratio_data = {}
217 for _, row in case.reg_data.iterrows():
218 for phase in row.phases:
219 ratio_data[(row.tb, phase)] = getattr(row, f"ratio_{phase}", 1.0)
221 m.reg_ratio = pyo.Param(
222 m.reg_phase_set,
223 initialize=ratio_data,
224 default=1.0,
225 doc="Voltage regulator turn ratio",
226 )
229def _create_v_swing_parameters(m: pyo.ConcreteModel, case: Case) -> None:
230 v_swing_data = {}
232 # Find swing buses
233 swing_buses = case.bus_data[case.bus_data.bus_type == "SWING"]
235 for _, row in swing_buses.iterrows():
236 for phase in row.phases:
237 if (row.id, phase) not in m.bus_phase_set:
238 continue
239 v_swing = getattr(row, f"v_{phase}", 1.0)
240 for t in m.time_set:
241 v_swing_data[(row.id, phase, t)] = v_swing
243 m.v_swing = pyo.Param(
244 m.swing_phase_set,
245 m.time_set,
246 initialize=v_swing_data,
247 default=1.0,
248 doc="Swing bus voltage magnitude squared",
249 )
252def _create_v_limit_parameters(m: pyo.ConcreteModel, case: Case) -> None:
253 v_min_data, v_max_data = {}, {}
255 for _, row in case.bus_data.iterrows():
256 for phase in row.phases:
257 if (row.id, phase) in m.bus_phase_set:
258 v_min_data[(row.id, phase)] = getattr(row, "v_min", 0.95)
259 v_max_data[(row.id, phase)] = getattr(row, "v_max", 1.05)
261 m.v_min = pyo.Param(
262 m.bus_phase_set,
263 initialize=v_min_data,
264 default=0.95,
265 doc="Minimum voltage magnitude squared",
266 )
267 m.v_max = pyo.Param(
268 m.bus_phase_set,
269 initialize=v_max_data,
270 default=1.05,
271 doc="Maximum voltage magnitude squared",
272 )
275def _create_battery_parameters(m: pyo.ConcreteModel, case: Case) -> None:
276 p_bat_data, q_bat_data = {}, {}
277 s_rated_data, q_bat_min_data, q_bat_max_data = {}, {}, {}
278 energy_capacity_data = {}
279 min_soc_data, max_soc_data = {}, {}
280 start_soc_data = {}
281 charge_efficiency_data = {}
282 discharge_efficiency_data = {}
283 annual_cycle_limit = {}
284 bat_control_data = {}
285 battery_has_a_phase = {}
286 battery_has_b_phase = {}
287 battery_has_c_phase = {}
288 battery_has_phase = {}
289 battery_n_phases = {}
291 for _, row in case.bat_data.iterrows():
292 energy_capacity_data[row.id] = getattr(row, "energy_capacity")
293 min_soc_data[row.id] = getattr(row, "min_soc", 0)
294 max_soc_data[row.id] = getattr(row, "max_soc", 1)
295 start_soc_data[row.id] = getattr(row, "start_soc", 0.5)
296 charge_efficiency_data[row.id] = getattr(row, "charge_efficiency", 1)
297 discharge_efficiency_data[row.id] = getattr(row, "discharge_efficiency", 1)
298 annual_cycle_limit[row.id] = getattr(row, "annual_cycle_limit", 365)
299 bat_control_data[row.id] = CONTROL_VARIABLE_MAP[
300 getattr(row, "control_variable", "P")
301 ]
303 battery_has_a_phase[row.id] = "a" in row.phases
304 battery_has_b_phase[row.id] = "b" in row.phases
305 battery_has_c_phase[row.id] = "c" in row.phases
306 battery_has_phase[(row.id, "a")] = "a" in row.phases
307 battery_has_phase[(row.id, "b")] = "b" in row.phases
308 battery_has_phase[(row.id, "c")] = "c" in row.phases
309 n_phases = len(row.phases)
310 battery_n_phases[row.id] = n_phases
311 # Generation limits
312 for phase in row.phases:
313 if (row.id, phase) not in m.bat_phase_set:
314 continue
315 s_rated = getattr(row, "s_max", 1000.0)
316 s_rated_data[(row.id, phase)] = s_rated / n_phases
317 q_bat_min_data[(row.id, phase)] = getattr(row, "q_min", -s_rated) / n_phases
318 q_bat_max_data[(row.id, phase)] = getattr(row, "q_max", s_rated) / n_phases
319 # Nominal generation values
320 for t in m.time_set:
321 p_bat_data[(row.id, phase, t)] = getattr(row, "p", 0.0) / n_phases
322 q_bat_data[(row.id, phase, t)] = getattr(row, "q", 0.0) / n_phases
324 m.p_bat_nom = pyo.Param(
325 m.bat_phase_set,
326 m.time_set,
327 initialize=p_bat_data,
328 default=0.0,
329 doc="Nominal active power discharge from battery",
330 )
332 m.q_bat_nom = pyo.Param(
333 m.bat_phase_set,
334 m.time_set,
335 initialize=q_bat_data,
336 default=0.0,
337 doc="Nominal reactive power discharge from battery",
338 )
339 m.s_bat_rated = pyo.Param(
340 m.bat_phase_set,
341 initialize=s_rated_data,
342 default=1000.0,
343 doc="Maximum apparent power rating",
344 )
345 m.q_bat_max = pyo.Param(
346 m.bat_phase_set,
347 initialize=q_bat_max_data,
348 default=1000.0,
349 doc="Maximum reactive power generation",
350 )
351 m.q_bat_min = pyo.Param(
352 m.bat_phase_set,
353 initialize=q_bat_min_data,
354 default=-1000.0,
355 doc="Minimum reactive power generation",
356 )
357 m.bat_control_type = pyo.Param(
358 m.bat_set,
359 initialize=bat_control_data,
360 default=0,
361 doc="Battery control variable type",
362 )
363 m.energy_capacity = pyo.Param(
364 m.bat_set,
365 initialize=energy_capacity_data,
366 default=0,
367 doc="Battery energy capacity in units power-base * Wh",
368 )
369 m.soc_min = pyo.Param(
370 m.bat_set,
371 initialize=min_soc_data,
372 default=0,
373 doc="Battery soc minimum as a fraction of energy capacity",
374 )
375 m.soc_max = pyo.Param(
376 m.bat_set,
377 initialize=max_soc_data,
378 default=1,
379 doc="Battery soc maximum as a fraction of energy capacity",
380 )
381 m.start_soc = pyo.Param(
382 m.bat_set,
383 initialize=start_soc_data,
384 default=0.5,
385 doc="Battery starting soc as a fraction of energy capacity",
386 )
387 m.charge_efficiency = pyo.Param(
388 m.bat_set,
389 initialize=charge_efficiency_data,
390 default=1.0,
391 doc="Battery charging efficiency",
392 )
393 m.discharge_efficiency = pyo.Param(
394 m.bat_set,
395 initialize=discharge_efficiency_data,
396 default=1.0,
397 doc="Battery discharging efficiency",
398 )
399 m.annual_cycle_limit = pyo.Param(
400 m.bat_set,
401 initialize=annual_cycle_limit,
402 default=365,
403 doc="Limit to number of discharge cycles per year",
404 )
405 m.battery_has_a_phase = pyo.Param(
406 m.bat_set, initialize=battery_has_a_phase, default=True
407 )
408 m.battery_has_b_phase = pyo.Param(
409 m.bat_set, initialize=battery_has_b_phase, default=True
410 )
411 m.battery_has_c_phase = pyo.Param(
412 m.bat_set, initialize=battery_has_c_phase, default=True
413 )
414 m.battery_has_phase = pyo.Param(
415 m.bat_set, ["a", "b", "c"], initialize=battery_has_phase, default=True
416 )
418 m.battery_n_phases = pyo.Param(
419 m.bat_set,
420 initialize=battery_n_phases,
421 default=3,
422 doc="Number of phases connected to battery.",
423 )
426def _create_parameters(m: pyo.ConcreteModel, case: Case) -> None:
427 """
428 Create all parameters for the Pyomo model including impedances, loads,
429 generators, capacitors, regulator ratios, and swing bus voltages.
430 """
431 _create_rx_parameters(m, case)
432 _create_load_parameters(m, case)
433 _create_generator_parameters(m, case)
434 _create_capacitor_parameters(m, case)
435 _create_regulator_parameters(m, case)
436 _create_v_swing_parameters(m, case)
437 _create_v_limit_parameters(m, case)
438 _create_battery_parameters(m, case)
441def create_lindist_model(case: Case) -> LindistModelProtocol:
442 """
443 Factory function to create a Pyomo ConcreteModel for multiperiod linear distribution system optimization.
445 Parameters
446 ----------
447 case : Case
448 Dataclass containing network data frames
450 Returns
451 -------
452 pyo.ConcreteModel
453 Configured Pyomo model with sets, variables, and parameters
454 """
456 m = pyo.ConcreteModel()
457 m.from_bus_map = {
458 int(tb): int(fb) for fb, tb in case.branch_data.loc[:, ["fb", "tb"]].to_numpy()
459 }
460 m.to_bus_map = {
461 int(bus_id): case.branch_data.loc[
462 case.branch_data.fb == int(bus_id), "tb"
463 ].to_list()
464 for bus_id in case.bus_data.id.to_numpy()
465 }
466 m.name_map = {
467 int(_id): str(name)
468 for _id, name in case.bus_data.loc[:, ["id", "name"]].to_numpy()
469 }
470 m.delta_t = pyo.Param(initialize=case.delta_t)
471 m.start_step = pyo.Param(initialize=case.start_step)
472 m.n_steps = pyo.Param(initialize=case.n_steps)
473 _create_sets(m, case)
474 _create_parameters(m, case)
476 # Time-indexed variables
477 m.v2 = pyo.Var(
478 m.bus_phase_set, m.time_set, domain=pyo.NonNegativeReals, initialize=1
479 ) # Voltage magnitude squared
480 m.p_flow = pyo.Var(m.branch_phase_set, m.time_set)
481 m.q_flow = pyo.Var(m.branch_phase_set, m.time_set, initialize=0)
482 m.p_gen = pyo.Var(m.gen_phase_set, m.time_set, domain=pyo.NonNegativeReals)
483 m.q_gen = pyo.Var(m.gen_phase_set, m.time_set, initialize=0)
484 m.q_cap = pyo.Var(m.cap_phase_set, m.time_set)
485 m.v2_reg = pyo.Var(
486 m.reg_phase_set, m.time_set, domain=pyo.NonNegativeReals, initialize=1
487 )
488 m.p_load = pyo.Var(m.bus_phase_set, m.time_set)
489 m.q_load = pyo.Var(m.bus_phase_set, m.time_set)
491 # create battery variables
492 m.p_charge = pyo.Var(m.bat_set, m.time_set, initialize=0)
493 m.p_discharge = pyo.Var(m.bat_set, m.time_set, initialize=0)
494 m.p_bat = pyo.Var(m.bat_phase_set, m.time_set, initialize=0)
495 m.q_bat = pyo.Var(m.bat_phase_set, m.time_set, initialize=0)
496 m.soc = pyo.Var(m.bat_set, m.time_set, initialize=0.5)
497 model: LindistModelProtocol = m
498 return model