Coverage for C:\src\imod-python\imod\flow\model.py: 83%

259 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 14:15 +0200

1import abc 

2import collections 

3import os 

4import pathlib 

5import warnings 

6 

7import cftime 

8import jinja2 

9import numpy as np 

10import pandas as pd 

11import xarray as xr 

12 

13import imod 

14from imod.flow.pkgbase import BoundaryCondition 

15from imod.flow.pkggroup import PackageGroups 

16from imod.flow.timeutil import insert_unique_package_times 

17from imod.util.nested_dict import append_nested_dict, initialize_nested_dict 

18from imod.util.time import _compose_timestring, timestep_duration, to_datetime_internal 

19 

20 

class IniFile(collections.UserDict, abc.ABC):
    """
    Minimal support for iMOD .ini files.

    These files contain the settings that iMOD uses to run its batch
    functions. For example to convert its model description -- a projectfile
    containing paths to respective .IDFs for each package -- to a Modflow6
    model.
    """

    # TODO: Create own key mapping to avoid keys like "edate"?
    _template = jinja2.Template(
        "{%- for key, value in settings %}\n{{key}}={{value}}\n{%- endfor %}\n"
    )

    def _format_datetimes(self):
        """Replace datetime-like "sdate"/"edate" entries with timestrings."""
        for timekey in ("sdate", "edate"):
            if timekey not in self:
                continue
            value = self[timekey]
            # A string is assumed to be formatted already; anything else is
            # treated as some kind of datetime and composed into a timestring.
            if not isinstance(value, str):
                self[timekey] = _compose_timestring(value)

    def render(self):
        """Render the settings as ini-file text (one key=value per line)."""
        self._format_datetimes()
        return self._template.render(settings=self.items())

46 

47 

48def _relpath(path, to): 

49 # Wraps os.path.relpath 

50 try: 

51 return pathlib.Path(os.path.relpath(path, to)) 

52 except ValueError: 

53 # Fails to switch between drives e.g. 

54 return pathlib.Path(os.path.abspath(path)) 

55 

56 

# This class allows only imod packages as values
class Model(collections.UserDict):
    """
    Container mapping package names to imod packages.

    Item assignment is routed through ``__setitem__`` so that packages can
    be validated on insertion. Validation timing is controlled by the
    ``check`` attribute: ``"eager"`` validates a package as soon as it is
    set; any other value defers validation.
    """

    # Default so a bare Model() does not raise AttributeError in
    # __setitem__; subclasses (e.g. ImodflowModel) override this in __init__.
    check = None

    def __setitem__(self, key, value):
        # TODO: raise ValueError on setting certain duplicates
        # e.g. two solvers
        if self.check == "eager":
            value._pkgcheck()
        super().__setitem__(key, value)

    def update(self, *args, **kwargs):
        """Update from a dict/iterable, routing every item through __setitem__."""
        for k, v in dict(*args, **kwargs).items():
            self[k] = v

    def _delete_empty_packages(self, verbose=False):
        """Delete packages whose first data variable holds no data at all.

        Parameters
        ----------
        verbose : bool, optional
            If True, warn for every deleted package.
        """
        to_del = []
        for pkg in self.keys():
            # Inspect only the first data variable of each package dataset.
            dv = list(self[pkg].dataset.data_vars)[0]
            if not self[pkg][dv].notnull().any().compute():
                if verbose:
                    warnings.warn(
                        f"Deleting package {pkg}, found no data in parameter {dv}"
                    )
                to_del.append(pkg)
        # Delete after iteration: never mutate a dict while iterating it.
        for pkg in to_del:
            del self[pkg]

82 

83 

