Coverage for src/distopf/importer.py: 48%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from pathlib import Path 

2import pandas as pd 

3from distopf.dss_importer import DSSToCSVConverter 

4from distopf.cim_importer import CIMToCSVConverter 

5from typing import Optional 

6from distopf.utils import ( 

7 handle_branch_input, 

8 handle_bus_input, 

9 handle_gen_input, 

10 handle_cap_input, 

11 handle_reg_input, 

12 handle_bat_input, 

13 handle_schedules_input, 

14) 

15 

16 

class Case:
    """Bundle of model tables plus the time-stepping settings for one study.

    Each table argument is passed through the matching ``handle_*_input``
    helper imported from ``distopf.utils`` and the helper's return value is
    what gets stored on the instance.

    Parameters
    ----------
    branch_data : pd.DataFrame
        Branch table; stored as ``self.branch_data``.
    bus_data : pd.DataFrame
        Bus table; stored as ``self.bus_data``.
    gen_data, cap_data, reg_data, bat_data, schedules : Optional[pd.DataFrame]
        Optional tables; each defaults to ``None`` and is stored under the
        attribute of the same name after its ``handle_*_input`` helper ran.
    start_step : int
        Stored unchanged as ``self.start_step``.
    n_steps : int
        Stored unchanged as ``self.n_steps``.
    delta_t : float
        Hours per step; stored unchanged as ``self.delta_t``.
    """

    def __init__(
        self,
        branch_data: pd.DataFrame,
        bus_data: pd.DataFrame,
        gen_data: Optional[pd.DataFrame] = None,
        cap_data: Optional[pd.DataFrame] = None,
        reg_data: Optional[pd.DataFrame] = None,
        bat_data: Optional[pd.DataFrame] = None,
        schedules: Optional[pd.DataFrame] = None,
        start_step: int = 0,
        n_steps: int = 1,
        delta_t: float = 1,  # hours per step
    ):
        # Required tables.
        self.branch_data = handle_branch_input(branch_data)
        self.bus_data = handle_bus_input(bus_data)

        # Optional device tables and schedules.
        self.gen_data = handle_gen_input(gen_data)
        self.cap_data = handle_cap_input(cap_data)
        self.reg_data = handle_reg_input(reg_data)
        self.bat_data = handle_bat_input(bat_data)
        self.schedules = handle_schedules_input(schedules)

        # Time-stepping settings are kept exactly as given.
        self.start_step = start_step
        self.n_steps = n_steps
        self.delta_t = delta_t  # hours per step

        self._validate_case()

    def _validate_case(self):
        """Placeholder hook called at the end of ``__init__``; currently a no-op."""
        # TODO: add validation logic here
        #   - test phase consistency across all devices
        #   - check control variable is all caps and one of "", "P", "Q", "PQ"
        return None

48 

49 

def create_case(
    data_path: Path,
    model_type: Optional[str] = None,
    start_step: int = 0,
    n_steps: int = 1,
    delta_t: float = 1,
) -> Case:
    """
    Build a Case from a CSV directory, an OpenDSS file or a CIM XML file.

    When ``model_type`` is omitted the format is inferred from the path by
    ``_detect_model_type``.

    Parameters
    ----------
    data_path : Path
        Location of the model data. One of:
        - a directory containing CSV files
        - an OpenDSS .dss file
        - a CIM .xml file
    model_type : Optional[str]
        Explicit format selector (case and surrounding whitespace are
        ignored). Options:
        - "csv": CSV directory
        - "dss" or "opendss": OpenDSS file
        - "cim": CIM XML file
        - None: auto-detect based on path
    start_step : int
        Forwarded unchanged to the format-specific loader.
    n_steps : int
        Forwarded unchanged to the format-specific loader.
    delta_t : float
        Forwarded unchanged to the format-specific loader.

    Returns
    -------
    Case
        Case object with loaded data

    Raises
    ------
    FileNotFoundError
        If the specified path does not exist
    ValueError
        If the model type cannot be determined or is unsupported
    """
    # Callers may hand in a plain string; normalise to a Path first.
    data_path = Path(data_path)
    if not data_path.exists():
        raise FileNotFoundError(f"Path does not exist: {data_path}")

    # Fall back to auto-detection, then normalise whatever label we ended up with.
    if model_type is None:
        model_type = _detect_model_type(data_path)
    model_type = model_type.lower().strip()

    # Reject unknown labels before selecting a loader.
    if model_type not in ("csv", "dss", "opendss", "cim"):
        raise ValueError(
            f"Unsupported model type: '{model_type}'. "
            f"Supported types are: 'csv', 'dss', 'opendss', 'cim'"
        )

    if model_type == "csv":
        loader = create_case_from_csv
    elif model_type == "cim":
        loader = create_case_from_cim
    else:  # "dss" or "opendss"
        loader = create_case_from_dss

    return loader(
        data_path,
        start_step=start_step,
        n_steps=n_steps,
        delta_t=delta_t,
    )

