Coverage for C:\src\imod-python\imod\mf6\out\cbc.py: 93%

122 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-16 11:25 +0200

1""" 

2Cell-by-cell flows 

3""" 

4 

5import os 

6import struct 

7from collections import defaultdict 

8from typing import Any, BinaryIO, Dict, List, NamedTuple, Tuple, Union 

9 

10import dask 

11import numpy as np 

12import xarray as xr 

13 

14from .common import FilePath, FloatArray 

15 

16 

17class Imeth1Header(NamedTuple): 

18 kstp: int 

19 kper: int 

20 text: str 

21 ndim1: int 

22 ndim2: int 

23 ndim3: int 

24 imeth: int 

25 delt: float 

26 pertim: float 

27 totim: float 

28 pos: int 

29 

30 

31class Imeth6Header(NamedTuple): 

32 kstp: int 

33 kper: int 

34 text: str 

35 ndim1: int 

36 ndim2: int 

37 ndim3: int 

38 imeth: int 

39 delt: float 

40 pertim: float 

41 totim: float 

42 pos: int 

43 txt1id1: str 

44 txt2id1: str 

45 txt1id2: str 

46 txt2id2: str 

47 ndat: int 

48 auxtxt: List[str] 

49 nlist: int 

50 

51 

52def read_common_cbc_header(f: BinaryIO) -> Dict[str, Any]: 

53 """ 

54 Read the common part (shared by imeth=1 and imeth6) of a CBC header section. 

55 """ 

56 content = {} 

57 content["kstp"] = struct.unpack("i", f.read(4))[0] 

58 content["kper"] = struct.unpack("i", f.read(4))[0] 

59 content["text"] = f.read(16).decode("utf-8").strip().lower() 

60 content["ndim1"] = struct.unpack("i", f.read(4))[0] 

61 content["ndim2"] = struct.unpack("i", f.read(4))[0] 

62 content["ndim3"] = struct.unpack("i", f.read(4))[0] 

63 content["imeth"] = struct.unpack("i", f.read(4))[0] 

64 content["delt"] = struct.unpack("d", f.read(8))[0] 

65 content["pertim"] = struct.unpack("d", f.read(8))[0] 

66 content["totim"] = struct.unpack("d", f.read(8))[0] 

67 return content 

68 

69 

70def read_imeth6_header(f: BinaryIO) -> Dict[str, Any]: 

71 """ 

72 Read the imeth=6 specific data of a CBC header section. 

73 """ 

74 content: Dict[str, str | List[str]] = {} 

75 content["txt1id1"] = f.read(16).decode("utf-8").strip().lower() 

76 content["txt2id1"] = f.read(16).decode("utf-8").strip().lower() 

77 content["txt1id2"] = f.read(16).decode("utf-8").strip().lower() 

78 content["txt2id2"] = f.read(16).decode("utf-8").strip().lower() 

79 ndat = struct.unpack("i", f.read(4))[0] 

80 content["ndat"] = ndat 

81 content["auxtxt"] = [ 

82 f.read(16).decode("utf-8").strip().lower() for _ in range(ndat - 1) 

83 ] 

84 content["nlist"] = struct.unpack("i", f.read(4))[0] 

85 return content 

86 

87 

88def read_cbc_headers( 

89 cbc_path: FilePath, 

90) -> Dict[str, List[Union[Imeth1Header, Imeth6Header]]]: 

91 """ 

92 Read all the header data from a cell-by-cell (.cbc) budget file. 

93 

94 All budget data for a MODFLOW6 model is stored in a single file. This 

95 function collects all header data, as well as the starting byte position of 

96 the actual budget data. 

97 

98 This function groups the headers per TEXT record (e.g. "flow-ja-face", 

99 "drn", etc.). The headers are stored as a list of named tuples. 

100 flow-ja-face, storage-ss, and storage-sy are written using IMETH=1, all 

101 others with IMETH=6. 

102 

103 Parameters 

104 ---------- 

105 cbc_path: str, pathlib.Path 

106 Path to the budget file. 

107 

108 Returns 

109 ------- 

110 headers: Dict[List[UnionImeth1Header, Imeth6Header]] 

111 Dictionary containing a list of headers per TEXT record in the budget 

112 file. 

113 """ 

