Coverage for C:\src\imod-python\imod\mf6\hfb.py: 94%
318 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-16 11:25 +0200
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-16 11:25 +0200
1import abc
2import copy
3import textwrap
4import typing
5from copy import deepcopy
6from enum import Enum
7from typing import TYPE_CHECKING, Optional, Tuple
9import cftime
10import numpy as np
11import xarray as xr
12import xugrid as xu
13from fastcore.dispatch import typedispatch
15from imod.logging import init_log_decorator
16from imod.mf6.boundary_condition import BoundaryCondition
17from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage
18from imod.mf6.mf6_hfb_adapter import Mf6HorizontalFlowBarrier
19from imod.mf6.package import Package
20from imod.mf6.utilities.grid import broadcast_to_full_domain
21from imod.mf6.utilities.regrid import RegridderType
22from imod.schemata import EmptyIndexesSchema
23from imod.typing import GridDataArray
24from imod.util.imports import MissingOptionalModule
26if TYPE_CHECKING:
27 import geopandas as gpd
28else:
29 try:
30 import geopandas as gpd
31 except ImportError:
32 gpd = MissingOptionalModule("geopandas")
34try:
35 import shapely
36except ImportError:
37 shapely = MissingOptionalModule("shapely")
40@typedispatch
41def _derive_connected_cell_ids(
42 idomain: xr.DataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
43):
44 """
45 Derive the cell ids of the connected cells of an edge on a structured grid.
47 Parameters
48 ----------
49 idomain: xr.DataArray
50 The active domain
51 grid :
52 The unstructured grid of the domain
53 edge_index :
54 The indices of the edges from which the connected cell ids are computed
56 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the
57 cell_dims coordinates.
58 -------
60 """
61 edge_faces = grid.edge_face_connectivity
62 cell2d = edge_faces[edge_index]
64 shape = (idomain["y"].size, idomain["x"].size)
65 row_1, column_1 = np.unravel_index(cell2d[:, 0], shape)
66 row_2, column_2 = np.unravel_index(cell2d[:, 1], shape)
68 cell_ids = xr.Dataset()
70 cell_ids["cell_id1"] = xr.DataArray(
71 np.array([row_1 + 1, column_1 + 1]).T,
72 coords={
73 "edge_index": edge_index,
74 "cell_dims1": ["row_1", "column_1"],
75 },
76 )
78 cell_ids["cell_id2"] = xr.DataArray(
79 np.array([row_2 + 1, column_2 + 1]).T,
80 coords={
81 "edge_index": edge_index,
82 "cell_dims2": ["row_2", "column_2"],
83 },
84 )
86 return cell_ids
89@typedispatch # type: ignore[no-redef]
90def _derive_connected_cell_ids(
91 _: xu.UgridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
92):
93 """
94 Derive the cell ids of the connected cells of an edge on an unstructured grid.
96 Parameters
97 ----------
98 grid :
99 The unstructured grid of the domain
100 edge_index :
101 The indices of the edges from which the connected cell ids are computed
103 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the
104 cell_dims coordinates.
105 -------
107 """
108 edge_faces = grid.edge_face_connectivity
109 cell2d = edge_faces[edge_index]
111 cell2d_1 = cell2d[:, 0]
112 cell2d_2 = cell2d[:, 1]
114 cell_ids = xr.Dataset()
116 cell_ids["cell_id1"] = xr.DataArray(
117 np.array([cell2d_1 + 1]).T,
118 coords={
119 "edge_index": edge_index,
120 "cell_dims1": ["cell2d_1"],
121 },
122 )
124 cell_ids["cell_id2"] = xr.DataArray(
125 np.array([cell2d_2 + 1]).T,
126 coords={
127 "edge_index": edge_index,
128 "cell_dims2": ["cell2d_2"],
129 },
130 )
132 return cell_ids
135def to_connected_cells_dataset(
136 idomain: GridDataArray,
137 grid: xu.Ugrid2d,
138 edge_index: np.ndarray,
139 edge_values: dict,
140) -> xr.Dataset:
141 """
142 Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells,
143 the layer of the cells and the value of the edge. The connected cells are returned in cellid notation e.g.(row,
144 column) for structured grid, (mesh2d_nFaces) for unstructured grids.
146 Parameters
147 ----------
148 idomain: GridDataArray
149 active domain
150 grid: xu.Ugrid2d,
151 unstructured grid containing the edge to cell array
152 edge_index: np.ndarray
153 indices of the edges for which the edge values will be converted to values in the connected cells
154 edge_values: dict
155 dictionary containing the value name and the edge values that are applied on the edges identified by the
156 edge_index
158 Returns
159 ----------
160 a dataset containing:
161 - cell_id1
162 - cell_id2
163 - layer
164 - value name
166 """
167 barrier_dataset = _derive_connected_cell_ids(idomain, grid, edge_index)
169 for name, values in edge_values.items():
170 barrier_dataset[name] = xr.DataArray(
171 values,
172 dims=["layer", "edge_index"],
173 coords={
174 "edge_index": edge_index,
175 "layer": values.coords["layer"],
176 },
177 )
179 barrier_dataset = (
180 barrier_dataset.stack(cell_id=("layer", "edge_index"), create_index=False)
181 .drop_vars("edge_index")
182 .reset_coords()
183 )
185 return barrier_dataset.dropna("cell_id")
188def _fraction_layer_overlap(
189 snapped_dataset: xu.UgridDataset,
190 edge_index: np.ndarray,
191 top: xu.UgridDataArray,
192 bottom: xu.UgridDataArray,
193):
194 """
195 Computes the fraction a barrier occupies inside a layer.
196 """
197 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
198 top_mean = _mean_left_and_right(top, left, right)
199 bottom_mean = _mean_left_and_right(bottom, left, right)
201 n_layer, n_edge = top_mean.shape
202 layer_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
203 layer_bounds[..., 0] = typing.cast(np.ndarray, bottom_mean.values).T
204 layer_bounds[..., 1] = typing.cast(np.ndarray, top_mean.values).T
206 hfb_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
207 hfb_bounds[..., 0] = (
208 snapped_dataset["zbottom"].values[edge_index].reshape(n_edge, 1)
209 )
210 hfb_bounds[..., 1] = snapped_dataset["ztop"].values[edge_index].reshape(n_edge, 1)
212 overlap = _vectorized_overlap(hfb_bounds, layer_bounds)
213 height = layer_bounds[..., 1] - layer_bounds[..., 0]
214 # Avoid runtime warnings when diving by 0:
215 height[height <= 0] = np.nan
216 fraction = (overlap / height).T
218 return xr.ones_like(top_mean) * fraction
221def _mean_left_and_right(
222 cell_values: xu.UgridDataArray, left: np.ndarray, right: np.ndarray
223) -> xr.Dataset:
224 """
225 This method computes the mean value of cell pairs. The left array specifies the first cell, the right array
226 the second cells. The mean is computed by (first_cell+second_cell/2.0)
228 Parameters
229 ----------
230 cell_values: xu.UgridDataArray
231 The array containing the data values of all the cells
232 left :
233 The array containing indices to the first cells
234 right :
235 The array containing indices to the second cells
237 Returns
238 -------
239 The means of the cells
241 """
242 facedim = cell_values.ugrid.grid.face_dimension
243 uda_left = cell_values.ugrid.obj.isel({facedim: left}).drop_vars(facedim)
244 uda_right = cell_values.ugrid.obj.isel({facedim: right}).drop_vars(facedim)
246 return xr.concat((uda_left, uda_right), dim="two").mean("two")
249def _vectorized_overlap(bounds_a: np.ndarray, bounds_b: np.ndarray):
250 """
251 Vectorized overlap computation. Returns the overlap of 2 vectors along the same axis.
252 If there is no overlap zero will be returned.
254 b1
255 ▲
256 a1 |
257 ▲ |
258 | |
259 | ▼
260 ▼ b0
261 a0
263 To compute the overlap of the 2 vectors the maximum of a0,b0, is subtracted from the minimum of a1,b1.
265 Compare with:
267 overlap = max(0, min(a[1], b[1]) - max(a[0], b[0]))
268 """
269 return np.maximum(
270 0.0,
271 np.minimum(bounds_a[..., 1], bounds_b[..., 1])
272 - np.maximum(bounds_a[..., 0], bounds_b[..., 0]),
273 )
276class BarrierType(Enum):
277 HydraulicCharacteristic = 0
278 Multiplier = 1
279 Resistance = 2
282class HorizontalFlowBarrierBase(BoundaryCondition, ILineDataPackage):
283 _pkg_id = "hfb"
285 _period_data = ()
286 _init_schemata = {}
287 _write_schemata = {"geometry": [EmptyIndexesSchema()]}
289 _regrid_method: dict[str, Tuple[RegridderType, str]] = {}
291 def __init__(
292 self,
293 geometry: "gpd.GeoDataFrame",
294 print_input: bool = False,
295 ) -> None:
296 dict_dataset = {"print_input": print_input}
297 super().__init__(dict_dataset)
299 self.line_data = geometry
301 def get_regrid_methods(self) -> Optional[dict[str, Tuple[RegridderType, str]]]:
302 return self._regrid_method
304 def _get_variable_names_for_gdf(self) -> list[str]:
305 return [
306 self._get_variable_name(),
307 "geometry",
308 ] + self._get_vertical_variables()
310 @property
311 def line_data(self) -> "gpd.GeoDataFrame":
312 variables_for_gdf = self._get_variable_names_for_gdf()
313 return gpd.GeoDataFrame(
314 self.dataset[variables_for_gdf].to_dataframe(),
315 geometry="geometry",
316 )
318 @line_data.setter
319 def line_data(self, value: "gpd.GeoDataFrame") -> None:
320 variables_for_gdf = self._get_variable_names_for_gdf()
321 self.dataset = self.dataset.merge(
322 value.to_xarray(), overwrite_vars=variables_for_gdf, join="right"
323 )
325 def render(self, directory, pkgname, globaltimes, binary):
326 raise NotImplementedError(
327 f"""{self.__class__.__name__} is a grid-agnostic package and does not have a render method. To render the
328 package, first convert to a Modflow6 package by calling pkg.to_mf6_pkg()"""
329 )
331 def _netcdf_encoding(self):
332 return {"geometry": {"dtype": "str"}}
334 @classmethod
335 def from_file(cls, path, **kwargs):
336 instance = super().from_file(path, **kwargs)
337 instance.dataset["geometry"] = shapely.wkt.loads(instance.dataset["geometry"])
339 return instance
341 def _compute_barrier_values(
342 self, snapped_dataset, edge_index, idomain, top, bottom, k
343 ):
344 raise NotImplementedError()
346 def to_mf6_pkg(
347 self,
348 idomain: GridDataArray,
349 top: GridDataArray,
350 bottom: GridDataArray,
351 k: GridDataArray,
352 validate: bool = False,
353 ) -> Mf6HorizontalFlowBarrier:
354 """
355 Write package to Modflow 6 package.
357 Based on the model grid, top and bottoms, the layers in which the barrier belong are computed. If the
358 barrier only partially occupies a layer an effective resistance or hydraulic conductivity for that layer is
359 calculated. This calculation is skipped for the Multiplier type.
361 Parameters
362 ----------
363 idomain: GridDataArray
364 Grid with active cells.
365 top: GridDataArray
366 Grid with top of model layers.
367 bottom: GridDataArray
368 Grid with bottom of model layers.
369 k: GridDataArray
370 Grid with hydraulic conductivities.
371 validate: bool
372 Run validation before converting
374 Returns
375 -------
377 """
378 if validate:
379 self._validate(self._write_schemata)
381 top, bottom = broadcast_to_full_domain(idomain, top, bottom)
382 k = idomain * k
383 unstructured_grid, top, bottom, k = (
384 self.__to_unstructured(idomain, top, bottom, k)
385 if isinstance(idomain, xr.DataArray)
386 else [idomain, top, bottom, k]
387 )
388 snapped_dataset, edge_index = self.__snap_to_grid(idomain)
389 edge_index = self.__remove_invalid_edges(unstructured_grid, edge_index)
391 barrier_values = self._compute_barrier_values(
392 snapped_dataset, edge_index, idomain, top, bottom, k
393 )
394 barrier_values = self.__remove_edge_values_connected_to_inactive_cells(
395 barrier_values, unstructured_grid, edge_index
396 )
398 if (barrier_values.size == 0) | np.isnan(barrier_values).all():
399 raise ValueError(
400 textwrap.dedent(
401 """
402 No barriers could be assigned to cell edges,
403 this is caused by either one of the following:
404 \t- Barriers fall completely outside the model grid
405 \t- Barriers were assigned to the edge of the model domain
406 \t- Barriers were assigned to edges of inactive cells
407 """
408 )
409 )
411 barrier_dataset = to_connected_cells_dataset(
412 idomain,
413 unstructured_grid.ugrid.grid,
414 edge_index,
415 {
416 "hydraulic_characteristic": self.__to_hydraulic_characteristic(
417 barrier_values
418 )
419 },
420 )
422 barrier_dataset["print_input"] = self.dataset["print_input"]
424 return Mf6HorizontalFlowBarrier(**barrier_dataset.data_vars)
426 def is_empty(self) -> bool:
427 if super().is_empty():
428 return True
430 linestrings = self.dataset["geometry"]
431 only_empty_lines = all(ls.is_empty for ls in linestrings.values)
432 return only_empty_lines
434 def _resistance_layer(
435 self,
436 snapped_dataset: xu.UgridDataset,
437 edge_index: np.ndarray,
438 idomain: xu.UgridDataArray,
439 ) -> xr.DataArray:
440 """
441 Returns layered xarray with barrier resistance distributed over layers
442 """
443 hfb_resistance = snapped_dataset[self._get_variable_name()].values[edge_index]
444 hfb_layer = snapped_dataset["layer"].values[edge_index]
445 nlay = idomain.layer.size
446 model_layer = np.repeat(idomain.layer.values, hfb_resistance.size).reshape(
447 (nlay, hfb_resistance.size)
448 )
449 data = np.where(model_layer == hfb_layer, hfb_resistance, np.nan)
450 return xr.DataArray(
451 data=data,
452 coords={
453 "layer": np.arange(nlay) + 1,
454 },
455 dims=["layer", "mesh2d_nFaces"],
456 )
458 def _resistance_layer_overlap(
459 self,
460 snapped_dataset: xu.UgridDataset,
461 edge_index: np.ndarray,
462 top: xu.UgridDataArray,
463 bottom: xu.UgridDataArray,
464 k: xu.UgridDataArray,
465 ) -> xr.DataArray:
466 """
467 Computes the effective value of a barrier that partially overlaps a cell in the z direction.
468 A barrier always lies on an edge in the xy-plane, however in doesn't have to fully extend in the z-direction to
469 cover the entire layer. This method computes the effective resistance in that case.
471 Barrier
472 ...................................... ▲ ▲
473 . @@@@ . | |
474 . @Rb@ . | Lb |
475 . Cell1 @@@@ Cell2 . ▼ | H
476 . : : . |
477 . : : . |
478 .................:..:................. ▼
479 k1 k2
481 The effective value of a partially emerged barrier in a layer is computed by:
482 c_total = 1.0 / (fraction / Rb + (1.0 - fraction) / c_aquifer)
483 c_aquifer = 1.0 / k_mean = 1.0 / ((k1 + k2) / 2.0)
484 fraction = Lb / H
486 """
487 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
488 k_mean = _mean_left_and_right(k, left, right)
490 resistance = self.__to_resistance(
491 snapped_dataset[self._get_variable_name()]
492 ).values[edge_index]
494 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom)
496 c_aquifer = 1.0 / k_mean
497 inverse_c = (fraction / resistance) + ((1.0 - fraction) / c_aquifer)
498 c_total = 1.0 / inverse_c
500 return self.__from_resistance(c_total)
502 def __to_resistance(self, value: xu.UgridDataArray) -> xu.UgridDataArray:
503 match self._get_barrier_type():
504 case BarrierType.HydraulicCharacteristic:
505 return 1.0 / value
506 case BarrierType.Multiplier:
507 return -1.0 / value
508 case BarrierType.Resistance:
509 return value
511 raise ValueError(r"Unknown barrier type {barrier_type}")
513 def __from_resistance(self, resistance: xr.DataArray) -> xr.DataArray:
514 match self._get_barrier_type():
515 case BarrierType.HydraulicCharacteristic:
516 return 1.0 / resistance
517 case BarrierType.Multiplier:
518 return -1.0 / resistance
519 case BarrierType.Resistance:
520 return resistance
522 raise ValueError(r"Unknown barrier type {barrier_type}")
524 def __to_hydraulic_characteristic(self, value: xr.DataArray) -> xr.DataArray:
525 match self._get_barrier_type():
526 case BarrierType.HydraulicCharacteristic:
527 return value
528 case BarrierType.Multiplier:
529 return -1.0 * value
530 case BarrierType.Resistance:
531 return 1.0 / value
533 raise ValueError(r"Unknown barrier type {barrier_type}")
535 @abc.abstractmethod
536 def _get_barrier_type(self) -> BarrierType:
537 raise NotImplementedError
539 @abc.abstractmethod
540 def _get_variable_name(self) -> str:
541 raise NotImplementedError
543 @abc.abstractmethod
544 def _get_vertical_variables(self) -> list:
545 raise NotImplementedError
547 def clip_box(
548 self,
549 time_min: Optional[cftime.datetime | np.datetime64 | str] = None,
550 time_max: Optional[cftime.datetime | np.datetime64 | str] = None,
551 layer_min: Optional[int] = None,
552 layer_max: Optional[int] = None,
553 x_min: Optional[float] = None,
554 x_max: Optional[float] = None,
555 y_min: Optional[float] = None,
556 y_max: Optional[float] = None,
557 top: Optional[GridDataArray] = None,
558 bottom: Optional[GridDataArray] = None,
559 ) -> "HorizontalFlowBarrierBase":
560 """
561 Clip a package by a bounding box (time, layer, y, x).
563 Slicing intervals may be half-bounded, by providing None:
565 * To select 500.0 <= x <= 1000.0:
566 ``clip_box(x_min=500.0, x_max=1000.0)``.
567 * To select x <= 1000.0: ``clip_box(x_min=None, x_max=1000.0)``
568 or ``clip_box(x_max=1000.0)``.
569 * To select x >= 500.0: ``clip_box(x_min = 500.0, x_max=None.0)``
570 or ``clip_box(x_min=1000.0)``.
572 Parameters
573 ----------
574 time_min: optional
575 time_max: optional
576 layer_min: optional, int
577 layer_max: optional, int
578 x_min: optional, float
579 x_max: optional, float
580 y_min: optional, float
581 y_max: optional, float
582 top: optional, GridDataArray
583 bottom: optional, GridDataArray
585 Returns
586 -------
587 sliced : Package
588 """
589 cls = type(self)
590 new = cls.__new__(cls)
591 new.dataset = copy.deepcopy(self.dataset)
592 return new
594 def mask(self, _) -> Package:
595 """
596 The mask method is irrelevant for this package as it is
597 grid-agnostic, instead this method retuns a copy of itself.
598 """
599 return deepcopy(self)
601 @staticmethod
602 def __to_unstructured(
603 idomain: xr.DataArray, top: xr.DataArray, bottom: xr.DataArray, k: xr.DataArray
604 ) -> Tuple[
605 xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray
606 ]:
607 unstruct = xu.UgridDataArray.from_structured(idomain)
608 top = xu.UgridDataArray.from_structured(top)
609 bottom = xu.UgridDataArray.from_structured(bottom)
610 k = xu.UgridDataArray.from_structured(k)
612 return unstruct, top, bottom, k
614 def __snap_to_grid(
615 self, idomain: GridDataArray
616 ) -> Tuple[xu.UgridDataset, np.ndarray]:
617 if "layer" in self.dataset:
618 variable_names = [self._get_variable_name(), "geometry", "layer"]
619 else:
620 variable_names = [self._get_variable_name(), "geometry", "ztop", "zbottom"]
621 barrier_dataframe = self.dataset[variable_names].to_dataframe()
623 snapped_dataset, _ = typing.cast(
624 xu.UgridDataset,
625 xu.snap_to_grid(barrier_dataframe, grid=idomain, max_snap_distance=0.5),
626 )
627 edge_index = np.argwhere(
628 snapped_dataset[self._get_variable_name()].notnull().values
629 ).ravel()
631 return snapped_dataset, edge_index
633 @staticmethod
634 def __remove_invalid_edges(
635 unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
636 ) -> np.ndarray:
637 """
638 Remove invalid edges indices. An edge is considered invalid when:
639 - it lies on an exterior boundary (face_connectivity equals the grid fill value)
640 - The corresponding connected cells are inactive
641 """
642 grid = unstructured_grid.ugrid.grid
643 face_dimension = unstructured_grid.ugrid.grid.face_dimension
644 face_connectivity = grid.edge_face_connectivity[edge_index]
646 valid_edges = np.all(face_connectivity != grid.fill_value, axis=1)
648 connected_cells = -np.ones((len(edge_index), 2))
649 connected_cells[valid_edges, 0] = (
650 unstructured_grid.sel(layer=1)
651 .loc[{face_dimension: face_connectivity[valid_edges, 0]}]
652 .values
653 )
654 connected_cells[valid_edges, 1] = (
655 unstructured_grid.sel(layer=1)
656 .loc[{face_dimension: face_connectivity[valid_edges, 1]}]
657 .values
658 )
660 valid = (connected_cells > 0).all(axis=1)
662 return edge_index[valid]
664 @staticmethod
665 def __remove_edge_values_connected_to_inactive_cells(
666 values, unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
667 ):
668 face_dimension = unstructured_grid.ugrid.grid.face_dimension
670 face_connectivity = unstructured_grid.ugrid.grid.edge_face_connectivity[
671 edge_index
672 ]
673 connected_cells_left = unstructured_grid.loc[
674 {face_dimension: face_connectivity[:, 0]}
675 ]
676 connected_cells_right = unstructured_grid.loc[
677 {face_dimension: face_connectivity[:, 1]}
678 ]
680 return values.where(
681 (connected_cells_left.drop(face_dimension) > 0)
682 & (connected_cells_right.drop(face_dimension) > 0)
683 )
686class HorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase):
687 """
688 Horizontal Flow Barrier (HFB) package
690 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
691 that has type "HFB6" in the Name File. Only one HFB Package can be
692 specified for a GWF model.
693 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
695 Parameters
696 ----------
697 geometry: gpd.GeoDataFrame
698 Dataframe that describes:
699 - geometry: the geometries of the barriers,
700 - hydraulic_characteristic: the hydraulic characteristic of the barriers
701 - ztop: the top z-value of the barriers
702 - zbottom: the bottom z-value of the barriers
703 print_input: bool
705 Examples
706 --------
708 >>> barrier_x = [-1000.0, 0.0, 1000.0]
709 >>> barrier_y = [500.0, 250.0, 500.0]
710 >>> barrier_gdf = gpd.GeoDataFrame(
711 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
712 >>> data={
713 >>> "hydraulic_characteristic": [1e-3,],
714 >>> "ztop": [10.0,],
715 >>> "zbottom": [0.0,],
716 >>> },
717 >>> )
718 >>> hfb = imod.mf6.HorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf)
720 """
722 @init_log_decorator()
723 def __init__(
724 self,
725 geometry: "gpd.GeoDataFrame",
726 print_input=False,
727 ):
728 super().__init__(geometry, print_input)
730 def _get_barrier_type(self):
731 return BarrierType.HydraulicCharacteristic
733 def _get_variable_name(self) -> str:
734 return "hydraulic_characteristic"
736 def _get_vertical_variables(self) -> list:
737 return ["ztop", "zbottom"]
739 def _compute_barrier_values(
740 self, snapped_dataset, edge_index, idomain, top, bottom, k
741 ):
742 barrier_values = self._resistance_layer_overlap(
743 snapped_dataset, edge_index, top, bottom, k
744 )
746 return barrier_values
749class LayeredHorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase):
750 """
751 Horizontal Flow Barrier (HFB) package
753 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
754 that has type "HFB6" in the Name File. Only one HFB Package can be
755 specified for a GWF model.
756 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
758 Parameters
759 ----------
760 geometry: gpd.GeoDataFrame
761 Dataframe that describes:
762 - geometry: the geometries of the barriers,
763 - hydraulic_characteristic: the hydraulic characteristic of the barriers
764 - layer: model layer for the barrier
765 print_input: bool
767 Examples
768 --------
770 >>> barrier_x = [-1000.0, 0.0, 1000.0]
771 >>> barrier_y = [500.0, 250.0, 500.0]
772 >>> barrier_gdf = gpd.GeoDataFrame(
773 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
774 >>> data={
775 >>> "hydraulic_characteristic": [1e-3,],
776 >>> "layer": [1,]
777 >>> },
778 >>> )
779 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf)
781 """
783 @init_log_decorator()
784 def __init__(
785 self,
786 geometry: "gpd.GeoDataFrame",
787 print_input=False,
788 ):
789 super().__init__(geometry, print_input)
791 def _get_barrier_type(self):
792 return BarrierType.HydraulicCharacteristic
794 def _get_variable_name(self) -> str:
795 return "hydraulic_characteristic"
797 def _get_vertical_variables(self) -> list:
798 return ["layer"]
800 def _compute_barrier_values(
801 self, snapped_dataset, edge_index, idomain, top, bottom, k
802 ):
803 barrier_values = self._resistance_layer(
804 snapped_dataset,
805 edge_index,
806 idomain,
807 )
809 return barrier_values
812class HorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase):
813 """
814 Horizontal Flow Barrier (HFB) package
816 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
817 that has type "HFB6" in the Name File. Only one HFB Package can be
818 specified for a GWF model.
819 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
821 If parts of the barrier overlap a layer the multiplier is applied to the entire layer.
823 Parameters
824 ----------
825 geometry: gpd.GeoDataFrame
826 Dataframe that describes:
827 - geometry: the geometries of the barriers,
828 - multiplier: the multiplier of the barriers
829 - ztop: the top z-value of the barriers
830 - zbottom: the bottom z-value of the barriers
831 print_input: bool
833 Examples
834 --------
836 >>> barrier_x = [-1000.0, 0.0, 1000.0]
837 >>> barrier_y = [500.0, 250.0, 500.0]
838 >>> barrier_gdf = gpd.GeoDataFrame(
839 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
840 >>> data={
841 >>> "multiplier": [1.5,],
842 >>> "ztop": [10.0,],
843 >>> "zbottom": [0.0,],
844 >>> },
845 >>> )
846 >>> hfb = imod.mf6.HorizontalFlowBarrierMultiplier(barrier_gdf)
848 """
850 @init_log_decorator()
851 def __init__(
852 self,
853 geometry: "gpd.GeoDataFrame",
854 print_input=False,
855 ):
856 super().__init__(geometry, print_input)
858 def _get_barrier_type(self):
859 return BarrierType.Multiplier
861 def _get_variable_name(self) -> str:
862 return "multiplier"
864 def _get_vertical_variables(self) -> list:
865 return ["ztop", "zbottom"]
867 def _compute_barrier_values(
868 self, snapped_dataset, edge_index, idomain, top, bottom, k
869 ):
870 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom)
872 barrier_values = (
873 fraction.where(fraction)
874 * snapped_dataset[self._get_variable_name()].values[edge_index]
875 )
877 return barrier_values
880class LayeredHorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase):
881 """
882 Horizontal Flow Barrier (HFB) package
884 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
885 that has type "HFB6" in the Name File. Only one HFB Package can be
886 specified for a GWF model.
887 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
889 If parts of the barrier overlap a layer the multiplier is applied to the entire layer.
891 Parameters
892 ----------
893 geometry: gpd.GeoDataFrame
894 Dataframe that describes:
895 - geometry: the geometries of the barriers,
896 - multiplier: the multiplier of the barriers
897 - layer: model layer for the barrier
898 print_input: bool
900 Examples
901 --------
903 >>> barrier_x = [-1000.0, 0.0, 1000.0]
904 >>> barrier_y = [500.0, 250.0, 500.0]
905 >>> barrier_gdf = gpd.GeoDataFrame(
906 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
907 >>> data={
908 >>> "multiplier": [1.5,],
909 >>> "layer": [1,],
910 >>> },
911 >>> )
912 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierMultiplier(barrier_gdf)
914 """
916 @init_log_decorator()
917 def __init__(
918 self,
919 geometry: "gpd.GeoDataFrame",
920 print_input=False,
921 ):
922 super().__init__(geometry, print_input)
924 def _get_barrier_type(self):
925 return BarrierType.Multiplier
927 def _get_variable_name(self) -> str:
928 return "multiplier"
930 def _get_vertical_variables(self) -> list:
931 return ["layer"]
933 def _compute_barrier_values(
934 self, snapped_dataset, edge_index, idomain, top, bottom, k
935 ):
936 barrier_values = self.__multiplier_layer(snapped_dataset, edge_index, idomain)
938 return barrier_values
940 def __multiplier_layer(
941 self,
942 snapped_dataset: xu.UgridDataset,
943 edge_index: np.ndarray,
944 idomain: xu.UgridDataArray,
945 ) -> xr.DataArray:
946 """
947 Returns layered xarray with barrier multiplier distrubuted over layers
948 """
949 hfb_multiplier = snapped_dataset[self._get_variable_name()].values[edge_index]
950 hfb_layer = snapped_dataset["layer"].values[edge_index]
951 nlay = idomain.layer.size
952 model_layer = np.repeat(idomain.layer.values, hfb_multiplier.shape[0]).reshape(
953 (nlay, hfb_multiplier.shape[0])
954 )
955 data = np.where(model_layer == hfb_layer, hfb_multiplier, np.nan)
956 return xr.DataArray(
957 data=data,
958 coords={
959 "layer": np.arange(nlay) + 1,
960 },
961 dims=["layer", "mesh2d_nFaces"],
962 )
965class HorizontalFlowBarrierResistance(HorizontalFlowBarrierBase):
966 """
967 Horizontal Flow Barrier (HFB) package
969 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
970 that has type "HFB6" in the Name File. Only one HFB Package can be
971 specified for a GWF model.
972 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
974 Parameters
975 ----------
976 geometry: gpd.GeoDataFrame
977 Dataframe that describes:
978 - geometry: the geometries of the barriers,
979 - resistance: the resistance of the barriers
980 - ztop: the top z-value of the barriers
981 - zbottom: the bottom z-value of the barriers
982 print_input: bool
984 Examples
985 --------
987 >>> barrier_x = [-1000.0, 0.0, 1000.0]
988 >>> barrier_y = [500.0, 250.0, 500.0]
989 >>> barrier_gdf = gpd.GeoDataFrame(
990 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
991 >>> data={
992 >>> "resistance": [1e3,],
993 >>> "ztop": [10.0,],
994 >>> "zbottom": [0.0,],
995 >>> },
996 >>> )
997 >>> hfb = imod.mf6.HorizontalFlowBarrierResistance(barrier_gdf)
1000 """
1002 @init_log_decorator()
1003 def __init__(
1004 self,
1005 geometry: "gpd.GeoDataFrame",
1006 print_input=False,
1007 ):
1008 super().__init__(geometry, print_input)
1010 def _get_barrier_type(self):
1011 return BarrierType.Resistance
1013 def _get_variable_name(self) -> str:
1014 return "resistance"
1016 def _get_vertical_variables(self) -> list:
1017 return ["ztop", "zbottom"]
1019 def _compute_barrier_values(
1020 self, snapped_dataset, edge_index, idomain, top, bottom, k
1021 ):
1022 barrier_values = self._resistance_layer_overlap(
1023 snapped_dataset, edge_index, top, bottom, k
1024 )
1026 return barrier_values
1029class LayeredHorizontalFlowBarrierResistance(HorizontalFlowBarrierBase):
1030 """
1031 Horizontal Flow Barrier (HFB) package
1033 Input to the Horizontal Flow Barrier (HFB) Package is read from the file
1034 that has type "HFB6" in the Name File. Only one HFB Package can be
1035 specified for a GWF model.
1036 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf
1038 Parameters
1039 ----------
1040 geometry: gpd.GeoDataFrame
1041 Dataframe that describes:
1042 - geometry: the geometries of the barriers,
1043 - resistance: the resistance of the barriers
1044 - layer: model layer for the barrier
1045 print_input: bool
1047 Examples
1048 --------
1050 >>> barrier_x = [-1000.0, 0.0, 1000.0]
1051 >>> barrier_y = [500.0, 250.0, 500.0]
1052 >>> barrier_gdf = gpd.GeoDataFrame(
1053 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),],
1054 >>> data={
1055 >>> "resistance": [1e3,],
1056 >>> "layer": [2,],
1057 >>> },
1058 >>> )
1059 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierResistance(barrier_gdf)
1062 """
1064 @init_log_decorator()
1065 def __init__(
1066 self,
1067 geometry: "gpd.GeoDataFrame",
1068 print_input=False,
1069 ):
1070 super().__init__(geometry, print_input)
1072 def _get_barrier_type(self):
1073 return BarrierType.Resistance
1075 def _get_variable_name(self) -> str:
1076 return "resistance"
1078 def _get_vertical_variables(self) -> list:
1079 return ["layer"]
1081 def _compute_barrier_values(
1082 self, snapped_dataset, edge_index, idomain, top, bottom, k
1083 ):
1084 barrier_values = self._resistance_layer(
1085 snapped_dataset,
1086 edge_index,
1087 idomain,
1088 )
1090 return barrier_values