130 

131 

132def _detect_model_type(data_path: Path) -> str: 

133 """ 

134 Automatically detect the model type based on file/directory structure. 

135 

136 Parameters 

137 ---------- 

138 data_path : Path 

139 Path to examine 

140 

141 Returns 

142 ------- 

143 str 

144 Detected model type: "csv", "dss", or "cim" 

145 

146 Raises 

147 ------ 

148 ValueError 

149 If model type cannot be determined 

150 """ 

151 

152 if data_path.is_file(): 

153 # Check file extension 

154 suffix = data_path.suffix.lower() 

155 

156 if suffix == ".dss": 

157 return "dss" 

158 elif suffix == ".xml": 

159 # Could be CIM or other XML format 

160 # For now, assume CIM if it's XML 

161 # Could add more sophisticated detection by reading file content 

162 return "cim" 

163 else: 

164 raise ValueError( 

165 f"Cannot determine model type for file: {data_path}. " 

166 f"Expected .dss or .xml extension, got: {suffix}" 

167 ) 

168 

169 elif data_path.is_dir(): 

170 # Check for CSV files 

171 csv_files = { 

172 "branch_data.csv": data_path / "branch_data.csv", 

173 "bus_data.csv": data_path / "bus_data.csv", 

174 "gen_data.csv": data_path / "gen_data.csv", 

175 "cap_data.csv": data_path / "cap_data.csv", 

176 "reg_data.csv": data_path / "reg_data.csv", 

177 } 

178 

179 # Check if we have at least the essential CSV files 

180 essential_files = ["branch_data.csv", "bus_data.csv"] 

181 has_essential = all(csv_files[file].exists() for file in essential_files) 

182 

183 if has_essential: 

184 return "csv" 

185 

186 # Check for OpenDSS files in directory 

187 dss_files = list(data_path.glob("*.dss")) 

188 if dss_files: 

189 # If there are DSS files, this might be a DSS directory 

190 # But our current implementation expects a single .dss file 

191 raise ValueError( 

192 "Directory contains .dss files but create_case() expects a single .dss file. " 

193 "Please specify the exact .dss file path instead of the directory." 

194 ) 

195 

196 # Check for CIM files in directory 

197 xml_files = list(data_path.glob("*.xml")) 

198 if xml_files: 

199 # Similar issue as with DSS 

200 raise ValueError( 

201 "Directory contains .xml files but create_case() expects a single .xml file. " 

202 "Please specify the exact .xml file path instead of the directory." 

203 ) 

204 

205 raise ValueError( 

206 f"Cannot determine model type for directory: {data_path}. " 

207 f"Expected CSV files (branch_data.csv, bus_data.csv) not found." 

208 ) 

209 

210 else: 

211 raise ValueError(f"Path is neither a file nor a directory: {data_path}") 

212 

213 

214def _validate_case_data(case: Case) -> None: 

215 """ 

216 Validate that the Case has the minimum required data. 

217 

218 Parameters 

219 ---------- 

220 case : Case 

221 Case object to validate 

222 

223 Raises 

224 ------ 

225 ValueError 

226 If essential data is missing 

227 """ 

228 

229 if case.bus_data is None or len(case.bus_data) == 0: 

230 raise ValueError("Case must contain bus data") 

231 

232 if case.branch_data is None or len(case.branch_data) == 0: 

233 raise ValueError("Case must contain branch data") 

234 

235 # Check for swing bus 

236 if case.bus_data is not None: 

237 swing_buses = case.bus_data[case.bus_data.bus_type == "SWING"] 

238 if len(swing_buses) == 0: 

239 raise ValueError("Case must contain at least one SWING bus") 

240 elif len(swing_buses) > 1: 

241 raise ValueError("Case cannot contain more than one SWING bus") 

242 

243 

244# Enhanced versions of existing functions with better error handling 

