Coverage for C:\src\imod-python\imod\mf6\hfb.py: 90%

318 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-08 13:27 +0200

1import abc 

2import copy 

3import textwrap 

4import typing 

5from copy import deepcopy 

6from enum import Enum 

7from typing import TYPE_CHECKING, Optional, Tuple 

8 

9import cftime 

10import numpy as np 

11import xarray as xr 

12import xugrid as xu 

13from fastcore.dispatch import typedispatch 

14 

15from imod.logging import init_log_decorator 

16from imod.mf6.boundary_condition import BoundaryCondition 

17from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage 

18from imod.mf6.mf6_hfb_adapter import Mf6HorizontalFlowBarrier 

19from imod.mf6.package import Package 

20from imod.mf6.utilities.grid import broadcast_to_full_domain 

21from imod.mf6.utilities.regrid import RegridderType 

22from imod.schemata import EmptyIndexesSchema 

23from imod.typing import GridDataArray 

24from imod.util.imports import MissingOptionalModule 

25 

26if TYPE_CHECKING: 

27 import geopandas as gpd 

28else: 

29 try: 

30 import geopandas as gpd 

31 except ImportError: 

32 gpd = MissingOptionalModule("geopandas") 

33 

34try: 

35 import shapely 

36except ImportError: 

37 shapely = MissingOptionalModule("shapely") 

38 

39 

40@typedispatch # type: ignore[no-redef] 

41def _derive_connected_cell_ids( 

42 idomain: xr.DataArray, grid: xu.Ugrid2d, edge_index: np.ndarray 

43): 

44 """ 

45 Derive the cell ids of the connected cells of an edge on a structured grid. 

46 

47 Parameters 

48 ---------- 

49 idomain: xr.DataArray 

50 The active domain 

51 grid : 

52 The unstructured grid of the domain 

53 edge_index : 

54 The indices of the edges from which the connected cell ids are computed 

55 

56 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the 

57 cell_dims coordinates. 

58 ------- 

59 

60 """ 

61 edge_faces = grid.edge_face_connectivity 

62 cell2d = edge_faces[edge_index] 

63 

64 shape = (idomain["y"].size, idomain["x"].size) 

65 row_1, column_1 = np.unravel_index(cell2d[:, 0], shape) 

66 row_2, column_2 = np.unravel_index(cell2d[:, 1], shape) 

67 

68 cell_ids = xr.Dataset() 

69 

70 cell_ids["cell_id1"] = xr.DataArray( 

71 np.array([row_1 + 1, column_1 + 1]).T, 

72 coords={ 

73 "edge_index": edge_index, 

74 "cell_dims1": ["row_1", "column_1"], 

75 }, 

76 ) 

77 

78 cell_ids["cell_id2"] = xr.DataArray( 

79 np.array([row_2 + 1, column_2 + 1]).T, 

80 coords={ 

81 "edge_index": edge_index, 

82 "cell_dims2": ["row_2", "column_2"], 

83 }, 

84 ) 

85 

86 return cell_ids 

87 

88 

89@typedispatch # type: ignore[no-redef] 

90def _derive_connected_cell_ids( 

91 _: xu.UgridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray 

92): 

93 """ 

94 Derive the cell ids of the connected cells of an edge on an unstructured grid. 

95 

96 Parameters 

97 ---------- 

98 grid : 

99 The unstructured grid of the domain 

100 edge_index : 

101 The indices of the edges from which the connected cell ids are computed 

102 

103 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the 

104 cell_dims coordinates. 

105 ------- 

106 

107 """ 

108 edge_faces = grid.edge_face_connectivity 

109 cell2d = edge_faces[edge_index] 

110 

111 cell2d_1 = cell2d[:, 0] 

112 cell2d_2 = cell2d[:, 1] 

113 

114 cell_ids = xr.Dataset() 

115 

116 cell_ids["cell_id1"] = xr.DataArray( 

117 np.array([cell2d_1 + 1]).T, 

118 coords={ 

119 "edge_index": edge_index, 

120 "cell_dims1": ["cell2d_1"], 

121 }, 

122 ) 

123 

124 cell_ids["cell_id2"] = xr.DataArray( 

125 np.array([cell2d_2 + 1]).T, 

126 coords={ 

127 "edge_index": edge_index, 

128 "cell_dims2": ["cell2d_2"], 

129 }, 

130 ) 

131 

132 return cell_ids 

133 

134 

135def to_connected_cells_dataset( 

136 idomain: GridDataArray, 

137 grid: xu.Ugrid2d, 

138 edge_index: np.ndarray, 

139 edge_values: dict, 

140) -> xr.Dataset: 

