Coverage for src / autoencodix / utils / _imgreader.py: 17%

147 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 10:09 +0200

1import os 

2from pathlib import Path 

3from typing import List, Literal, Optional, Union, Dict, Tuple 

4 

5import cv2 

6import numpy as np 

7import pandas as pd 

8from autoencodix.configs.default_config import DefaultConfig, DataInfo 

9from autoencodix.data._imgdataclass import ImgData 

10 

11 

class ImageProcessingError(Exception):
    """Raised when an image has an unsupported format or shape."""

14 

15 

class ImageSizeFinder:
    """Finds a quadratic image size that is divisible by 2^n_conv_layers.

    The search starts at the image width given in the config and only ever
    rounds up, so the result is the smallest admissible size that is not
    smaller than the requested width.

    Attributes:
        config: Configuration object.
        width: Image width requested by the user.
        height: Image height requested by the user.
        n_conv_layers: Exponent of the power of two the image side must be
            divisible by.
    """

    def __init__(self, config: DefaultConfig):
        """Inits the ImageSizeFinder.

        Args:
            config: Configuration object.

        Raises:
            ValueError: If the config holds no data entry of type IMG.
        """
        self.config = config
        found_image_type = False
        # When several IMG entries exist, the last one determines the size.
        for info in config.data_config.data_info.values():
            if info.data_type == "IMG":
                self.width = info.img_width_resize
                self.height = info.img_height_resize
                found_image_type = True
        if not found_image_type:
            raise ValueError("You need to provide a DATA_TYPE of with the TYPE key IMG")

        self.n_conv_layers = 5

    def _image_size_is_legal(self) -> bool:
        """Checks whether the configured size can be used unchanged.

        The size is legal when it is quadratic (width equals height), positive
        and divisible by 2^n_conv_layers.

        Returns:
            True if the configured image size is allowed, False otherwise.
        """
        divisor = 2**self.n_conv_layers
        return (
            self.width == self.height
            and self.width > 0
            and self.width % divisor == 0
        )

    def get_nearest_quadratic_image_size(self) -> Tuple[int, int]:
        """Finds a quadratic image size that is divisible by 2^n_conv_layers.

        If the configured size is already legal it is returned unchanged.
        Otherwise the configured width is rounded up to the next multiple of
        2^n_conv_layers and used for both sides.

        Returns:
            Tuple of ints (width, height) with width == height.

        Raises:
            ValueError: If the configured width is smaller than 1, so that no
                allowed image size can be found.
        """
        if self.width < 1:
            raise ValueError(
                f"Could not find a quadratic image size that is dividable by 2^{self.n_conv_layers}"
            )

        if self._image_size_is_legal():
            print(
                f"Given image size is possible, rescaling images to: {self.width}x{self.height}"
            )
            return self.width, self.height

        divisor = 2**self.n_conv_layers
        # Ceiling division: smallest multiple of `divisor` that is >= width.
        side = -(-self.width // divisor) * divisor
        print(
            f"Given image size{self.width}x{self.height} is not possible, rescaling to: {side}x{side}"
        )
        return side, side

88 

89 

class ImageDataReader:
    """Reads and processes image data.

    Reads all images from the specified directory, processes them,
    and returns a list of ImgData objects.

    Attributes:
        config: Configuration object.
    """

    # Accepted image file extensions (lower case, including the leading dot).
    SUPPORTED_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".tif", ".tiff"})

    def __init__(self, config: DefaultConfig):
        """Inits the ImageDataReader.

        Args:
            config: Configuration object.
        """
        self.config = config

    def validate_image_path(self, image_path: Union[str, Path]) -> bool:
        """Checks that the path is an existing file with an allowed extension.

        Allowed are (independent of capitalization):
            - jpg
            - jpeg
            - png
            - tif
            - tiff

        Args:
            image_path: path or str of image to read

        Returns:
            True if the path points to an existing file with an allowed
            extension, False otherwise.
        """
        path = Path(image_path)
        # is_file() is already False for paths that do not exist.
        return path.is_file() and path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def parse_image_to_tensor(
        self,
        image_path: Union[str, Path],
        to_h: Optional[int] = None,
        to_w: Optional[int] = None,
    ) -> np.ndarray:
        """Reads an image, optionally resizes it, and converts it to a tensor.

        The image is resized, converted to a single gray channel and returned
        channel-first.

        Args:
            image_path: The path to the image file.
            to_h: The desired height of the output tensor. Defaults to the
                height of the image on disk.
            to_w: The desired width of the output tensor. Defaults to the
                width of the image on disk.

        Returns:
            The processed image as an array of shape (1, to_h, to_w).

        Raises:
            FileNotFoundError: If the path is not an existing file with a
                supported extension, or the image cannot be read.
            ImageProcessingError: If the image has an unsupported shape.
        """
        if not self.validate_image_path(image_path):
            raise FileNotFoundError(f"Invalid image path: {image_path}")
        image_path = Path(image_path)
        suffix = image_path.suffix.lower()
        # Defensive: validate_image_path already rejects unsupported suffixes,
        # so this only triggers if that method is overridden.
        if suffix not in self.SUPPORTED_EXTENSIONS:
            raise ImageProcessingError(
                f"Unsupported image format: {image_path.suffix}. "
                f"Supported formats are: {', '.join(sorted(self.SUPPORTED_EXTENSIONS))}"
            )

        if suffix in {".tif", ".tiff"}:
            image = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
        else:
            image = cv2.imread(str(image_path))

        if image is None:
            raise FileNotFoundError(f"Failed to read image: {image_path}")

        # Validate the rank before touching the shape.
        if not (2 <= image.ndim <= 3):
            raise ImageProcessingError(
                f"Image has unsupported shape: {image.shape}. "
                "Supported shapes are 2D and 3D."
            )

        # Only the first two axes are unpacked so 2-D (single-channel) images
        # work as well as 3-D ones.
        h, w = image.shape[:2]
        if to_h is None:
            to_h = h
        if to_w is None:
            to_w = w

        # cv2.resize takes the target size as (width, height).
        image = cv2.resize(image, (to_w, to_h), interpolation=cv2.INTER_AREA)

        if image.ndim == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        if image.ndim == 2:
            image = np.expand_dims(image, axis=2)

        # (H, W, C) -> (C, H, W)
        return image.transpose(2, 0, 1)

    def read_all_images_from_dir(
        self,
        img_dir: str,
        to_h: Optional[int],
        to_w: Optional[int],
        annotation_df: pd.DataFrame,
        is_paired: Union[bool, None] = None,
    ) -> List[ImgData]:
        """Reads all images from a directory and returns them as ImgData objects.

        Only files with a supported image extension are considered, and only
        images whose file name occurs in the image path column of
        `annotation_df` end up in the result.

        Args:
            img_dir: The directory containing the images.
            to_h: The desired height of the output tensors.
            to_w: The desired width of the output tensors.
            annotation_df: DataFrame containing image annotations.
            is_paired: If True or None, files without an annotation entry are
                skipped before they are read. If False, every image file is
                read first and unannotated ones are dropped afterwards.

        Returns:
            List of processed image data objects.

        Raises:
            ValueError: If the annotation DataFrame is missing required columns.
        """
        path_col = self.config.img_path_col
        if path_col not in annotation_df.columns:
            raise ValueError(
                f" The defined column for image paths: {path_col} column is missing in the annotation_df"
                " you can define this in the config via the param `img_path_col`"
            )

        paths = [
            os.path.join(img_dir, f)
            for f in os.listdir(img_dir)
            if Path(f).suffix.lower() in self.SUPPORTED_EXTENSIONS
        ]
        if is_paired or is_paired is None:
            # Build the lookup once instead of once per candidate path.
            annotated_names = set(annotation_df[path_col].tolist())
            paths = [p for p in paths if os.path.basename(p) in annotated_names]

        imgs = []
        for p in paths:
            img = self.parse_image_to_tensor(image_path=p, to_h=to_h, to_w=to_w)
            img_name = os.path.basename(p)
            subset = annotation_df[annotation_df[path_col] == img_name]
            if not subset.empty:
                imgs.append(
                    ImgData(
                        img=img,
                        # The index label of the first matching row is the sample id.
                        sample_id=str(subset.index[0]),
                        annotation=subset,
                    )
                )
        return imgs

    def read_annotation_file(self, data_info: DataInfo) -> pd.DataFrame:
        """Reads an annotation file and returns its contents as a DataFrame.

        `data_info.extra_anno_file` is read when it is set, otherwise
        `data_info.file_path`. Supported are parquet files and delimited text
        files (.csv, .txt, .tsv); the latter use `data_info.sep` as separator
        and the first column as index.

        Args:
            data_info: specific part of the Configuration object for input data

        Returns:
            DataFrame with annotation data.

        Raises:
            ValueError: If the file extension is not supported.
        """
        source = (
            data_info.file_path
            if data_info.extra_anno_file is None
            else data_info.extra_anno_file
        )
        anno_file = os.fspath(source)
        # Match the extension case-insensitively (e.g. ".CSV").
        lowered = anno_file.lower()
        if lowered.endswith(".parquet"):
            return pd.read_parquet(anno_file)
        if lowered.endswith((".csv", ".txt", ".tsv")):
            return pd.read_csv(
                anno_file, sep=data_info.sep, index_col=0, engine="python"
            )
        raise ValueError(f"Unsupported file type for: {anno_file}")

    def read_data(
        self, config: DefaultConfig
    ) -> Tuple[Dict[str, List[ImgData]], Dict[str, pd.DataFrame]]:
        """Read image data for every IMG entry of the configuration.

        Loading is best-effort per source: an error raised while loading one
        source is printed and that source is left out of both result dicts.

        Args:
            config: The configuration object containing the data configuration.

        Returns:
            A Tuple of Dicts:
            1. Dict with type of image data as key and actual List of ImgData as value.
            2. Dict with type of image data as key and DataFrame of annotation data as value.

        Raises:
            ValueError: If no image data is found in the configuration.
        """
        # Find all image data sources in config
        image_sources = {
            k: v
            for k, v in config.data_config.data_info.items()
            if v.data_type == "IMG"
        }

        if not image_sources:
            raise ValueError("No image data found in the configuration.")

        result: Dict[str, List[ImgData]] = {}
        annotation: Dict[str, pd.DataFrame] = {}
        for key, img_info in image_sources.items():
            try:
                result[key], annotation[key] = self._read_data(config, img_info)
                print(f"Successfully loaded {len(result[key])} images for {key}")
            except Exception as e:
                # Deliberately best-effort: report the failure and continue
                # with the remaining sources.
                print(f"Error loading images for {key}: {str(e)}")

        return result, annotation

    def _read_data(
        self, config: DefaultConfig, img_info: DataInfo
    ) -> Tuple[List[ImgData], pd.DataFrame]:
        """Read data for a specific image source.

        Args:
            config: The configuration object containing the data configuration.
            img_info: The specific image info configuration.

        Returns:
            A Tuple of:
            1. List of ImgData read from the directory `img_info.file_path`.
            2. DataFrame with the concatenated annotations of those images.

        Raises:
            ValueError: If no annotation data is configured, or if no image
                with a matching annotation is found.
        """
        img_dir = img_info.file_path
        img_size_finder: ImageSizeFinder = ImageSizeFinder(config)
        # The finder returns (width, height), in that order.
        to_w, to_h = img_size_finder.get_nearest_quadratic_image_size()

        if img_info.extra_anno_file is not None:
            # Use image-specific annotation file if provided
            annotation = self.read_annotation_file(img_info)
        else:
            # Otherwise use the global annotation file
            anno_info = next(
                (
                    f
                    for f in config.data_config.data_info.values()
                    if f.data_type == "ANNOTATION"
                ),
                None,
            )
            if anno_info is None:
                raise ValueError("No annotation data found in the configuration.")
            annotation = self.read_annotation_file(anno_info)

        images = self.read_all_images_from_dir(
            img_dir=img_dir,
            to_h=to_h,
            to_w=to_w,
            annotation_df=annotation,
            is_paired=config.requires_paired,
        )
        if not images:
            # pd.concat raises an opaque error on an empty list.
            raise ValueError(
                f"No images with matching annotations found in: {img_dir}"
            )
        annotations: pd.DataFrame = pd.concat([img.annotation for img in images])

        return images, annotations