def create_case_from_csv(
    data_path: Path,
    start_step: int = 0,
    n_steps: int = 1,
    delta_t: float = 1,
) -> Case:
    """
    Build a Case from a directory of CSV tables.

    ``branch_data.csv`` and ``bus_data.csv`` are required. ``gen_data.csv``,
    ``cap_data.csv``, ``reg_data.csv``, ``bat_data.csv`` and
    ``schedules.csv`` are loaded when present and passed as ``None``
    otherwise.

    Parameters
    ----------
    data_path : Path
        Directory containing the CSV files. A plain string is accepted too.
    start_step : int
        Forwarded unchanged to ``Case``.
    n_steps : int
        Forwarded unchanged to ``Case``.
    delta_t : float
        Forwarded unchanged to ``Case``.

    Returns
    -------
    Case
        The constructed case, after ``_validate_case_data`` accepted it.

    Raises
    ------
    FileNotFoundError
        If ``data_path`` does not exist.
    ValueError
        If ``data_path`` is not a directory, if a required CSV file is
        missing or any CSV file cannot be read (the original exception is
        attached as ``__cause__``), or if ``_validate_case_data`` rejects
        the resulting case.
    """
    # Direct callers may pass a string; without this the .exists() call below
    # would fail with AttributeError instead of a meaningful error.
    data_path = Path(data_path)

    if not data_path.exists():
        raise FileNotFoundError(f"Path does not exist: {data_path}")

    if data_path.is_file():
        raise ValueError(
            f"Expected directory containing CSV files, got file: {data_path}"
        )

    if not data_path.is_dir():
        raise ValueError(f"Path is not a directory: {data_path}")

    required_tables = ("branch_data", "bus_data")
    optional_tables = ("gen_data", "cap_data", "reg_data", "bat_data", "schedules")

    # Every table starts out as None; optional ones stay None when absent.
    tables = dict.fromkeys(required_tables + optional_tables)

    try:
        for table_name in required_tables:
            csv_path = data_path / f"{table_name}.csv"
            if not csv_path.exists():
                raise FileNotFoundError(f"Required file not found: {csv_path}")
            tables[table_name] = pd.read_csv(csv_path, header=0)

        for table_name in optional_tables:
            csv_path = data_path / f"{table_name}.csv"
            if csv_path.exists():
                tables[table_name] = pd.read_csv(csv_path, header=0)

    except Exception as e:
        # Callers have always received ValueError here (including for a missing
        # required file), so keep the type but chain the real cause.
        raise ValueError(f"Error reading CSV files from {data_path}: {e}") from e

    # Create and validate case
    case = Case(
        tables["branch_data"],
        tables["bus_data"],
        tables["gen_data"],
        tables["cap_data"],
        tables["reg_data"],
        tables["bat_data"],
        tables["schedules"],
        start_step=start_step,
        n_steps=n_steps,
        delta_t=delta_t,
    )

    _validate_case_data(case)
    return case

334 

335 

def create_case_from_dss(
    data_path: Path,
    start_step: int = 0,
    n_steps: int = 1,
    delta_t: float = 1,
) -> Case:
    """
    Build a Case from a single OpenDSS ``.dss`` file.

    The file is handed to ``DSSToCSVConverter`` and the converter's
    ``branch_data``, ``bus_data``, ``gen_data``, ``cap_data`` and
    ``reg_data`` attributes are used to construct the case.

    Parameters
    ----------
    data_path : Path
        Path to the ``.dss`` file. A plain string is accepted too.
    start_step : int
        Forwarded unchanged to ``Case``.
    n_steps : int
        Forwarded unchanged to ``Case``.
    delta_t : float
        Forwarded unchanged to ``Case``.

    Returns
    -------
    Case
        The constructed case, after ``_validate_case_data`` accepted it.

    Raises
    ------
    FileNotFoundError
        If ``data_path`` does not exist.
    ValueError
        If ``data_path`` is not a file, does not end in ``.dss``
        (case-insensitive), or if conversion, construction or validation
        fails (the original exception is attached as ``__cause__``).
    """
    # Direct callers may pass a string; without this the .exists() call below
    # would fail with AttributeError instead of a meaningful error.
    data_path = Path(data_path)

    if not data_path.exists():
        raise FileNotFoundError(f"OpenDSS file does not exist: {data_path}")

    if not data_path.is_file():
        raise ValueError(f"Expected OpenDSS file, got directory: {data_path}")

    if data_path.suffix.lower() != ".dss":
        raise ValueError(f"Expected .dss file extension, got: {data_path.suffix}")

    try:
        dss_parser = DSSToCSVConverter(data_path)
        case = Case(
            dss_parser.branch_data,
            dss_parser.bus_data,
            dss_parser.gen_data,
            dss_parser.cap_data,
            dss_parser.reg_data,
            start_step=start_step,
            n_steps=n_steps,
            delta_t=delta_t,
        )
        _validate_case_data(case)
        return case

    except Exception as e:
        # Any failure is surfaced as ValueError, as before; chaining keeps the
        # underlying traceback available for debugging.
        raise ValueError(f"Error converting OpenDSS file {data_path}: {e}") from e

370 

371 