141 """ 

142 Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells, 

143 the layer of the cells and the value of the edge. The connected cells are returned in cellid notation e.g.(row, 

144 column) for structured grid, (mesh2d_nFaces) for unstructured grids. 

145 

146 Parameters 

147 ---------- 

148 idomain: GridDataArray 

149 active domain 

150 grid: xu.Ugrid2d, 

151 unstructured grid containing the edge to cell array 

152 edge_index: np.ndarray 

153 indices of the edges for which the edge values will be converted to values in the connected cells 

154 edge_values: dict 

155 dictionary containing the value name and the edge values that are applied on the edges identified by the 

156 edge_index 

157 

158 Returns 

159 ---------- 

160 a dataset containing: 

161 - cell_id1 

162 - cell_id2 

163 - layer 

164 - value name 

165 

166 """ 

167 barrier_dataset = _derive_connected_cell_ids(idomain, grid, edge_index) 

168 

169 for name, values in edge_values.items(): 

170 barrier_dataset[name] = xr.DataArray( 

171 values, 

172 dims=["layer", "edge_index"], 

173 coords={ 

174 "edge_index": edge_index, 

175 "layer": values.coords["layer"], 

176 }, 

177 ) 

178 

179 barrier_dataset = ( 

180 barrier_dataset.stack(cell_id=("layer", "edge_index"), create_index=False) 

181 .drop_vars("edge_index") 

182 .reset_coords() 

183 ) 

184 

185 return barrier_dataset.dropna("cell_id") 

186 

187 

188def _fraction_layer_overlap( 

189 snapped_dataset: xu.UgridDataset, 

190 edge_index: np.ndarray, 

191 top: xu.UgridDataArray, 

192 bottom: xu.UgridDataArray, 

193): 

194 """ 

195 Computes the fraction a barrier occupies inside a layer. 

196 """ 

197 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T 

198 top_mean = _mean_left_and_right(top, left, right) 

199 bottom_mean = _mean_left_and_right(bottom, left, right) 

200 

201 n_layer, n_edge = top_mean.shape 

202 layer_bounds = np.empty((n_edge, n_layer, 2), dtype=float) 

203 layer_bounds[..., 0] = typing.cast(np.ndarray, bottom_mean.values).T 

204 layer_bounds[..., 1] = typing.cast(np.ndarray, top_mean.values).T 

205 

206 hfb_bounds = np.empty((n_edge, n_layer, 2), dtype=float) 

207 hfb_bounds[..., 0] = ( 

208 snapped_dataset["zbottom"].values[edge_index].reshape(n_edge, 1) 

209 ) 

210 hfb_bounds[..., 1] = snapped_dataset["ztop"].values[edge_index].reshape(n_edge, 1) 

211 

212 overlap = _vectorized_overlap(hfb_bounds, layer_bounds) 

213 height = layer_bounds[..., 1] - layer_bounds[..., 0] 

214 # Avoid runtime warnings when diving by 0: 

215 height[height <= 0] = np.nan 

216 fraction = (overlap / height).T 

217 

218 return xr.ones_like(top_mean) * fraction 

219 

220 

221def _mean_left_and_right( 

222 cell_values: xu.UgridDataArray, left: np.ndarray, right: np.ndarray 

223) -> xr.Dataset: 

224 """ 

225 This method computes the mean value of cell pairs. The left array specifies the first cell, the right array 

226 the second cells. The mean is computed by (first_cell+second_cell/2.0) 

227 

228 Parameters 

229 ---------- 

230 cell_values: xu.UgridDataArray 

231 The array containing the data values of all the cells 

232 left : 

233 The array containing indices to the first cells 

234 right : 

235 The array containing indices to the second cells 

236 

237 Returns 

238 ------- 

239 The means of the cells 

240 

241 """ 

242 facedim = cell_values.ugrid.grid.face_dimension 

243 uda_left = cell_values.ugrid.obj.isel({facedim: left}).drop_vars(facedim) 

244 uda_right = cell_values.ugrid.obj.isel({facedim: right}).drop_vars(facedim) 

245 

246 return xr.concat((uda_left, uda_right), dim="two").mean("two") 

247 

248 

249def _vectorized_overlap(bounds_a: np.ndarray, bounds_b: np.ndarray): 

250 """ 

251 Vectorized overlap computation. Returns the overlap of 2 vectors along the same axis. 

252 If there is no overlap zero will be returned. 

253 

254 b1 

255 

256 a1 | 

257 ▲ | 

258 | | 

259 | ▼ 

260 ▼ b0 

261 a0 

262 

263 To compute the overlap of the 2 vectors the maximum of a0,b0, is subtracted from the minimum of a1,b1. 

264 

265 Compare with: 

266 

267 overlap = max(0, min(a[1], b[1]) - max(a[0], b[0])) 

268 """ 

269 return np.maximum( 

270 0.0, 

271 np.minimum(bounds_a[..., 1], bounds_b[..., 1]) 

272 - np.maximum(bounds_a[..., 0], bounds_b[..., 0]), 

273 ) 