class ImodflowModel(Model):
    """
    Class representing iMODFLOW model input. Running it requires iMOD5.

    `Download iMOD5 here <https://oss.deltares.nl/web/imod/download-imod5>`_

    Attributes
    ----------
    modelname : str
    check : str, optional
        When to perform model checks {None, "defer", "eager"}. Defaults to
        "defer".

    Examples
    --------

    >>> m = Imodflow("example")
    >>> m["riv"] = River(...)
    >>> # ...etc.
    >>> m.create_time_discretization(endtime)
    >>> m.write()
    """

    # These templates end up here since they require global information
    # from more than one package
    _PACKAGE_GROUPS = PackageGroups

    def __init__(self, modelname, check="defer"):
        super().__init__()
        self.modelname = modelname
        self.check = check

    def _get_pkgkey(self, pkg_id):
        """
        Get package key that belongs to a certain pkg_id, since the keys are
        user specified.

        Returns the single matching key, or None when no package of this
        pkg_id is present. Raises ValueError on duplicates.
        """
        key = [pkgname for pkgname, pkg in self.items() if pkg._pkg_id == pkg_id]
        nkey = len(key)
        if nkey > 1:
            raise ValueError(f"Multiple instances of {key} detected")
        elif nkey == 1:
            return key[0]
        else:
            return None

    def _group(self):
        """
        Group multiple systems of a single package. E.g. all river or drainage
        sub-systems.

        Returns a list of PackageGroup instances, one per groupable pkg_id.
        """
        groups = collections.defaultdict(dict)
        groupable = set(self._PACKAGE_GROUPS.__members__.keys())
        for key, package in self.items():
            pkg_id = package._pkg_id
            if pkg_id in groupable:
                groups[pkg_id][key] = package

        package_groups = []
        for pkg_id, group in groups.items():
            # Create PackageGroup for every package
            # RiverGroup for rivers, DrainageGroup for drainage, etc.
            package_groups.append(self._PACKAGE_GROUPS[pkg_id].value(**group))

        return package_groups

    def _use_cftime(self):
        """
        Return whether cftime datetimes are used (True), numpy datetimes are
        used (False), or no time-dependent input is present (None).

        Also checks if datetime types are homogeneous across packages.
        """
        types = []
        for pkg in self.values():
            if pkg._hastime():
                types.append(type(np.atleast_1d(pkg["time"].values)[0]))

        # Types will be empty if there's no time dependent input
        set_of_types = set(types)
        if len(set_of_types) == 0:
            return None
        else:  # there is time dependent input
            if not len(set_of_types) == 1:
                raise ValueError(
                    f"Multiple datetime types detected: {set_of_types}. "
                    "Use either cftime or numpy.datetime64[ns]."
                )
            # Since we compare types and not instances, we use issubclass
            if issubclass(types[0], cftime.datetime):
                return True
            elif issubclass(types[0], np.datetime64):
                return False
            else:
                raise ValueError("Use either cftime or numpy.datetime64[ns].")

    def time_discretization(self, times):
        """Deprecated alias of :meth:`create_time_discretization`."""
        warnings.warn(
            f"{self.__class__.__name__}.time_discretization() is deprecated. "
            f"In the future call {self.__class__.__name__}.create_time_discretization().",
            DeprecationWarning,
        )
        self.create_time_discretization(additional_times=times)

    def create_time_discretization(self, additional_times):
        """
        Collect all unique times from model packages and additional given
        `additional_times`. These unique times are used as stress periods in
        the model. All stress packages must have the same starting time.

        The time discretization in imod-python works as follows:

        - The datetimes of all packages you send in are always respected
        - Subsequently, the input data you use is always included fully as well
        - All times are treated as starting times for the stress: a stress is
          always applied until the next specified date
        - For this reason, a final time is required to determine the length of
          the last stress period
        - Additional times can be provided to force shorter stress periods &
          more detailed output
        - Every stress has to be defined on the first stress period (this is a
          modflow requirement)

        Or visually (every letter a date in the time axes):

        >>> recharge a - b - c - d - e - f
        >>> river    g - - - - h - - - - j
        >>> times    - - - - - - - - - - - i
        >>> model    a - b - c h d - e - f i

        with the stress periods defined between these dates. I.e. the model
        times are the set of all times you include in the model.

        Parameters
        ----------
        additional_times : str, datetime; or iterable of str, datetimes.
            Times to add to the time discretization. At least one single time
            should be given, which will be used as the ending time of the
            simulation.

        Examples
        --------
        Add a single time:

        >>> m.create_time_discretization("2001-01-01")

        Add a daterange:

        >>> m.create_time_discretization(pd.daterange("2000-01-01", "2001-01-01"))

        Add a list of times:

        >>> m.create_time_discretization(["2000-01-01", "2001-01-01"])

        """

        # Make sure it's an iterable
        if not isinstance(
            additional_times, (np.ndarray, list, tuple, pd.DatetimeIndex)
        ):
            additional_times = [additional_times]

        # Loop through all packages, check if cftime is required.
        self.use_cftime = self._use_cftime()
        # use_cftime is None if no datetimes are present in packages
        # use_cftime is False if np.datetimes present in packages
        # use_cftime is True if cftime.datetime present in packages
        for time in additional_times:
            if issubclass(type(time), cftime.datetime):
                if self.use_cftime is None:
                    self.use_cftime = True
                if self.use_cftime is False:
                    raise ValueError(
                        "Use either cftime or numpy.datetime64[ns]. "
                        f"Received: {type(time)}."
                    )
        if self.use_cftime is None:
            self.use_cftime = False

        times = [
            to_datetime_internal(time, self.use_cftime) for time in additional_times
        ]
        times, first_times = insert_unique_package_times(self.items(), times)

        # Check if every transient package commences at the same time.
        for key, first_time in first_times.items():
            time0 = times[0]
            if (first_time != time0) and not self[key]._is_periodic():
                raise ValueError(
                    f"Package {key} does not have a value specified for the "
                    f"first time: {time0}. Every input must be present in the "
                    "first stress period. Values are only filled forward in "
                    "time."
                )

        duration = timestep_duration(times, self.use_cftime)
        # Generate time discretization, just rely on default arguments
        # Probably won't be used that much anyway?
        times = np.array(times)
        timestep_duration_da = xr.DataArray(
            duration, coords={"time": times[:-1]}, dims=("time",)
        )
        self["time_discretization"] = imod.flow.TimeDiscretization(
            timestep_duration=timestep_duration_da, endtime=times[-1]
        )

    def _calc_n_entry(self, composed_package, is_boundary_condition):
        """Calculate amount of entries for each timestep and variable."""

        def first(d):
            """Get first value of dictionary values"""
            return next(iter(d.values()))

        if is_boundary_condition:
            first_variable = first(first(composed_package))
            n_entry = 0
            for sys in first_variable.values():
                n_entry += len(sys)

            return n_entry

        else:  # No time and no systems in regular packages
            first_variable = first(composed_package)
            return len(first_variable)

    def _compose_timestrings(self, globaltimes):
        """Map timestep numbers to formatted timestrings for stress periods."""
        time_format = "%Y-%m-%d %H:%M:%S"
        time_composed = self["time_discretization"]._compose_values_time(
            "time", globaltimes
        )
        time_composed = {
            timestep_nr: _compose_timestring(time, time_format=time_format)
            for timestep_nr, time in time_composed.items()
        }
        return time_composed

    def _compose_periods(self):
        """Compose the {period name: timestring} mapping for periodic stresses."""
        periods = {}

        for key, package in self.items():
            if package._is_periodic():
                # Periodic stresses are defined for all variables
                first_var = list(package.dataset.data_vars)[0]
                periods.update(package.dataset[first_var].attrs["stress_periodic"])

        # Create timestrings for "Periods" section in projectfile
        # Basically swap around period attributes and compose timestring
        # Note that the timeformat for periods in the Projectfile is different
        # from that for stress periods
        time_format = "%d-%m-%Y %H:%M:%S"
        periods_composed = {
            value: _compose_timestring(time, time_format=time_format)
            for time, value in periods.items()
        }
        return periods_composed

    def _compose_all_packages(self, directory, globaltimes):
        """
        Compose all transient packages before rendering.

        Required because of outer timeloop

        Returns
        -------
        A tuple with lists of respectively the composed packages and boundary conditions
        """
        bndkey = self._get_pkgkey("bnd")
        nlayer = self[bndkey]["layer"].size

        composition = initialize_nested_dict(5)

        group_packages = self._group()

        # Get pkg_id from first value in dictionary in group list
        group_pkg_ids = [next(iter(group.values()))._pkg_id for group in group_packages]

        for group in group_packages:
            group_composition = group.compose(
                directory,
                globaltimes,
                nlayer,
            )
            append_nested_dict(composition, group_composition)

        # Non-grouped packages are composed individually.
        for key, package in self.items():
            if package._pkg_id not in group_pkg_ids:
                package_composition = package.compose(
                    directory.joinpath(key),
                    globaltimes,
                    nlayer,
                )
                append_nested_dict(composition, package_composition)

        return composition

    def _render_periods(self, periods_composed):
        """Render the "Periods" section of the projectfile."""
        _template_periods = jinja2.Template(
            "Periods\n"
            "{%- for key, timestamp in periods.items() %}\n"
            "{{key}}\n{{timestamp}}\n"
            "{%- endfor %}\n"
        )

        return _template_periods.render(periods=periods_composed)

    def _render_projectfile(self, directory):
        """
        Render projectfile. The projectfile has the hierarchy:
        package - time - system - layer
        """
        diskey = self._get_pkgkey("dis")
        globaltimes = self[diskey]["time"].values

        content = []

        composition = self._compose_all_packages(directory, globaltimes)

        times_composed = self._compose_timestrings(globaltimes)

        periods_composed = self._compose_periods()

        # Add period strings to times_composed
        # These are the strings atop each stress period in the projectfile
        times_composed.update({key: key for key in periods_composed.keys()})

        # Add steady-state for packages without time specified
        times_composed["steady-state"] = "steady-state"

        rendered = []
        ignored = ["dis", "oc"]

        for key, package in self.items():
            pkg_id = package._pkg_id

            if (pkg_id in rendered) or (pkg_id in ignored):
                continue  # Skip if already rendered (for groups) or not necessary to render

            kwargs = {
                "pkg_id": pkg_id,
                "name": package.__class__.__name__,
                "variable_order": package._variable_order,
                "package_data": composition[pkg_id],
            }

            if isinstance(package, BoundaryCondition):
                kwargs["n_entry"] = self._calc_n_entry(composition[pkg_id], True)
                kwargs["times"] = times_composed
            else:
                kwargs["n_entry"] = self._calc_n_entry(composition[pkg_id], False)

            content.append(package._render_projectfile(**kwargs))
            rendered.append(pkg_id)

        # Add periods definition
        content.append(self._render_periods(periods_composed))

        return "\n\n".join(content)

    def _render_runfile(self, directory):
        """
        Render runfile. The runfile has the hierarchy:
        time - package - system - layer
        """
        raise NotImplementedError("Currently only projectfiles can be rendered.")

    def render(self, directory, render_projectfile=True):
        """
        Render the runfile as a string, package by package.
        """
        if render_projectfile:
            return self._render_projectfile(directory)
        else:
            return self._render_runfile(directory)

    def _model_path_management(
        self, directory, result_dir, resultdir_is_workdir, render_projectfile
    ):
        """Resolve input/result directories and runfile paths for writing."""
        # Coerce to pathlib.Path
        directory = pathlib.Path(directory)
        if result_dir is None:
            result_dir = pathlib.Path("results")
        else:
            result_dir = pathlib.Path(result_dir)

        # Create directories if necessary
        directory.mkdir(exist_ok=True, parents=True)
        result_dir.mkdir(exist_ok=True, parents=True)

        if render_projectfile:
            ext = ".prj"
        else:
            ext = ".run"

        runfilepath = directory / f"{self.modelname}{ext}"
        results_runfilepath = result_dir / f"{self.modelname}{ext}"

        # Where will the model run?
        # Default is inputdir, next to runfile:
        # in that case, resultdir is relative to inputdir
        # If resultdir_is_workdir, inputdir is relative to resultdir
        # render_dir is the inputdir that is printed in the runfile.
        # result_dir is the resultdir that is printed in the runfile.
        # caching_reldir is from where to check for files. This location
        # is the same as the eventual model working dir.
        if resultdir_is_workdir:
            caching_reldir = result_dir
            if not directory.is_absolute():
                render_dir = _relpath(directory, result_dir)
            else:
                render_dir = directory
            result_dir = pathlib.Path(".")
        else:
            caching_reldir = directory
            render_dir = pathlib.Path(".")
            if not result_dir.is_absolute():
                result_dir = _relpath(result_dir, directory)

        return result_dir, render_dir, runfilepath, results_runfilepath, caching_reldir

    def write(
        self,
        directory=pathlib.Path("."),
        result_dir=None,
        resultdir_is_workdir=False,
        convert_to="mf2005_namfile",
    ):
        """
        Writes model input files.

        Parameters
        ----------
        directory : str, pathlib.Path
            Directory into which the model input will be written. The model
            input will be written into a directory called modelname.
        result_dir : str, pathlib.Path
            Path to directory in which output will be written when running the
            model. Is written as the value of the ``result_dir`` key in the
            runfile. See the examples.
        resultdir_is_workdir: boolean, optional
            Whether to set all input paths in the runfile relative to the
            output directory. Because iMOD-wq generates a number of files in
            its working directory, it may be advantageous to set the working
            directory to a different path than the runfile location.
        convert_to: str
            The type of object to convert the projectfile to in the
            configuration ini file. Should be one of ``["mf2005_namfile",
            "mf6_namfile", "runfile"]``.

        Returns
        -------
        None

        Examples
        --------
        Say we wish to write the model input to a file called input, and we
        desire that when running the model, the results end up in a directory
        called output. We may run:

        >>> model.write(directory="input", result_dir="output")

        And in the ``config_run.ini``, a value of ``../../output`` will be
        written for ``result_dir``. This ``config_run.ini`` has to be called
        with iMOD 5 to convert the model projectfile to a Modflow 2005 namfile.
        To specify a conversion to a runfile, run:

        >>> model.write(directory="input", convert_to="runfile")

        You can then run the following command to convert the projectfile to a runfile:

        >>> path/to/iMOD5.exe ./input/config_run.ini

        `Download iMOD5 here <https://oss.deltares.nl/web/imod/download-imod5>`_

        """
        directory = pathlib.Path(directory)

        allowed_conversion_settings = ["mf2005_namfile", "mf6_namfile", "runfile"]
        if convert_to not in allowed_conversion_settings:
            raise ValueError(
                f"Got convert_setting: '{convert_to}', should be one of: {allowed_conversion_settings}"
            )

        # Currently only supported, no runfile can be directly written by iMOD Python
        # TODO: Add runfile support
        render_projectfile = True

        # TODO: Find a cleaner way to pack and unpack these paths
        (
            result_dir,
            render_dir,
            runfilepath,
            results_runfilepath,
            caching_reldir,
        ) = self._model_path_management(
            directory, result_dir, resultdir_is_workdir, render_projectfile
        )

        directory = directory.resolve()  # Force absolute paths

        # TODO
        # Check if any caching packages are present, and set necessary states.
        # self._set_caching_packages(caching_reldir)

        if self.check is not None:
            self.package_check()

        # TODO Necessary?
        # Delete packages without data
        # self._delete_empty_packages(verbose=True)

        runfile_content = self.render(
            directory=directory, render_projectfile=render_projectfile
        )

        # Start writing
        # Write the runfile
        with open(runfilepath, "w") as f:
            f.write(runfile_content)
        # Also write the runfile in the workdir
        if resultdir_is_workdir:
            with open(results_runfilepath, "w") as f:
                f.write(runfile_content)

        # Write iMOD TIM file
        diskey = self._get_pkgkey("dis")
        time_path = directory / f"{diskey}.tim"
        self[diskey].save(time_path)

        # Create and write INI file to configure conversion/simulation
        ockey = self._get_pkgkey("oc")
        bndkey = self._get_pkgkey("bnd")
        nlayer = self[bndkey]["layer"].size

        if ockey is None:
            raise ValueError("No OutputControl was specified for the model")
        else:
            oc_configuration = self[ockey]._compose_oc_configuration(nlayer)

        # NOTE(review): runfilepath may itself contain the directory component;
        # joining it onto the resolved directory looks redundant — confirm
        # intended behavior for non-default `directory` arguments.
        outfilepath = directory / runfilepath

        RUNFILE_OPTIONS = {
            "mf2005_namfile": {
                "sim_type": 2,
                "namfile_out": outfilepath.with_suffix(".nam"),
            },
            "runfile": {"sim_type": 1, "runfile_out": outfilepath.with_suffix(".run")},
            "mf6_namfile": {
                "sim_type": 3,
                "namfile_out": outfilepath.with_suffix(".nam"),
            },
        }

        conversion_settings = RUNFILE_OPTIONS[convert_to]

        config = IniFile(
            function="runfile",
            prjfile_in=directory / runfilepath.name,
            iss=1,
            timfname=directory / time_path.name,
            output_folder=result_dir,
            **conversion_settings,
            **oc_configuration,
        )
        config_content = config.render()

        with open(directory / "config_run.ini", "w") as f:
            f.write(config_content)

        # Write all IDFs and IPFs
        for pkgname, pkg in self.items():
            if (
                "x" in pkg.dataset.coords and "y" in pkg.dataset.coords
            ) or pkg._pkg_id in ["wel", "hfb"]:
                try:
                    pkg.save(directory=directory / pkgname)
                except Exception as error:
                    # Re-raise with package context; "from error" keeps the
                    # original traceback. (Fixes the former "/n" literal and
                    # "occured" typo in the message.)
                    raise type(error)(
                        f"{error}\nAn error occurred during saving of package: {pkgname}."
                    ) from error

    def _check_top_bottom(self):
        """Check whether bottom of a layer does not exceed a top somewhere."""
        basic_ids = ["top", "bot"]

        topkey, botkey = [self._get_pkgkey(pkg_id) for pkg_id in basic_ids]
        top, bot = [self[key] for key in (topkey, botkey)]

        if (top["top"] < bot["bottom"]).any():
            raise ValueError(
                f"top should be larger than bottom in {topkey} and {botkey}"
            )

    def package_check(self):
        """Run consistency checks on all packages against the active cells."""
        bndkey = self._get_pkgkey("bnd")
        active_cells = self[bndkey]["ibound"] != 0

        self._check_top_bottom()

        for pkg in self.values():
            pkg._pkgcheck(active_cells=active_cells)