def create_case_from_cim(
    data_path: Path,
    start_step: int = 0,
    n_steps: int = 1,
    delta_t: float = 1,
) -> Case:
    """
    Build a Case from a single CIM ``.xml`` file.

    The file is handed to ``CIMToCSVConverter``; the mapping returned by its
    ``convert()`` method supplies the ``branch_data``, ``bus_data``,
    ``gen_data``, ``cap_data`` and ``reg_data`` entries used to construct
    the case.

    Parameters
    ----------
    data_path : Path
        Path to the ``.xml`` file. A plain string is accepted too.
    start_step : int
        Forwarded unchanged to ``Case``.
    n_steps : int
        Forwarded unchanged to ``Case``.
    delta_t : float
        Forwarded unchanged to ``Case``.

    Returns
    -------
    Case
        The constructed case, after ``_validate_case_data`` accepted it.

    Raises
    ------
    FileNotFoundError
        If ``data_path`` does not exist.
    ValueError
        If ``data_path`` is not a file, does not end in ``.xml``
        (case-insensitive), or if conversion, construction or validation
        fails (the original exception is attached as ``__cause__``).
    """
    # Direct callers may pass a string; without this the .exists() call below
    # would fail with AttributeError instead of a meaningful error.
    data_path = Path(data_path)

    if not data_path.exists():
        raise FileNotFoundError(f"CIM file does not exist: {data_path}")

    if not data_path.is_file():
        raise ValueError(f"Expected CIM XML file, got directory: {data_path}")

    if data_path.suffix.lower() != ".xml":
        raise ValueError(f"Expected .xml file extension, got: {data_path.suffix}")

    try:
        cim_parser = CIMToCSVConverter(data_path)
        data = cim_parser.convert()

        case = Case(
            data["branch_data"],
            data["bus_data"],
            data["gen_data"],
            data["cap_data"],
            data["reg_data"],
            start_step=start_step,
            n_steps=n_steps,
            delta_t=delta_t,
        )
        _validate_case_data(case)
        return case

    except Exception as e:
        # Any failure is surfaced as ValueError, as before; chaining keeps the
        # underlying traceback available for debugging.
        raise ValueError(f"Error converting CIM file {data_path}: {e}") from e

408 

409 

def modify_case(
    case: Case,
    load_mult=None,
    gen_mult=None,
    control_variable=None,
    v_swing=None,
    v_min=None,
    v_max=None,
    cvr_p=None,
    cvr_q=None,
):
    """
    Apply optional bulk adjustments to a case's bus and generator tables.

    The case is modified in place and also returned. Every argument other
    than ``case`` is skipped when left as ``None``.

    Parameters
    ----------
    case : Case
        Case whose ``bus_data`` (and ``gen_data``, when not ``None``) tables
        are modified.
    load_mult : optional
        Factor applied to the bus columns ``pl_a``, ``ql_a``, ``pl_b``,
        ``ql_b``, ``pl_c`` and ``ql_c``.
    gen_mult : optional
        Factor applied to the generator columns ``pa``/``pb``/``pc``,
        ``qa``/``qb``/``qc`` and ``sa_max``/``sb_max``/``sc_max``.
    control_variable : optional str
        New value for the generator ``control_variable`` column. Matched
        case-insensitively against "", "P", "Q" and "PQ"; any other value
        leaves the column untouched.
    v_swing : optional
        Value written to ``v_a``, ``v_b`` and ``v_c`` of rows whose
        ``bus_type`` is "SWING".
    v_min, v_max, cvr_p, cvr_q : optional
        Value written to the whole bus column of the same name.

    Returns
    -------
    Case
        The same ``case`` object that was passed in.
    """
    bus = case.bus_data
    gen = case.gen_data

    # Modify load multiplier
    if load_mult is not None:
        bus.loc[:, ["pl_a", "ql_a", "pl_b", "ql_b", "pl_c", "ql_c"]] *= load_mult

    # Modify generation multiplier
    if gen_mult is not None and gen is not None:
        gen.loc[:, ["pa", "pb", "pc"]] *= gen_mult
        gen.loc[:, ["qa", "qb", "qc"]] *= gen_mult
        gen.loc[:, ["sa_max", "sb_max", "sc_max"]] *= gen_mult

    # Modify control_variable
    if control_variable is not None and gen is not None:
        requested = control_variable.upper()
        # "" is a legitimate setting of its own. The previous code mapped it
        # to "P", which made the empty-string branch a duplicate of the "P"
        # branch. Unrecognised values are still ignored.
        if requested in ("", "P", "Q", "PQ"):
            # Item assignment works whether or not the column already exists;
            # attribute assignment would silently fail to create a column.
            gen["control_variable"] = requested

    # Modify swing voltage
    if v_swing is not None:
        bus.loc[bus.bus_type == "SWING", ["v_a", "v_b", "v_c"]] = v_swing

    # Whole-column overrides on the bus table.
    column_overrides = (
        ("v_min", v_min),
        ("v_max", v_max),
        ("cvr_p", cvr_p),
        ("cvr_q", cvr_q),
    )
    for column, value in column_overrides:
        if value is not None:
            bus.loc[:, column] = value

    return case