274 

275 

276class BarrierType(Enum): 

277 HydraulicCharacteristic = 0 

278 Multiplier = 1 

279 Resistance = 2 

280 

281 

282class HorizontalFlowBarrierBase(BoundaryCondition, ILineDataPackage): 

283 _pkg_id = "hfb" 

284 

285 _period_data = () 

286 _init_schemata = {} 

287 _write_schemata = {"geometry": [EmptyIndexesSchema()]} 

288 

289 _regrid_method: dict[str, Tuple[RegridderType, str]] = {} 

290 

291 def __init__( 

292 self, 

293 geometry: "gpd.GeoDataFrame", 

294 print_input: bool = False, 

295 ) -> None: 

296 dict_dataset = {"print_input": print_input} 

297 super().__init__(dict_dataset) 

298 

299 self.line_data = geometry 

300 

301 def get_regrid_methods(self) -> Optional[dict[str, Tuple[RegridderType, str]]]: 

302 return self._regrid_method 

303 

304 def _get_variable_names_for_gdf(self) -> list[str]: 

305 return [ 

306 self._get_variable_name(), 

307 "geometry", 

308 ] + self._get_vertical_variables() 

309 

310 @property 

311 def line_data(self) -> "gpd.GeoDataFrame": 

312 variables_for_gdf = self._get_variable_names_for_gdf() 

313 return gpd.GeoDataFrame( 

314 self.dataset[variables_for_gdf].to_dataframe(), 

315 geometry="geometry", 

316 ) 

317 

318 @line_data.setter 

319 def line_data(self, value: "gpd.GeoDataFrame") -> None: 

320 variables_for_gdf = self._get_variable_names_for_gdf() 

321 self.dataset = self.dataset.merge( 

322 value.to_xarray(), overwrite_vars=variables_for_gdf, join="right" 

323 ) 

324 

325 def render(self, directory, pkgname, globaltimes, binary): 

326 raise NotImplementedError( 

327 f"""{self.__class__.__name__} is a grid-agnostic package and does not have a render method. To render the 

328 package, first convert to a Modflow6 package by calling pkg.to_mf6_pkg()""" 

329 ) 

330 

331 def _netcdf_encoding(self): 

332 return {"geometry": {"dtype": "str"}} 

333 

334 @classmethod 

335 def from_file(cls, path, **kwargs): 

336 instance = super().from_file(path, **kwargs) 

337 instance.dataset["geometry"] = shapely.wkt.loads(instance.dataset["geometry"]) 

338 

339 return instance 

340 

341 def _compute_barrier_values( 

342 self, snapped_dataset, edge_index, idomain, top, bottom, k 

343 ): 

344 raise NotImplementedError() 

345 

346 def to_mf6_pkg( 

347 self, 

348 idomain: GridDataArray, 

349 top: GridDataArray, 

350 bottom: GridDataArray, 

351 k: GridDataArray, 

352 validate: bool = False, 

353 ) -> Mf6HorizontalFlowBarrier: 

354 """ 

355 Write package to Modflow 6 package. 

356 

357 Based on the model grid, top and bottoms, the layers in which the barrier belong are computed. If the 

358 barrier only partially occupies a layer an effective resistance or hydraulic conductivity for that layer is 

359 calculated. This calculation is skipped for the Multiplier type. 

360 

361 Parameters 

362 ---------- 

363 idomain: GridDataArray 

364 Grid with active cells. 

365 top: GridDataArray 

366 Grid with top of model layers. 

367 bottom: GridDataArray 

368 Grid with bottom of model layers. 

369 k: GridDataArray 

370 Grid with hydraulic conductivities. 

371 validate: bool 

372 Run validation before converting 

373 

374 Returns 

375 ------- 

376 

377 """ 

378 if validate: 

379 self._validate(self._write_schemata) 

380 

381 top, bottom = broadcast_to_full_domain(idomain, top, bottom) 

382 k = idomain * k 

383 unstructured_grid, top, bottom, k = ( 

384 self.__to_unstructured(idomain, top, bottom, k) 

385 if isinstance(idomain, xr.DataArray) 

386 else [idomain, top, bottom, k] 

387 ) 

388 snapped_dataset, edge_index = self.__snap_to_grid(idomain) 

389 edge_index = self.__remove_invalid_edges(unstructured_grid, edge_index) 

390 

391 barrier_values = self._compute_barrier_values( 

392 snapped_dataset, edge_index, idomain, top, bottom, k 

393 ) 

394 barrier_values = self.__remove_edge_values_connected_to_inactive_cells( 

395 barrier_values, unstructured_grid, edge_index 

396 ) 

397 

398 if (barrier_values.size == 0) | np.isnan(barrier_values).all(): 

