Coverage for C:\src\imod-python\imod\mf6\utilities\clip.py: 97%

69 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-16 11:25 +0200

1from __future__ import annotations 

2 

3from copy import deepcopy 

4 

5import numpy as np 

6import xarray as xr 

7import xugrid as xu 

8from fastcore.dispatch import typedispatch 

9 

10from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage 

11from imod.mf6.interfaces.ipackagebase import IPackageBase 

12from imod.mf6.interfaces.ipointdatapackage import IPointDataPackage 

13from imod.mf6.utilities.grid import get_active_domain_slice 

14from imod.typing import GridDataArray 

15from imod.typing.grid import bounding_polygon, is_spatial_grid 

16from imod.util.imports import MissingOptionalModule 

17 

18try: 

19 import shapely 

20except ImportError: 

21 shapely = MissingOptionalModule("shapely") 

22 

23 

24@typedispatch 

25def clip_by_grid(_: object, grid: object) -> None: 

26 raise TypeError( 

27 f"'grid' should be of type xr.DataArray, xu.Ugrid2d or xu.UgridDataArray, got {type(grid)}" 

28 ) 

29 

30 

31@typedispatch # type: ignore[no-redef] 

32def clip_by_grid(package: IPackageBase, active: xr.DataArray) -> IPackageBase: # noqa: F811 

33 domain_slice = get_active_domain_slice(active) 

34 x_min, x_max = domain_slice["x"].start, domain_slice["x"].stop 

35 y_min, y_max = domain_slice["y"].stop, domain_slice["y"].start 

36 

37 clipped_package = package.clip_box( 

38 x_min=x_min, x_max=x_max, y_min=y_min, y_max=y_max 

39 ) 

40 

41 _filter_inactive_cells(clipped_package, active.sel(domain_slice)) 

42 

43 return clipped_package 

44 

45 

46@typedispatch # type: ignore[no-redef] 

47def clip_by_grid(package: IPackageBase, active: xu.UgridDataArray) -> IPackageBase: # noqa: F811 

48 domain_slice = get_active_domain_slice(active) 

49 

50 clipped_dataset = package.dataset.isel(domain_slice, missing_dims="ignore") 

51 

52 cls = type(package) 

53 new = cls.__new__(cls) 

54 new.dataset = clipped_dataset 

55 return new 

56 

57 

58@typedispatch # type: ignore[no-redef] 

59def clip_by_grid( # noqa: F811 

60 package: IPointDataPackage, active: xu.UgridDataArray 

61) -> IPointDataPackage: 

62 """Clip PointDataPackage outside unstructured grid.""" 

63 

64 domain_slice = get_active_domain_slice(active) 

65 active_clipped = active.isel(domain_slice, missing_dims="ignore") 

66 

67 points = np.column_stack((package.x, package.y)) 

68 

69 is_inside_exterior = active_clipped.grid.locate_points(points) != -1 

70 selection = package.dataset.loc[{"index": is_inside_exterior}] 

71 

72 cls = type(package) 

73 new = cls.__new__(cls) 

74 new.dataset = selection 

75 return new 

76 

77 

78def _filter_inactive_cells(package, active): 

79 if package.is_grid_agnostic_package(): 

80 return 

81 

82 package_vars = package.dataset.data_vars 

83 for var in package_vars: 

84 if package_vars[var].shape != (): 

85 if is_spatial_grid(package.dataset[var]): 

86 if np.issubdtype(package.dataset[var].dtype, np.integer): 

87 other = 0 

88 else: 

89 other = np.nan 

90 package.dataset[var] = package.dataset[var].where( 

91 active > 0, other=other 

92 ) 

93 

94 

95@typedispatch # type: ignore[no-redef] 

96def clip_by_grid(package: ILineDataPackage, active: GridDataArray) -> ILineDataPackage: # noqa: F811 

97 """Clip LineDataPackage outside unstructured/structured grid.""" 

98 

99 # Clip line with polygon 

100 bounding_gdf = bounding_polygon(active) 

101 clipped_line_data = package.line_data.clip(bounding_gdf) 

102 

103 # Catch edge case: when line crosses only vertex of polygon, a point 

104 # or multipoint is returned. Drop these. 

105 type_ids = shapely.get_type_id(clipped_line_data.geometry) 

106 is_points = (type_ids == shapely.GeometryType.POINT) | ( 

107 type_ids == shapely.GeometryType.MULTIPOINT 

108 ) 

109 clipped_line_data = clipped_line_data[~is_points] 

110 

111 # Convert MultiLineStrings to LineStrings 

112 clipped_line_data = clipped_line_data.explode("geometry", ignore_index=True) 

113 

114 # Create new instance 

115 clipped_package = deepcopy(package) 

116 clipped_package.line_data = clipped_line_data 

117 return clipped_package