Coverage for src/distopf/importer.py: 48%
161 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-13 17:34 -0800
1from pathlib import Path
2import pandas as pd
3from distopf.dss_importer import DSSToCSVConverter
4from distopf.cim_importer import CIMToCSVConverter
5from typing import Optional
6from distopf.utils import (
7 handle_branch_input,
8 handle_bus_input,
9 handle_gen_input,
10 handle_cap_input,
11 handle_reg_input,
12 handle_bat_input,
13 handle_schedules_input,
14)
17class Case:
18 def __init__(
19 self,
20 branch_data: pd.DataFrame,
21 bus_data: pd.DataFrame,
22 gen_data: Optional[pd.DataFrame] = None,
23 cap_data: Optional[pd.DataFrame] = None,
24 reg_data: Optional[pd.DataFrame] = None,
25 bat_data: Optional[pd.DataFrame] = None,
26 schedules: Optional[pd.DataFrame] = None,
27 start_step: int = 0,
28 n_steps: int = 1,
29 delta_t: float = 1, # hours per step
30 ):
31 self.branch_data = handle_branch_input(branch_data)
32 self.bus_data = handle_bus_input(bus_data)
33 self.gen_data = handle_gen_input(gen_data)
34 self.cap_data = handle_cap_input(cap_data)
35 self.reg_data = handle_reg_input(reg_data)
36 self.bat_data = handle_bat_input(bat_data)
37 self.schedules = handle_schedules_input(schedules)
38 self.start_step = start_step
39 self.n_steps = n_steps
40 self.delta_t = delta_t # hours per step
41 self._validate_case()
43 def _validate_case(self):
44 # TODO: add validation logic here
45 # test phase consistency across all devices
46 # check control variable is all caps and one of "", "P", "Q", "PQ"
47 pass
50def create_case(
51 data_path: Path,
52 model_type: Optional[str] = None,
53 start_step: int = 0,
54 n_steps: int = 1,
55 delta_t: float = 1,
56) -> Case:
57 """
58 Create a Case object from various input formats.
60 Automatically detects the model type based on file/directory structure
61 if model_type is not specified.
63 Parameters
64 ----------
65 data_path : Path
66 Path to the model data. Can be:
67 - Directory containing CSV files
68 - OpenDSS .dss file
69 - CIM .xml file
70 model_type : Optional[str]
71 Explicitly specify the model type. Options:
72 - "csv": CSV directory
73 - "dss" or "opendss": OpenDSS file
74 - "cim": CIM XML file
75 - None: Auto-detect based on path
77 Returns
78 -------
79 Case
80 Case object with loaded data
82 Raises
83 ------
84 FileNotFoundError
85 If the specified path does not exist
86 ValueError
87 If the model type cannot be determined or is unsupported
88 """
90 # Convert to Path object if string
91 data_path = Path(data_path)
93 if not data_path.exists():
94 raise FileNotFoundError(f"Path does not exist: {data_path}")
96 # Auto-detect model type if not specified
97 if model_type is None:
98 model_type = _detect_model_type(data_path)
100 # Normalize model type
101 model_type = model_type.lower().strip()
103 # Route to appropriate function based on model type
104 if model_type == "csv":
105 return create_case_from_csv(
106 data_path,
107 start_step=start_step,
108 n_steps=n_steps,
109 delta_t=delta_t,
110 )
111 elif model_type in ["dss", "opendss"]:
112 return create_case_from_dss(
113 data_path,
114 start_step=start_step,
115 n_steps=n_steps,
116 delta_t=delta_t,
117 )
118 elif model_type == "cim":
119 return create_case_from_cim(
120 data_path,
121 start_step=start_step,
122 n_steps=n_steps,
123 delta_t=delta_t,
124 )
125 else:
126 raise ValueError(
127 f"Unsupported model type: '{model_type}'. "
128 f"Supported types are: 'csv', 'dss', 'opendss', 'cim'"
129 )
132def _detect_model_type(data_path: Path) -> str:
133 """
134 Automatically detect the model type based on file/directory structure.
136 Parameters
137 ----------
138 data_path : Path
139 Path to examine
141 Returns
142 -------
143 str
144 Detected model type: "csv", "dss", or "cim"
146 Raises
147 ------
148 ValueError
149 If model type cannot be determined
150 """
152 if data_path.is_file():
153 # Check file extension
154 suffix = data_path.suffix.lower()
156 if suffix == ".dss":
157 return "dss"
158 elif suffix == ".xml":
159 # Could be CIM or other XML format
160 # For now, assume CIM if it's XML
161 # Could add more sophisticated detection by reading file content
162 return "cim"
163 else:
164 raise ValueError(
165 f"Cannot determine model type for file: {data_path}. "
166 f"Expected .dss or .xml extension, got: {suffix}"
167 )
169 elif data_path.is_dir():
170 # Check for CSV files
171 csv_files = {
172 "branch_data.csv": data_path / "branch_data.csv",
173 "bus_data.csv": data_path / "bus_data.csv",
174 "gen_data.csv": data_path / "gen_data.csv",
175 "cap_data.csv": data_path / "cap_data.csv",
176 "reg_data.csv": data_path / "reg_data.csv",
177 }
179 # Check if we have at least the essential CSV files
180 essential_files = ["branch_data.csv", "bus_data.csv"]
181 has_essential = all(csv_files[file].exists() for file in essential_files)
183 if has_essential:
184 return "csv"
186 # Check for OpenDSS files in directory
187 dss_files = list(data_path.glob("*.dss"))
188 if dss_files:
189 # If there are DSS files, this might be a DSS directory
190 # But our current implementation expects a single .dss file
191 raise ValueError(
192 "Directory contains .dss files but create_case() expects a single .dss file. "
193 "Please specify the exact .dss file path instead of the directory."
194 )
196 # Check for CIM files in directory
197 xml_files = list(data_path.glob("*.xml"))
198 if xml_files:
199 # Similar issue as with DSS
200 raise ValueError(
201 "Directory contains .xml files but create_case() expects a single .xml file. "
202 "Please specify the exact .xml file path instead of the directory."
203 )
205 raise ValueError(
206 f"Cannot determine model type for directory: {data_path}. "
207 f"Expected CSV files (branch_data.csv, bus_data.csv) not found."
208 )
210 else:
211 raise ValueError(f"Path is neither a file nor a directory: {data_path}")
214def _validate_case_data(case: Case) -> None:
215 """
216 Validate that the Case has the minimum required data.
218 Parameters
219 ----------
220 case : Case
221 Case object to validate
223 Raises
224 ------
225 ValueError
226 If essential data is missing
227 """
229 if case.bus_data is None or len(case.bus_data) == 0:
230 raise ValueError("Case must contain bus data")
232 if case.branch_data is None or len(case.branch_data) == 0:
233 raise ValueError("Case must contain branch data")
235 # Check for swing bus
236 if case.bus_data is not None:
237 swing_buses = case.bus_data[case.bus_data.bus_type == "SWING"]
238 if len(swing_buses) == 0:
239 raise ValueError("Case must contain at least one SWING bus")
240 elif len(swing_buses) > 1:
241 raise ValueError("Case cannot contain more than one SWING bus")
244# Enhanced versions of existing functions with better error handling
245def create_case_from_csv(
246 data_path: Path,
247 start_step: int = 0,
248 n_steps: int = 1,
249 delta_t: float = 1,
250) -> Case:
251 """Enhanced version with better error handling and validation."""
253 if not data_path.exists():
254 raise FileNotFoundError(f"Path does not exist: {data_path}")
256 if data_path.is_file():
257 raise ValueError(
258 f"Expected directory containing CSV files, got file: {data_path}"
259 )
261 if not data_path.is_dir():
262 raise ValueError(f"Path is not a directory: {data_path}")
264 # Initialize data variables
265 branch_data = None
266 bus_data = None
267 gen_data = None
268 cap_data = None
269 reg_data = None
270 bat_data = None
271 schedules = None
273 # Load CSV files
274 csv_files = {
275 "branch_data": data_path / "branch_data.csv",
276 "bus_data": data_path / "bus_data.csv",
277 "gen_data": data_path / "gen_data.csv",
278 "cap_data": data_path / "cap_data.csv",
279 "reg_data": data_path / "reg_data.csv",
280 "bat_data": data_path / "bat_data.csv",
281 "schedules": data_path / "schedules.csv",
282 }
284 try:
285 # Load branch data (required)
286 if csv_files["branch_data"].exists():
287 branch_data = pd.read_csv(csv_files["branch_data"], header=0)
288 else:
289 raise FileNotFoundError(
290 f"Required file not found: {csv_files['branch_data']}"
291 )
293 # Load bus data (required)
294 if csv_files["bus_data"].exists():
295 bus_data = pd.read_csv(csv_files["bus_data"], header=0)
296 else:
297 raise FileNotFoundError(f"Required file not found: {csv_files['bus_data']}")
299 # Load optional files
300 if csv_files["gen_data"].exists():
301 gen_data = pd.read_csv(csv_files["gen_data"], header=0)
303 if csv_files["cap_data"].exists():
304 cap_data = pd.read_csv(csv_files["cap_data"], header=0)
306 if csv_files["reg_data"].exists():
307 reg_data = pd.read_csv(csv_files["reg_data"], header=0)
309 if csv_files["bat_data"].exists():
310 bat_data = pd.read_csv(csv_files["bat_data"], header=0)
312 if csv_files["schedules"].exists():
313 schedules = pd.read_csv(csv_files["schedules"], header=0)
315 except Exception as e:
316 raise ValueError(f"Error reading CSV files from {data_path}: {e}")
318 # Create and validate case
319 case = Case(
320 branch_data,
321 bus_data,
322 gen_data,
323 cap_data,
324 reg_data,
325 bat_data,
326 schedules,
327 start_step=start_step,
328 n_steps=n_steps,
329 delta_t=delta_t,
330 )
332 _validate_case_data(case)
333 return case
336def create_case_from_dss(
337 data_path: Path,
338 start_step: int = 0,
339 n_steps: int = 1,
340 delta_t: float = 1,
341) -> Case:
342 """Enhanced version with better error handling."""
344 if not data_path.exists():
345 raise FileNotFoundError(f"OpenDSS file does not exist: {data_path}")
347 if not data_path.is_file():
348 raise ValueError(f"Expected OpenDSS file, got directory: {data_path}")
350 if data_path.suffix.lower() != ".dss":
351 raise ValueError(f"Expected .dss file extension, got: {data_path.suffix}")
353 try:
354 dss_parser = DSSToCSVConverter(data_path)
355 case = Case(
356 dss_parser.branch_data,
357 dss_parser.bus_data,
358 dss_parser.gen_data,
359 dss_parser.cap_data,
360 dss_parser.reg_data,
361 start_step=start_step,
362 n_steps=n_steps,
363 delta_t=delta_t,
364 )
365 _validate_case_data(case)
366 return case
368 except Exception as e:
369 raise ValueError(f"Error converting OpenDSS file {data_path}: {e}")
372def create_case_from_cim(
373 data_path: Path,
374 start_step: int = 0,
375 n_steps: int = 1,
376 delta_t: float = 1,
377) -> Case:
378 """Enhanced version with better error handling."""
380 if not data_path.exists():
381 raise FileNotFoundError(f"CIM file does not exist: {data_path}")
383 if not data_path.is_file():
384 raise ValueError(f"Expected CIM XML file, got directory: {data_path}")
386 if data_path.suffix.lower() != ".xml":
387 raise ValueError(f"Expected .xml file extension, got: {data_path.suffix}")
389 try:
390 cim_parser = CIMToCSVConverter(data_path)
391 data = cim_parser.convert()
393 case = Case(
394 data["branch_data"],
395 data["bus_data"],
396 data["gen_data"],
397 data["cap_data"],
398 data["reg_data"],
399 start_step=start_step,
400 n_steps=n_steps,
401 delta_t=delta_t,
402 )
403 _validate_case_data(case)
404 return case
406 except Exception as e:
407 raise ValueError(f"Error converting CIM file {data_path}: {e}")
410def modify_case(
411 case: Case,
412 load_mult=None,
413 gen_mult=None,
414 control_variable=None,
415 v_swing=None,
416 v_min=None,
417 v_max=None,
418 cvr_p=None,
419 cvr_q=None,
420):
421 # Modify load multiplier
422 if load_mult is not None:
423 case.bus_data.loc[:, ["pl_a", "ql_a", "pl_b", "ql_b", "pl_c", "ql_c"]] *= (
424 load_mult
425 )
426 # Modify generation multiplier
427 if gen_mult is not None and case.gen_data is not None:
428 case.gen_data.loc[:, ["pa", "pb", "pc"]] *= gen_mult
429 case.gen_data.loc[:, ["qa", "qb", "qc"]] *= gen_mult
430 case.gen_data.loc[:, ["sa_max", "sb_max", "sc_max"]] *= gen_mult
431 # Modify control_variable
432 if control_variable is not None and case.gen_data is not None:
433 if control_variable == "":
434 case.gen_data.control_variable = "P"
435 if control_variable.upper() == "P":
436 case.gen_data.control_variable = "P"
437 if control_variable.upper() == "Q":
438 case.gen_data.control_variable = "Q"
439 if control_variable.upper() == "PQ":
440 case.gen_data.control_variable = "PQ"
442 # Modify swing voltage
443 if v_swing is not None:
444 case.bus_data.loc[case.bus_data.bus_type == "SWING", ["v_a", "v_b", "v_c"]] = (
445 v_swing
446 )
448 if v_min is not None:
449 case.bus_data.loc[:, "v_min"] = v_min
451 if v_max is not None:
452 case.bus_data.loc[:, "v_max"] = v_max
454 if cvr_p is not None:
455 case.bus_data.loc[:, "cvr_p"] = cvr_p
457 if cvr_q is not None:
458 case.bus_data.loc[:, "cvr_q"] = cvr_q
460 return case