399 raise ValueError( 

400 textwrap.dedent( 

401 """ 

402 No barriers could be assigned to cell edges, 

403 this is caused by either one of the following: 

404 \t- Barriers fall completely outside the model grid 

405 \t- Barriers were assigned to the edge of the model domain 

406 \t- Barriers were assigned to edges of inactive cells 

407 """ 

408 ) 

409 ) 

410 

411 barrier_dataset = typing.cast( 

412 xr.Dataset, 

413 to_connected_cells_dataset( 

414 idomain, 

415 unstructured_grid.ugrid.grid, 

416 edge_index, 

417 { 

418 "hydraulic_characteristic": self.__to_hydraulic_characteristic( 

419 barrier_values 

420 ) 

421 }, 

422 ), 

423 ) 

424 

425 barrier_dataset["print_input"] = self.dataset["print_input"] 

426 

427 return Mf6HorizontalFlowBarrier(**barrier_dataset.data_vars) 

428 

429 def is_empty(self) -> bool: 

430 if super().is_empty(): 

431 return True 

432 

433 linestrings = self.dataset["geometry"] 

434 only_empty_lines = all([ls.is_empty for ls in linestrings.values]) 

435 return only_empty_lines 

436 

437 def _resistance_layer( 

438 self, 

439 snapped_dataset: xu.UgridDataset, 

440 edge_index: np.ndarray, 

441 idomain: xu.UgridDataArray, 

442 ) -> xr.DataArray: 

443 """ 

444 Returns layered xarray with barrier resistance distributed over layers 

445 """ 

446 hfb_resistance = snapped_dataset[self._get_variable_name()].values[edge_index] 

447 hfb_layer = snapped_dataset["layer"].values[edge_index] 

448 nlay = idomain.layer.size 

449 model_layer = np.repeat(idomain.layer.values, hfb_resistance.size).reshape( 

450 (nlay, hfb_resistance.size) 

451 ) 

452 data = np.where(model_layer == hfb_layer, hfb_resistance, np.nan) 

453 return xr.DataArray( 

454 data=data, 

455 coords={ 

456 "layer": np.arange(nlay) + 1, 

457 }, 

458 dims=["layer", "mesh2d_nFaces"], 

459 ) 

460 

461 def _resistance_layer_overlap( 

462 self, 

463 snapped_dataset: xu.UgridDataset, 

464 edge_index: np.ndarray, 

465 top: xu.UgridDataArray, 

466 bottom: xu.UgridDataArray, 

467 k: xu.UgridDataArray, 

468 ) -> xr.DataArray: 

469 """ 

470 Computes the effective value of a barrier that partially overlaps a cell in the z direction. 

471 A barrier always lies on an edge in the xy-plane, however in doesn't have to fully extend in the z-direction to 

472 cover the entire layer. This method computes the effective resistance in that case. 

473 

474 Barrier 

475 ...................................... ▲ ▲ 

476 . @@@@ . | | 

477 . @Rb@ . | Lb | 

478 . Cell1 @@@@ Cell2 . ▼ | H 

479 . : : . | 

480 . : : . | 

481 .................:..:................. ▼ 

482 k1 k2 

483 

484 The effective value of a partially emerged barrier in a layer is computed by: 

485 c_total = 1.0 / (fraction / Rb + (1.0 - fraction) / c_aquifer) 

486 c_aquifer = 1.0 / k_mean = 1.0 / ((k1 + k2) / 2.0) 

487 fraction = Lb / H 

488 

489 """ 

490 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T 

491 k_mean = _mean_left_and_right(k, left, right) 

492 

493 resistance = self.__to_resistance( 

494 snapped_dataset[self._get_variable_name()] 

495 ).values[edge_index] 

496 

497 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom) 

498 

499 c_aquifer = 1.0 / k_mean 

500 inverse_c = (fraction / resistance) + ((1.0 - fraction) / c_aquifer) 

501 c_total = 1.0 / inverse_c 

502 

503 return self.__from_resistance(c_total) 

504 

505 def __to_resistance(self, value: xu.UgridDataArray) -> xu.UgridDataArray: 

506 match self._get_barrier_type(): 

507 case BarrierType.HydraulicCharacteristic: 

508 return 1.0 / value # type: ignore 

509 case BarrierType.Multiplier: 

510 return -1.0 / value # type: ignore 

511 case BarrierType.Resistance: 

512 return value 

513 

514 raise ValueError(r"Unknown barrier type {barrier_type}") 

515 

516 def __from_resistance(self, resistance: xr.DataArray) -> xr.DataArray: 

517 match self._get_barrier_type(): 

518 case BarrierType.HydraulicCharacteristic: 

519 return 1.0 / resistance 

520 case BarrierType.Multiplier: 

521 return -1.0 / resistance 

522 case BarrierType.Resistance: 

523 return resistance 

524 