114 headers: Dict[str, List[Union[Imeth1Header, Imeth6Header]]] = defaultdict(list) 

115 with open(cbc_path, "rb") as f: 

116 filesize = os.fstat(f.fileno()).st_size 

117 while f.tell() < filesize: 

118 header = read_common_cbc_header(f) 

119 if header["imeth"] == 1: 

120 # Multiply by -1 because ndim3 is stored as a negative for some reason. 

121 # (ndim3 is the integer size of the third dimension) 

122 datasize = ( 

123 header["ndim1"] * header["ndim2"] * header["ndim3"] * -1 

124 ) * 8 

125 header["pos"] = f.tell() 

126 key = header["text"] 

127 headers[key].append(Imeth1Header(**header)) 

128 elif header["imeth"] == 6: 

129 imeth6_header = read_imeth6_header(f) 

130 datasize = imeth6_header["nlist"] * (8 + imeth6_header["ndat"] * 8) 

131 header["pos"] = f.tell() 

132 # key-format: "package type"-"optional_package_variable"_"package name" 

133 # for river output: riv_sys1 

134 # for uzf output: uzf-gwrch_uzf_sys1 

135 key = header["text"] + "_" + imeth6_header["txt2id2"] 

136 # npf-key can be present multiple times in cases of saved saturation + specific discharge 

137 if header["text"].startswith("data-"): 

138 key = ( 

139 imeth6_header["txt2id2"] 

140 + "_" 

141 + header["text"].replace("data-", "") 

142 ) 

143 headers[key].append(Imeth6Header(**header, **imeth6_header)) 

144 else: 

145 raise ValueError( 

146 f"Invalid imeth value in CBC file {cbc_path}. " 

147 f"Should be 1 or 6, received: {header['imeth']}." 

148 ) 

149 # Skip the data 

150 f.seek(datasize, 1) 

151 return headers 

152 

153 

154def read_imeth1_budgets(cbc_path: FilePath, count: int, pos: int) -> FloatArray: 

155 """ 

156 Read the data for an imeth=1 budget section. 

157 

158 Parameters 

159 ---------- 

160 cbc_path: str, pathlib.Path 

161 count: int 

162 number of values to read 

163 pos: 

164 position in the file where the data for a timestep starts 

165 

166 Returns 

167 ------- 

168 1-D array of floats 

169 """ 

170 with open(cbc_path, "rb") as f: 

171 f.seek(pos) 

172 timestep_budgets = np.fromfile(f, np.float64, count) 

173 return timestep_budgets 

174 

175 

176def open_imeth1_budgets( 

177 cbc_path: FilePath, header_list: List[Imeth1Header] 

178) -> xr.DataArray: 

179 """ 

180 Open the data for an imeth==1 budget section. Data is read lazily per 

181 timestep. The cell data is not spatially labelled. 

182 

183 Parameters 

184 ---------- 

185 cbc_path: str, pathlib.Path 

186 header_list: List[Imeth1Header] 

187 

188 Returns 

189 ------- 

190 xr.DataArray with dims ("time", "linear_index") 

191 """ 

192 # Gather times from the headers 

193 dask_list = [] 

194 time = np.empty(len(header_list), dtype=np.float64) 

195 for i, header in enumerate(header_list): 

196 time[i] = header.totim 

197 count = header.ndim1 * header.ndim2 * header.ndim3 * -1 

198 a = dask.delayed(read_imeth1_budgets)(cbc_path, count, header.pos) 

199 x = dask.array.from_delayed(a, shape=(count,), dtype=np.float64) 

