Coverage for src/distopf/utils/utils.py: 57%
70 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-08 11:33 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-08 11:33 -0700
1import pandas as pd
2from typing import Optional
5def get(s: pd.Series, i, default=None):
6 """
7 Get value at index i from a Series. Return default if it does not exist.
8 Parameters
9 ----------
10 s : pd.Series
11 i : index or key for eries
12 default : value to return if it fails
14 Returns
15 -------
16 value: value at index i or default if it doesn't exist.
17 """
18 try:
19 return s.loc[i]
20 except (KeyError, ValueError, IndexError):
21 return default
24def handle_gen_input(gen_data: Optional[pd.DataFrame]) -> pd.DataFrame:
25 if gen_data is None:
26 return pd.DataFrame(
27 columns=[
28 "id",
29 "name",
30 "pa",
31 "pb",
32 "pc",
33 "qa",
34 "qb",
35 "qc",
36 "sa_max",
37 "sb_max",
38 "sc_max",
39 "phases",
40 "qa_max",
41 "qb_max",
42 "qc_max",
43 "qa_min",
44 "qb_min",
45 "qc_min",
46 "control_variable",
47 ]
48 )
49 if f"control_variable" not in gen_data.columns:
50 gen_data[f"control_variable"] = ""
51 gen = gen_data.sort_values(by="id", ignore_index=True)
52 gen.index = gen.id.to_numpy() - 1
53 return gen
56def handle_cap_input(cap_data: Optional[pd.DataFrame]) -> pd.DataFrame:
57 if cap_data is None:
58 return pd.DataFrame(
59 columns=[
60 "id",
61 "name",
62 "qa",
63 "qb",
64 "qc",
65 "phases",
66 ]
67 )
68 cap = cap_data.sort_values(by="id", ignore_index=True)
69 cap.index = cap.id.to_numpy() - 1
70 return cap
73def handle_reg_input(reg_data: Optional[pd.DataFrame]) -> pd.DataFrame:
74 if reg_data is None:
75 return pd.DataFrame(
76 columns=[
77 "fb",
78 "tb",
79 "phases",
80 "tap_a",
81 "tap_b",
82 "tap_c",
83 "ratio_a",
84 "ratio_b",
85 "ratio_c",
86 ]
87 )
88 reg = reg_data.sort_values(by="tb", ignore_index=True)
89 reg.index = reg.tb.to_numpy() - 1
90 for ph in "abc":
91 if f"tap_{ph}" in reg.columns and not f"ratio_{ph}" in reg.columns:
92 reg[f"ratio_{ph}"] = 1 + 0.00625 * reg[f"tap_{ph}"]
93 elif f"ratio_{ph}" in reg.columns and not f"tap_{ph}" in reg.columns:
94 reg[f"tap_{ph}"] = (reg[f"ratio_{ph}"] - 1) / 0.00625
95 elif f"ratio_{ph}" in reg.columns and f"tap_{ph}" in reg.columns:
96 reg[f"ratio_{ph}"] = 1 + 0.00625 * reg[f"tap_{ph}"]
97 # check consistency
98 # if any(abs(reg[f"ratio_{ph}"]) - (1 + 0.00625 * reg[f"tap_{ph}"]) > 1e-6):
99 # raise ValueError(
100 # f"Regulator taps and ratio are inconsistent on phase {ph}!"
101 # )
102 return reg
105def handle_branch_input(branch_data: Optional[pd.DataFrame]) -> pd.DataFrame:
106 if branch_data is None:
107 raise ValueError("Branch data must be provided.")
108 branch = branch_data.sort_values(by="tb", ignore_index=True)
109 branch = branch.loc[branch.status != "OPEN", :]
110 return branch
113def handle_bus_input(bus_data: Optional[pd.DataFrame]) -> pd.DataFrame:
114 if bus_data is None:
115 raise ValueError("Bus data must be provided.")
116 bus = bus_data.sort_values(by="id", ignore_index=True)
117 bus.index = bus.id.to_numpy() - 1
118 return bus
121def handle_loadshape_input(loadshape_data: Optional[pd.DataFrame]) -> pd.DataFrame:
122 if loadshape_data is None:
123 return pd.DataFrame(
124 columns=[
125 "time",
126 "M",
127 ]
128 )
129 loadshape = loadshape_data.sort_values(by="time", ignore_index=True)
130 loadshape.index = loadshape.time.to_numpy()
131 return loadshape
134def handle_pv_loadshape_input(
135 pv_loadshape_data: Optional[pd.DataFrame],
136) -> pd.DataFrame:
137 if pv_loadshape_data is None:
138 return pd.DataFrame(
139 columns=[
140 "time",
141 "PV",
142 ]
143 )
144 pv_loadshape = pv_loadshape_data.sort_values(by="time", ignore_index=True)
145 pv_loadshape.index = pv_loadshape.time.to_numpy()
146 return pv_loadshape
149def handle_bat_input(bat_data: Optional[pd.DataFrame]) -> pd.DataFrame:
150 if bat_data is None:
151 return pd.DataFrame(
152 columns=[
153 "id",
154 "name",
155 "nc_a",
156 "nc_b",
157 "nc_c",
158 "nd_a",
159 "nd_b",
160 "nd_c",
161 "hmax_a",
162 "hmax_b",
163 "hmax_c",
164 "Pb_max_a",
165 "Pb_max_b",
166 "Pb_max_c",
167 "bmin_a",
168 "bmin_b",
169 "bmin_c",
170 "bmax_a",
171 "bmax_b",
172 "bmax_c",
173 "b0_a",
174 "b0_b",
175 "b0_c",
176 "phases",
177 ]
178 )
179 if "b0_a" not in bat_data.columns:
180 bat_data["b0_a"] = bat_data.bmin_a
181 if "b0_b" not in bat_data.columns:
182 bat_data["b0_b"] = bat_data.bmin_a
183 if "b0_c" not in bat_data.columns:
184 bat_data["b0_c"] = bat_data.bmin_a
185 bat = bat_data.sort_values(by="id", ignore_index=True)
186 bat.index = bat.id.to_numpy() - 1
187 return bat