525 raise ValueError(r"Unknown barrier type {barrier_type}") 

526 

527 def __to_hydraulic_characteristic(self, value: xr.DataArray) -> xr.DataArray: 

528 match self._get_barrier_type(): 

529 case BarrierType.HydraulicCharacteristic: 

530 return value 

531 case BarrierType.Multiplier: 

532 return -1.0 * value 

533 case BarrierType.Resistance: 

534 return 1.0 / value 

535 

536 raise ValueError(r"Unknown barrier type {barrier_type}") 

537 

538 @abc.abstractmethod 

539 def _get_barrier_type(self) -> BarrierType: 

540 raise NotImplementedError 

541 

542 @abc.abstractmethod 

543 def _get_variable_name(self) -> str: 

544 raise NotImplementedError 

545 

546 @abc.abstractmethod 

547 def _get_vertical_variables(self) -> list: 

548 raise NotImplementedError 

549 

550 def clip_box( 

551 self, 

552 time_min: Optional[cftime.datetime | np.datetime64 | str] = None, 

553 time_max: Optional[cftime.datetime | np.datetime64 | str] = None, 

554 layer_min: Optional[int] = None, 

555 layer_max: Optional[int] = None, 

556 x_min: Optional[float] = None, 

557 x_max: Optional[float] = None, 

558 y_min: Optional[float] = None, 

559 y_max: Optional[float] = None, 

560 top: Optional[GridDataArray] = None, 

561 bottom: Optional[GridDataArray] = None, 

562 state_for_boundary: Optional[GridDataArray] = None, 

563 ) -> "HorizontalFlowBarrierBase": 

564 """ 

565 Clip a package by a bounding box (time, layer, y, x). 

566 

567 Slicing intervals may be half-bounded, by providing None: 

568 

569 * To select 500.0 <= x <= 1000.0: 

570 ``clip_box(x_min=500.0, x_max=1000.0)``. 

571 * To select x <= 1000.0: ``clip_box(x_min=None, x_max=1000.0)`` 

572 or ``clip_box(x_max=1000.0)``. 

573 * To select x >= 500.0: ``clip_box(x_min = 500.0, x_max=None.0)`` 

574 or ``clip_box(x_min=1000.0)``. 

575 

576 Parameters 

577 ---------- 

578 time_min: optional 

579 time_max: optional 

580 layer_min: optional, int 

581 layer_max: optional, int 

582 x_min: optional, float 

583 x_max: optional, float 

584 y_min: optional, float 

585 y_max: optional, float 

586 top: optional, GridDataArray 

587 bottom: optional, GridDataArray 

588 state_for_boundary: optional, GridDataArray 

589 

590 Returns 

591 ------- 

592 sliced : Package 

593 """ 

594 cls = type(self) 

595 new = cls.__new__(cls) 

596 new.dataset = copy.deepcopy(self.dataset) 

597 return new 

598 

599 def mask(self, _) -> Package: 

600 """ 

601 The mask method is irrelevant for this package as it is 

602 grid-agnostic, instead this method retuns a copy of itself. 

603 """ 

604 return deepcopy(self) 

605 

606 @staticmethod 

607 def __to_unstructured( 

608 idomain: xr.DataArray, top: xr.DataArray, bottom: xr.DataArray, k: xr.DataArray 

609 ) -> Tuple[ 

610 xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray 

611 ]: 

612 unstruct = xu.UgridDataArray.from_structured(idomain) 

613 top = xu.UgridDataArray.from_structured(top) 

614 bottom = xu.UgridDataArray.from_structured(bottom) 

615 k = xu.UgridDataArray.from_structured(k) 

616 

617 return unstruct, top, bottom, k 

618 

619 def __snap_to_grid( 

620 self, idomain: GridDataArray 

621 ) -> Tuple[xu.UgridDataset, np.ndarray]: 

622 if "layer" in self.dataset: 

623 variable_names = [self._get_variable_name(), "geometry", "layer"] 

624 else: 

625 variable_names = [self._get_variable_name(), "geometry", "ztop", "zbottom"] 

626 barrier_dataframe = self.dataset[variable_names].to_dataframe() 

627 

628 snapped_dataset, _ = typing.cast( 

629 xu.UgridDataset, 

630 xu.snap_to_grid(barrier_dataframe, grid=idomain, max_snap_distance=0.5), 

631 ) 

632 edge_index = np.argwhere( 

633 snapped_dataset[self._get_variable_name()].notnull().values 

634 ).ravel() 

635 

636 return snapped_dataset, edge_index 

637 

638 @staticmethod 

639 def __remove_invalid_edges( 

640 unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray 

641 ) -> np.ndarray: 

642 """ 

643 Remove invalid edges indices. An edge is considered invalid when: 

644 - it lies on an exterior boundary (face_connectivity equals the grid fill value) 

645 - The corresponding connected cells are inactive 

646 """ 

