Coverage for C:\src\imod-python\imod\formats\gen\gen.py: 95%
200 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 14:15 +0200
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 14:15 +0200
1import io
2import warnings
3from pathlib import Path
4from typing import List, Optional, Tuple, Union
6import numpy as np
7import pandas as pd
8from scipy.io import FortranFile, FortranFormattingError
10from imod.formats.ipf import _infer_delimwhitespace
11from imod.util.imports import MissingOptionalModule
13try:
14 import shapely
15 import shapely.geometry as sg
16except ImportError:
17 sg = MissingOptionalModule("shapely")
18 shapely = MissingOptionalModule("shapely")
20try:
21 import geopandas as gpd
22except ImportError:
23 gpd = MissingOptionalModule("geopandas")
26# ASCII text format:
27# ------------------
28def parse_ascii_points(lines: List[str]) -> "geopandas.GeoDataFrame": # type: ignore # noqa
29 n = len(lines) - 1
30 fid = np.empty(n, dtype=int)
31 x = np.empty(n, dtype=float)
32 y = np.empty(n, dtype=float)
33 for i, line in enumerate(lines[:-1]):
34 fid[i], x[i], y[i] = line.split(",")
36 points = gpd.points_from_xy(x=x, y=y)
37 return gpd.GeoDataFrame(data={"id": fid}, geometry=points)
40def parse_ascii_segments(lines: List[str]):
41 end = [i for i, line in enumerate(lines) if line == "end"][:-1]
42 start = [-1] + end[:-1]
43 features = [lines[s + 1 : e] for s, e in zip(start, end)]
45 n_feature = len(features)
46 n_vertex = [len(f) - 1 for f in features]
48 # Process the feature id as a string. This adds support for id's containing
49 # commas and spaces
50 fid = np.full(n_feature, "", dtype=object)
51 is_polygon = np.empty(n_feature, dtype=bool)
52 indices = np.repeat(np.arange(n_feature), n_vertex)
54 first_coord = features[0][1:][0]
55 has_whitespace = _infer_delimwhitespace(first_coord, 2)
57 vertex_coords = []
58 for i, feature in enumerate(features):
59 fid[i] = feature[0]
60 # Use pd.read_csv instead of np.loadtxt, since it supports files
61 # delimited with multiple whitespace characters.
62 feature_buffer = io.StringIO(initial_value="\n".join(feature[1:]))
63 coords_df = pd.read_csv(
64 feature_buffer, delim_whitespace=has_whitespace, header=None
65 )
66 coords = coords_df.values
67 is_polygon[i] = (coords[0] == coords[-1]).all()
68 vertex_coords.append(coords)
70 coordinates = np.vstack(vertex_coords)
71 polygon = is_polygon.any()
72 if polygon != is_polygon.all():
73 raise ValueError("Some features are linestrings, some are polygons")
75 if polygon:
76 rings = shapely.linearrings(coordinates, indices=indices)
77 geometry = shapely.polygons(rings)
78 else:
79 geometry = shapely.linestrings(coordinates, indices=indices)
81 return gpd.GeoDataFrame(data={"id": fid}, geometry=geometry)
84def read_ascii(path: Union[Path, str]) -> "geopandas.GeoDataFrame": # type: ignore # noqa
85 """
86 Read a text ASCII GEN file to a geopandas GeoDataFrame.
88 Parameters
89 ----------
90 path: Union[str, Path]
92 Returns
93 -------
94 geodataframe: gpd.GeoDataFrame
95 """
96 # From gen itype to shapely geometry:
97 with open(path, "r") as f:
98 lines = [line.lower().strip() for line in f.readlines() if line.strip() != ""]
100 if len(lines[0].split(",")) == 3:
101 return parse_ascii_points(lines)
103 return parse_ascii_segments(lines)
106# Binary format:
107# --------------
108#
109# From the iMOD User Manual
110FLOAT_TYPE = np.float64
111INT_TYPE = np.int32
112HEADER_TYPE = np.int32
113CIRCLE = 1024
114POLYGON = 1025
115RECTANGLE = 1026
116POINT = 1027
117LINE = 1028
118MAX_NAME_WIDTH = 11
120# Map integer enumerators to strings
121GENTYPE_TO_NAME = {
122 CIRCLE: "circle",
123 POLYGON: "polygon",
124 RECTANGLE: "rectangle",
125 POINT: "point",
126 LINE: "line",
127}
128NAME_TO_GENTYPE = {v: k for k, v in GENTYPE_TO_NAME.items()}
131# Forward references, since shapely is optional
132Point = "shapely.geometry.Point"
133Polygon = "shapely.geometry.Polygon"
134LineString = "shapely.geometry.LineString"
137class MyFortranFile(FortranFile):
138 """
139 Unfortunately, the binary GEN files are written as Fortran Record files, so
140 they cannot be read directly with e.g. numpy.fromfile (like direct access)
141 The scipy FortranFile is mostly adequate, it just misses a method to read
142 char records (always ascii encoded; note all ascii is valid utf-8, but not
143 vice versa). Those are added here.
144 """
146 def read_char_record(self):
147 first_size = self._read_size(eof_ok=True)
148 string = self._fp.read(first_size).decode("utf-8")
149 if len(string) != first_size:
150 raise FortranFormattingError("End of file in the middle of a record")
151 second_size = self._read_size(eof_ok=True)
152 if first_size != second_size:
153 raise IOError("Sizes do not agree in the header and footer for this record")
154 return string
156 def write_char_record(self, string: str):
157 total_size = len(string)
158 bytes_string = string.encode("ascii")
159 nb = np.array([total_size], dtype=self._header_dtype)
160 nb.tofile(self._fp)
161 self._fp.write(bytes_string)
162 nb.tofile(self._fp)
165# The binary GEN file has some idiosyncratic geometries:
166# * A separate circle geometry
167# * A seperate rectangle geometry
168# In OGC (WKT) terms, these are polygons.
169#
170# Both are defined by two vertices:
171# * Circle: center and any other outside point (from which we can infer the radius)
172# * Rectangle: (left, lower), (right, upper); may also be (left, upper), (right, lower)
173#
174# The other shapely geometries can be generated directly from the vertices.
177def from_circle(xy: np.ndarray) -> Polygon:
178 radius = np.sqrt(np.sum((xy[1] - xy[0]) ** 2))
179 return sg.Point(xy[0]).buffer(radius)
182def from_point(xy: np.ndarray) -> Polygon:
183 return sg.Point(xy[0])
186def from_rectangle(xy: np.ndarray) -> Polygon:
187 return sg.box(xy[0, 0], xy[0, 1], xy[1, 0], xy[1, 1])
190def to_circle(geometry: Polygon) -> Tuple[np.ndarray, int]:
191 xy = np.array([geometry.centroid.coords[0], geometry.exterior.coords[0]])
192 return xy, 2
195def to_rectangle(geometry: Polygon) -> Tuple[np.ndarray, int]:
196 xy = np.array(geometry.exterior.coords)
197 if (geometry.area / geometry.minimum_rotated_rectangle.area) < 0.999:
198 raise ValueError("Feature_type is rectangle, but geometry is not a rectangular")
199 # First and third vertex will give (left, right) and (lower, upper)
200 return xy[[0, 2]], 2
203def to_polygon(geometry: Polygon) -> Tuple[np.ndarray, int]:
204 xy = np.array(geometry.exterior.coords)
205 return xy, xy.shape[0]
208def to_point(geometry: Point) -> Tuple[np.ndarray, int]:
209 return np.array(geometry.coords), 1
212def to_line(geometry: LineString) -> Tuple[np.ndarray, int]:
213 xy = np.array(geometry.coords)
214 return xy, xy.shape[0]
217def read_binary(path: Union[str, Path]) -> "geopandas.GeoDataFrame": # type: ignore # noqa
218 """
219 Read a binary GEN file to a geopandas GeoDataFrame.
221 Parameters
222 ----------
223 path: Union[str, Path]
225 Returns
226 -------
227 geodataframe: gpd.GeoDataFrame
228 """
229 # From gen itype to shapely geometry:
230 GENTYPE_TO_GEOM = {
231 CIRCLE: from_circle,
232 POLYGON: sg.Polygon,
233 RECTANGLE: from_rectangle,
234 POINT: from_point,
235 LINE: sg.LineString,
236 }
238 with warnings.catch_warnings(record=True):
239 warnings.filterwarnings(
240 "ignore", message="Given a dtype which is not unsigned."
241 )
242 with MyFortranFile(path, mode="r", header_dtype=HEADER_TYPE) as f:
243 f.read_reals(dtype=FLOAT_TYPE) # Skip the bounding box
244 n_feature, n_column = f.read_ints(dtype=INT_TYPE)
245 if n_column > 0:
246 widths = f.read_ints(dtype=INT_TYPE)
247 indices = range(0, (n_column + 1) * MAX_NAME_WIDTH, MAX_NAME_WIDTH)
248 string = f.read_char_record() # pylint:disable=no-member
249 names = [string[i:j].strip() for i, j in zip(indices[:-1], indices[1:])]
251 xy = []
252 rows = []
253 feature_type = np.empty(n_feature, dtype=INT_TYPE)
254 for i in range(n_feature):
255 _, ftype = f.read_ints(dtype=INT_TYPE)
256 feature_type[i] = ftype
257 if n_column > 0:
258 rows.append(f.read_char_record()) # pylint:disable=no-member
259 f.read_reals(dtype=FLOAT_TYPE) # skip the bounding box
260 xy.append(f.read_reals(dtype=FLOAT_TYPE).reshape((-1, 2)))
262 if n_column > 0:
263 df = pd.read_fwf(
264 io.StringIO("\n".join(rows)),
265 widths=widths,
266 names=names,
267 )
268 else:
269 df = pd.DataFrame()
270 df["feature_type"] = feature_type
271 df["feature_type"] = df["feature_type"].replace(GENTYPE_TO_NAME)
273 geometry = []
274 for ftype, geom in zip(feature_type, xy):
275 geometry.append(GENTYPE_TO_GEOM[ftype](geom))
277 return gpd.GeoDataFrame(df, geometry=geometry)
280def vertices(
281 geometry: Union[Point, Polygon, LineString], ftype: str
282) -> Tuple[int, np.ndarray, int]:
283 """
284 Infer from geometry, or convert from string, the feature type to the GEN
285 expected Enum (int).
287 Convert the geometry to the GEN expected vertices, and the number of
288 vertices.
289 """
290 # Checking names with actual geometry types
291 NAME_TO_GEOM = {
292 "circle": sg.Polygon,
293 "rectangle": sg.Polygon,
294 "polygon": sg.Polygon,
295 "point": sg.Point,
296 "line": sg.LineString,
297 }
298 GENTYPE_TO_VERTICES = {
299 CIRCLE: to_circle,
300 RECTANGLE: to_rectangle,
301 POLYGON: to_polygon,
302 POINT: to_point,
303 LINE: to_line,
304 }
305 # Infer gentype on the basis of shapely type
306 GEOM_TO_GENTYPE = {
307 sg.Polygon: POLYGON,
308 sg.Point: POINT,
309 sg.LineString: LINE,
310 }
312 if ftype != "":
313 # Start by checking whether the feature type matches the geometry
314 try:
315 expected = NAME_TO_GEOM[ftype]
316 except KeyError as e:
317 raise ValueError(
318 f"feature_type should be one of {set(NAME_TO_GEOM.keys())}. Got instead {ftype}"
319 ) from e
320 if not isinstance(geometry, expected):
321 raise ValueError(
322 f"Feature type is {ftype}, expected {expected}. Got instead: {type(geometry)}"
323 )
324 ftype: int = NAME_TO_GENTYPE[ftype]
325 else:
326 try:
327 ftype: int = GEOM_TO_GENTYPE[type(geometry)]
328 except KeyError as e:
329 raise TypeError(
330 "Geometry type not allowed. Should be Polygon, Linestring, or Point."
331 f" Got {type(geometry)} instead."
332 ) from e
334 xy, n_vertex = GENTYPE_TO_VERTICES[ftype](geometry)
335 return ftype, xy, n_vertex
338def read(path):
339 """
340 Read a GEN file to a geopandas GeoDataFrame. The function first tries to
341 read as binary, if this fails, it tries to read the gen file as ASCII.
343 If certain that a GEN file is ascii or binary, the user is adviced to use
344 the respective functions :func:`imod.gen.gen.read_ascii` or
345 :func:`imod.gen.gen.read_binary`.
347 Parameters
348 ----------
349 path: Union[str, Path]
351 Returns
352 -------
353 geodataframe: gpd.GeoDataFrame
354 """
356 try:
357 return read_binary(path)
358 except Exception:
359 try:
360 return read_ascii(path)
361 except Exception as e:
362 raise type(e)(f'{e}\nWhile reading GEN file "{path}"')
365def write(
366 path: Union[str, Path],
367 geodataframe: "geopandas.GeoDataFrame", # type: ignore # noqa
368 feature_type: Optional[str] = None,
369) -> None:
370 """
371 Write a GeoDataFrame to a binary GEN file.
373 Note that the binary GEN file has two geometry types, circles and
374 rectangles, which cannot be mapped directly from a shapely type. Points,
375 lines, and polygons can be converted automatically.
377 In shapely, circles and rectangles will also be represented by polygons. To
378 specifically write circles and rectangles to a binary GEN file, an
379 additional column of strings is required which specifies the geometry type.
381 Parameters
382 ----------
383 path : Union[str, Path]
384 geodataframe : gpd.GeoDataFrame
385 feature_type : Optional[str]
386 Which column to interpret as geometry type, one of: point, line, polygon, circle,
387 rectangle. Default value is ``None``.
389 Returns
390 -------
391 None
392 Writes file.
393 """
394 df = pd.DataFrame(geodataframe.drop(columns="geometry")).astype("string").fillna("")
395 if feature_type is not None:
396 types = df.pop(feature_type).str.lower()
397 else: # Create a dummy iterator
398 types = ("" for _ in range(len(df)))
400 n_feature, n_column = df.shape
401 # Truncate column names to 11 chars, then make everything at least 11 chars
402 column_names = "".join([c[:11].ljust(11) for c in df])
403 # Get the widths of the columns. Make room for at least 11
404 widths = []
405 for column in df:
406 width = max(11, df[column].str.len().max())
407 df[column] = df[column].str.pad(width, side="right")
408 widths.append(width)
410 with warnings.catch_warnings(record=True):
411 warnings.filterwarnings(
412 "ignore", message="Given a dtype which is not unsigned."
413 )
414 with MyFortranFile(path, mode="w", header_dtype=HEADER_TYPE) as f:
415 f.write_record(geodataframe.total_bounds.astype(FLOAT_TYPE))
416 f.write_record(np.array([n_feature, n_column], dtype=INT_TYPE))
417 if n_column > 0:
418 f.write_record(np.array(widths).astype(INT_TYPE))
419 f.write_char_record(column_names) # pylint:disable=no-member
420 for geometry, (_, row), ftype in zip(
421 geodataframe.geometry, df.iterrows(), types
422 ):
423 ftype, xy, n_vertex = vertices(geometry, ftype)
424 f.write_record(np.array([n_vertex, ftype], dtype=INT_TYPE))
425 if n_column > 0:
426 f.write_char_record("".join(row.values)) # pylint:disable=no-member
427 f.write_record(np.array(geometry.bounds).astype(FLOAT_TYPE))
428 f.write_record(xy.astype(FLOAT_TYPE))