347 

348 

class ImageNormalizer:
    """Implements normalization analogous to other data modalities for image data."""

    @staticmethod
    def normalize_image(
        image: np.ndarray, method: Literal["STANDARD", "MINMAX", "ROBUST", "NONE"]
    ) -> np.ndarray:
        """Performs image normalization.

        Supported methods are:
            - STANDARD: (analog to StandardScaler of sklearn).
            - MINMAX: (analog to MinMaxScaler of sklearn).
            - ROBUST: (analog to RobustScaler of sklearn).
            - NONE: no normalization.

        STANDARD and ROBUST compute their statistics over axes 1 and 2 of
        `image`, so the array needs at least three dimensions for them.
        MINMAX delegates to `cv2.normalize` with `NORM_MINMAX` and the target
        range [0, 1] and returns a float32 array.

        Args:
            image: input image as array.
            method: indicator string of which method to use

        Returns:
            The normalized image as np.ndarray. For NONE the input object
            itself is returned.

        Raises:
            ValueError: if an unsupported normalization method is provided or
                normalization fails for any other reason. The original error
                is attached as the cause.
        """
        try:
            if method == "NONE":
                return image

            if method == "MINMAX":
                # astype creates a float32 copy that cv2 normalizes in place.
                normalized = image.astype(np.float32)
                cv2.normalize(
                    normalized,
                    normalized,
                    alpha=0,
                    beta=1,
                    norm_type=cv2.NORM_MINMAX,
                    dtype=cv2.CV_32F,
                )
                return normalized

            if method == "STANDARD":
                mean = np.mean(image, axis=(1, 2), keepdims=True)
                std = np.std(image, axis=(1, 2), keepdims=True)
                # The small epsilon avoids division by zero for constant images.
                return (image - mean) / (std + 1e-8)

            if method == "ROBUST":
                median = np.median(image, axis=(1, 2), keepdims=True)
                q75, q25 = np.percentile(image, [75, 25], axis=(1, 2), keepdims=True)
                iqr = q75 - q25
                return (image - median) / (iqr + 1e-8)

            raise ValueError(f"Unsupported normalization method: {method}")

        except Exception as e:
            # Chain explicitly so the original error stays visible as the cause.
            raise ValueError(f"Failed to normalize image: {str(e)}") from e