647 grid = unstructured_grid.ugrid.grid 

648 face_dimension = unstructured_grid.ugrid.grid.face_dimension 

649 face_connectivity = grid.edge_face_connectivity[edge_index] 

650 

651 valid_edges = np.all(face_connectivity != grid.fill_value, axis=1) 

652 

653 connected_cells = -np.ones((len(edge_index), 2)) 

654 connected_cells[valid_edges, 0] = ( 

655 unstructured_grid.sel(layer=1) 

656 .loc[{face_dimension: face_connectivity[valid_edges, 0]}] 

657 .values 

658 ) 

659 connected_cells[valid_edges, 1] = ( 

660 unstructured_grid.sel(layer=1) 

661 .loc[{face_dimension: face_connectivity[valid_edges, 1]}] 

662 .values 

663 ) 

664 

665 valid = (connected_cells > 0).all(axis=1) 

666 

667 return edge_index[valid] 

668 

669 @staticmethod 

670 def __remove_edge_values_connected_to_inactive_cells( 

671 values, unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray 

672 ): 

673 face_dimension = unstructured_grid.ugrid.grid.face_dimension 

674 

675 face_connectivity = unstructured_grid.ugrid.grid.edge_face_connectivity[ 

676 edge_index 

677 ] 

678 connected_cells_left = unstructured_grid.loc[ 

679 {face_dimension: face_connectivity[:, 0]} 

680 ] 

681 connected_cells_right = unstructured_grid.loc[ 

682 {face_dimension: face_connectivity[:, 1]} 

683 ] 

684 

685 return values.where( 

686 (connected_cells_left.drop(face_dimension) > 0) 

687 & (connected_cells_right.drop(face_dimension) > 0) 

688 ) 

689 

690 

691class HorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): 

692 """ 

693 Horizontal Flow Barrier (HFB) package 

694 

695 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

696 that has type "HFB6" in the Name File. Only one HFB Package can be 

697 specified for a GWF model. 

698 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

699 

700 Parameters 

701 ---------- 

702 geometry: gpd.GeoDataFrame 

703 Dataframe that describes: 

704 - geometry: the geometries of the barriers, 

705 - hydraulic_characteristic: the hydraulic characteristic of the barriers 

706 - ztop: the top z-value of the barriers 

707 - zbottom: the bottom z-value of the barriers 

708 print_input: bool 

709 

710 Examples 

711 -------- 

712 

713 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

714 >>> barrier_y = [500.0, 250.0, 500.0] 

715 >>> barrier_gdf = gpd.GeoDataFrame( 

716 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

717 >>> data={ 

718 >>> "hydraulic_characteristic": [1e-3,], 

719 >>> "ztop": [10.0,], 

720 >>> "zbottom": [0.0,], 

721 >>> }, 

722 >>> ) 

723 >>> hfb = imod.mf6.HorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) 

724 

725 """ 

726 

727 @init_log_decorator() 

728 def __init__( 

729 self, 

730 geometry: "gpd.GeoDataFrame", 

731 print_input=False, 

732 ): 

733 super().__init__(geometry, print_input) 

734 

735 def _get_barrier_type(self): 

736 return BarrierType.HydraulicCharacteristic 

737 

738 def _get_variable_name(self) -> str: 

739 return "hydraulic_characteristic" 

740 

741 def _get_vertical_variables(self) -> list: 

742 return ["ztop", "zbottom"] 

743 

744 def _compute_barrier_values( 

745 self, snapped_dataset, edge_index, idomain, top, bottom, k 

746 ): 

747 barrier_values = self._resistance_layer_overlap( 

748 snapped_dataset, edge_index, top, bottom, k 

749 ) 

750 

751 return barrier_values 

752 

753 

754class LayeredHorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): 

755 """ 

756 Horizontal Flow Barrier (HFB) package 

757 

758 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

759 that has type "HFB6" in the Name File. Only one HFB Package can be 

760 specified for a GWF model. 

761 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

762 

763 Parameters 

764 ---------- 

765 geometry: gpd.GeoDataFrame 

766 Dataframe that describes: 

767 - geometry: the geometries of the barriers, 

768 - hydraulic_characteristic: the hydraulic characteristic of the barriers 

769 - layer: model layer for the barrier 

770 print_input: bool 

771 

772 Examples 

773 -------- 

774 

775 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

776 >>> barrier_y = [500.0, 250.0, 500.0] 

777 >>> barrier_gdf = gpd.GeoDataFrame( 

778 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

779 >>> data={ 

780 >>> "hydraulic_characteristic": [1e-3,], 

781 >>> "layer": [1,] 

782 >>> }, 

783 >>> ) 

784 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) 

785 

786 """ 

787 

788 @init_log_decorator() 