200 dask_list.append(x) 

201 

202 return xr.DataArray( 

203 data=dask.array.stack(dask_list, axis=0), 

204 coords={"time": time}, 

205 dims=("time", "linear_index"), 

206 name=header_list[0].text, 

207 ) 

208 

209 

210def expand_indptr(ia) -> np.ndarray: 

211 n = np.diff(ia) 

212 return np.repeat(np.arange(ia.size - 1), n) 

213 

214 

215def open_face_budgets_as_flowja( 

216 cbc_path: FilePath, header_list: List[Imeth1Header], grb_content: Dict[str, Any] 

217) -> Tuple[xr.DataArray, xr.DataArray]: 

218 flowja = open_imeth1_budgets(cbc_path, header_list) 

219 flowja = flowja.rename({"linear_index": "connection"}) 

220 n = expand_indptr(grb_content["ia"]) 

221 m = grb_content["ja"] - 1 

222 nm = xr.DataArray( 

223 np.column_stack([n, m]), 

224 coords={"cell": ["n", "m"]}, 

225 dims=["connection", "cell"], 

226 ) 

227 return flowja, nm 

228 

229 

230def read_imeth6_budgets( 

231 cbc_path: FilePath, count: int, dtype: np.dtype, pos: int 

232) -> Any: 

233 """ 

234 Read the data for an imeth==6 budget section for a single timestep. 

235 

236 Returns a numpy structured array containing: 

237 * id1: the model cell number 

238 * id2: the boundary condition index 

239 * budget: the budget terms 

240 * and assorted auxiliary columns, if present 

241 

242 Parameters 

243 ---------- 

244 cbc_path: str, pathlib.Path 

245 count: int 

246 number of values to read 

247 dtype: numpy dtype 

248 Data type of the structured array. Contains at least "id1", "id2", and "budget". 

249 Optionally contains auxiliary columns. 

250 pos: 

251 position in the file where the data for a timestep starts 

252 

253 Returns 

254 ------- 

255 Numpy structured array of type dtype 

256 """ 

257 with open(cbc_path, "rb") as f: 

258 f.seek(pos) 

259 table = np.fromfile(f, dtype, count) 

260 return table 

261 

262 

263def read_imeth6_budgets_dense( 

264 cbc_path: FilePath, 

265 count: int, 

266 dtype: np.dtype, 

267 pos: int, 

268 size: int, 

269 shape: tuple, 

270 return_variable: str, 

271 indices: np.ndarray | None, 

272) -> FloatArray: 

273 """ 

274 Read the data for an imeth==6 budget section. 

275 

276 Utilizes the shape information from the DIS GRB file to create a dense numpy 

277 array. Always allocates for the entire domain (all layers, rows, columns). 

278 

279 Parameters 

280 ---------- 

281 cbc_path: str, pathlib.Path 

282 count: int 

283 number of values to read 

284 dtype: numpy dtype 

285 Data type of the structured array. Contains at least "id1", "id2", and "budget". 

286 Optionally contains auxiliary columns. 

287 pos: int 

288 position in the file where the data for a timestep starts 

289 size: int 

290 size of the entire model domain 

291 shape: tuple[int, int, int] 

292 Shape (nlayer, nrow, ncolumn) of entire model domain. 

293 return_variable: str 

294 variable name to return from budget table 

295 indices: np.ndarray | None 

296 optional array that contains the indices to map return_variable to model topology 

297 

298 Returns 

299 ------- 

300 Three-dimensional array of floats 

301 """ 

302 # Allocates a dense array for the entire domain 

303 out = np.full(size, np.nan, dtype=np.float64) 

304 table = read_imeth6_budgets(cbc_path, count, dtype, pos) 

305 if indices is None: 

306 indices = table["id1"] - 1 # Convert to 0 based index 

307 out[indices] = table[return_variable] 

308 return out.reshape(shape)