789 def __init__( 

790 self, 

791 geometry: "gpd.GeoDataFrame", 

792 print_input=False, 

793 ): 

794 super().__init__(geometry, print_input) 

795 

796 def _get_barrier_type(self): 

797 return BarrierType.HydraulicCharacteristic 

798 

799 def _get_variable_name(self) -> str: 

800 return "hydraulic_characteristic" 

801 

802 def _get_vertical_variables(self) -> list: 

803 return ["layer"] 

804 

805 def _compute_barrier_values( 

806 self, snapped_dataset, edge_index, idomain, top, bottom, k 

807 ): 

808 barrier_values = self._resistance_layer( 

809 snapped_dataset, 

810 edge_index, 

811 idomain, 

812 ) 

813 

814 return barrier_values 

815 

816 

817class HorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): 

818 """ 

819 Horizontal Flow Barrier (HFB) package 

820 

821 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

822 that has type "HFB6" in the Name File. Only one HFB Package can be 

823 specified for a GWF model. 

824 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

825 

826 If parts of the barrier overlap a layer the multiplier is applied to the entire layer. 

827 

828 Parameters 

829 ---------- 

830 geometry: gpd.GeoDataFrame 

831 Dataframe that describes: 

832 - geometry: the geometries of the barriers, 

833 - multiplier: the multiplier of the barriers 

834 - ztop: the top z-value of the barriers 

835 - zbottom: the bottom z-value of the barriers 

836 print_input: bool 

837 

838 Examples 

839 -------- 

840 

841 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

842 >>> barrier_y = [500.0, 250.0, 500.0] 

843 >>> barrier_gdf = gpd.GeoDataFrame( 

844 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

845 >>> data={ 

846 >>> "multiplier": [1.5,], 

847 >>> "ztop": [10.0,], 

848 >>> "zbottom": [0.0,], 

849 >>> }, 

850 >>> ) 

851 >>> hfb = imod.mf6.HorizontalFlowBarrierMultiplier(barrier_gdf) 

852 

853 """ 

854 

855 @init_log_decorator() 

856 def __init__( 

857 self, 

858 geometry: "gpd.GeoDataFrame", 

859 print_input=False, 

860 ): 

861 super().__init__(geometry, print_input) 

862 

863 def _get_barrier_type(self): 

864 return BarrierType.Multiplier 

865 

866 def _get_variable_name(self) -> str: 

867 return "multiplier" 

868 

869 def _get_vertical_variables(self) -> list: 

870 return ["ztop", "zbottom"] 

871 

872 def _compute_barrier_values( 

873 self, snapped_dataset, edge_index, idomain, top, bottom, k 

874 ): 

875 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom) 

876 

877 barrier_values = ( 

878 fraction.where(fraction) 

879 * snapped_dataset[self._get_variable_name()].values[edge_index] 

880 ) 

881 

882 return barrier_values 

883 

884 

885class LayeredHorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): 

886 """ 

887 Horizontal Flow Barrier (HFB) package 

888 

889 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

890 that has type "HFB6" in the Name File. Only one HFB Package can be 

891 specified for a GWF model. 

892 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

893 

894 If parts of the barrier overlap a layer the multiplier is applied to the entire layer. 

895 

896 Parameters 

897 ---------- 

898 geometry: gpd.GeoDataFrame 

899 Dataframe that describes: 

900 - geometry: the geometries of the barriers, 

901 - multiplier: the multiplier of the barriers 

902 - layer: model layer for the barrier 

903 print_input: bool 

904 

905 Examples 

906 -------- 

907 

908 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

909 >>> barrier_y = [500.0, 250.0, 500.0] 

910 >>> barrier_gdf = gpd.GeoDataFrame( 

911 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

912 >>> data={ 

913 >>> "multiplier": [1.5,], 

914 >>> "layer": [1,], 

915 >>> }, 

916 >>> ) 

917 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierMultiplier(barrier_gdf) 

918 

919 """ 

920 

921 @init_log_decorator() 

922 def __init__( 

923 self, 

924 geometry: "gpd.GeoDataFrame", 

925 print_input=False, 

926 ): 

927 super().__init__(geometry, print_input) 

928 

929 def _get_barrier_type(self): 

930 return BarrierType.Multiplier 

931 

932 def _get_variable_name(self) -> str: 

933 return "multiplier" 

934 

935 def _get_vertical_variables(self) -> list: 

936 return ["layer"] 

937 

938 def _compute_barrier_values( 

939 self, snapped_dataset, edge_index, idomain, top, bottom, k 

940 ): 

941 barrier_values = self.__multiplier_layer(snapped_dataset, edge_index, idomain) 

942 

943 return barrier_values 

944 

945 def __multiplier_layer( 

946 self, 

947 snapped_dataset: xu.UgridDataset, 

948 edge_index: np.ndarray, 

949 idomain: xu.UgridDataArray, 

950 ) -> xr.DataArray: 

951 """ 

952 Returns layered xarray with barrier multiplier distrubuted over layers 

953 """ 

954 hfb_multiplier = snapped_dataset[self._get_variable_name()].values[edge_index] 

955 hfb_layer = snapped_dataset["layer"].values[edge_index] 

956 nlay = idomain.layer.size 

957 model_layer = np.repeat(idomain.layer.values, hfb_multiplier.shape[0]).reshape( 

958 (nlay, hfb_multiplier.shape[0]) 

959 ) 

960 data = np.where(model_layer == hfb_layer, hfb_multiplier, np.nan) 

961 return xr.DataArray( 

962 data=data, 

963 coords={ 

964 "layer": np.arange(nlay) + 1, 

965 }, 

966 dims=["layer", "mesh2d_nFaces"], 

967 ) 

968 

969 

970class HorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): 

971 """ 

972 Horizontal Flow Barrier (HFB) package 

973 

974 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

975 that has type "HFB6" in the Name File. Only one HFB Package can be 

976 specified for a GWF model. 

977 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

978 

979 Parameters 

980 ---------- 

981 geometry: gpd.GeoDataFrame 

982 Dataframe that describes: 

983 - geometry: the geometries of the barriers, 

984 - resistance: the resistance of the barriers 

985 - ztop: the top z-value of the barriers 

986 - zbottom: the bottom z-value of the barriers 

987 print_input: bool 

988 

989 Examples 

990 -------- 

991 

992 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

993 >>> barrier_y = [500.0, 250.0, 500.0] 

994 >>> barrier_gdf = gpd.GeoDataFrame( 

995 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

996 >>> data={ 

997 >>> "resistance": [1e3,], 

998 >>> "ztop": [10.0,], 

999 >>> "zbottom": [0.0,], 

1000 >>> }, 

1001 >>> ) 

1002 >>> hfb = imod.mf6.HorizontalFlowBarrierResistance(barrier_gdf) 

1003 

1004 

1005 """ 

1006 

1007 @init_log_decorator() 

1008 def __init__( 

1009 self, 

1010 geometry: "gpd.GeoDataFrame", 

1011 print_input=False, 

1012 ): 

1013 super().__init__(geometry, print_input) 

1014 

1015 def _get_barrier_type(self): 

1016 return BarrierType.Resistance 

1017 

1018 def _get_variable_name(self) -> str: 

1019 return "resistance" 

1020 

1021 def _get_vertical_variables(self) -> list: 

1022 return ["ztop", "zbottom"] 

1023 

1024 def _compute_barrier_values( 

1025 self, snapped_dataset, edge_index, idomain, top, bottom, k 

1026 ): 

1027 barrier_values = self._resistance_layer_overlap( 

1028 snapped_dataset, edge_index, top, bottom, k 

1029 ) 

1030 

1031 return barrier_values 

1032 

1033 

1034class LayeredHorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): 

1035 """ 

1036 Horizontal Flow Barrier (HFB) package 

1037 

1038 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

1039 that has type "HFB6" in the Name File. Only one HFB Package can be 

1040 specified for a GWF model. 

1041 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

1042 

1043 Parameters 

1044 ---------- 

1045 geometry: gpd.GeoDataFrame 

1046 Dataframe that describes: 

1047 - geometry: the geometries of the barriers, 

1048 - resistance: the resistance of the barriers 

1049 - layer: model layer for the barrier 

1050 print_input: bool 

1051 

1052 Examples 

1053 -------- 

1054 

1055 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

1056 >>> barrier_y = [500.0, 250.0, 500.0] 

1057 >>> barrier_gdf = gpd.GeoDataFrame( 

1058 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

1059 >>> data={ 

1060 >>> "resistance": [1e3,], 

1061 >>> "layer": [2,], 

1062 >>> }, 

1063 >>> ) 

1064 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierResistance(barrier_gdf) 

1065 

1066 

1067 """ 

1068 

1069 @init_log_decorator() 

1070 def __init__( 

1071 self, 

1072 geometry: "gpd.GeoDataFrame", 

1073 print_input=False, 

1074 ): 

1075 super().__init__(geometry, print_input) 

1076 

1077 def _get_barrier_type(self): 

1078 return BarrierType.Resistance 

1079 

1080 def _get_variable_name(self) -> str: 

1081 return "resistance" 

1082 

1083 def _get_vertical_variables(self) -> list: 

1084 return ["layer"] 

1085 

1086 def _compute_barrier_values( 

1087 self, snapped_dataset, edge_index, idomain, top, bottom, k 

1088 ): 

1089 barrier_values = self._resistance_layer( 

1090 snapped_dataset, 

1091 edge_index, 

1092 idomain, 

1093 ) 

1094 

1095 return barrier_values