Coverage for src / autoencodix / utils / _imgreader.py: 17%
147 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:09 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:09 +0200
1import os
2from pathlib import Path
3from typing import List, Literal, Optional, Union, Dict, Tuple
5import cv2
6import numpy as np
7import pandas as pd
8from autoencodix.configs.default_config import DefaultConfig, DataInfo
9from autoencodix.data._imgdataclass import ImgData
class ImageProcessingError(Exception):
    """Signals that an image could not be processed (e.g. unsupported shape)."""
class ImageSizeFinder:
    """Finds a quadratic image size that is divisible by 2^number_of_layers.

    The size is derived from the resize width of the IMG entry in the config.
    A request that is already square and divisible is kept as is; otherwise
    the width is rounded up to the next valid multiple and used for both
    dimensions.

    Attributes:
        config: Configuration object.
        width: Image width requested by the user.
        height: Image height requested by the user.
        n_conv_layers: Exponent of the power of two the image size has to be
            divisible by (fixed to 5, i.e. sizes are multiples of 32).
    """

    def __init__(self, config: DefaultConfig):
        """Inits the ImageSizeFinder.

        Args:
            config: Configuration object.

        Raises:
            ValueError: If the config contains no entry with data_type "IMG".
        """
        self.config = config
        img_entries = [
            info
            for info in config.data_config.data_info.values()
            if info.data_type == "IMG"
        ]
        if not img_entries:
            raise ValueError("You need to provide a DATA_TYPE of with the TYPE key IMG")
        # With several IMG entries the last one determines the size.
        self.width = img_entries[-1].img_width_resize
        self.height = img_entries[-1].img_height_resize
        self.n_conv_layers = 5

    def _requested_width(self) -> int:
        """Returns the requested width as a positive int.

        Raises:
            ValueError: If the width is missing, not integral or not positive.
        """
        try:
            width = int(self.width)
        except (TypeError, ValueError, OverflowError) as err:
            raise ValueError(
                f"Image width must be a positive integer, got: {self.width!r}"
            ) from err
        # `int()` truncates floats and parses strings; reject both here.
        if width != self.width or width <= 0:
            raise ValueError(
                f"Image width must be a positive integer, got: {self.width!r}"
            )
        return width

    def _image_size_is_legal(self) -> bool:
        """Checks if the requested size is square and divisible by 2^number_of_layers.

        Returns:
            True if the requested size can be used unchanged.
        """
        # A non-square request is never legal: callers expect width == height.
        return (
            self.width == self.height
            and self.width % 2**self.n_conv_layers == 0
        )

    def get_nearest_quadratic_image_size(self) -> Tuple[int, int]:
        """Finds a quadratic image size that is divisible by 2^number_of_layers.

        The requested size is returned unchanged when it is legal. Otherwise
        the requested width is rounded up to the next multiple of
        2^number_of_layers and used for both dimensions.

        Returns:
            Tuple of ints width and height with width=height.

        Raises:
            ValueError: If the requested width is not a positive integer.
        """
        width = self._requested_width()
        if self._image_size_is_legal():
            print(
                f"Given image size is possible, rescaling images to: {self.width}x{self.height}"
            )
            return width, width

        step = 2**self.n_conv_layers
        # Ceiling division: smallest multiple of `step` that is >= width.
        size = -(-width // step) * step
        print(
            f"Given image size{self.width}x{self.height} is not possible, rescaling to: {size}x{size}"
        )
        return size, size
class ImageDataReader:
    """Reads and processes image data.

    Reads all images from the specified directory, processes them,
    and returns a list of ImgData objects.

    Attributes:
        config: Configuration object; ``img_path_col`` is read from it.
    """

    # Lower-case file suffixes that are accepted as images.
    _SUPPORTED_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".tif", ".tiff"})

    def __init__(self, config: DefaultConfig):
        """Inits the ImageDataReader.

        Args:
            config: Configuration object.
        """
        self.config = config

    def validate_image_path(self, image_path: Union[str, Path]) -> bool:
        """Checks that the path is an existing file with an allowed extension.

        Allowed are (independent of capitalization):
            - jpg
            - jpeg
            - png
            - tif
            - tiff

        Args:
            image_path: path or str of image to read

        Returns:
            True if the path points to an existing file with an allowed suffix.
        """
        path = Path(image_path)
        return path.is_file() and path.suffix.lower() in self._SUPPORTED_EXTENSIONS

    def parse_image_to_tensor(
        self,
        image_path: Union[str, Path],
        to_h: Optional[int] = None,
        to_w: Optional[int] = None,
    ) -> np.ndarray:
        """Reads an image, optionally resizes it, and converts it to a tensor.

        Color images are converted to grayscale, so the result always has a
        single leading channel axis.

        Args:
            image_path: The path to the image file.
            to_h: The desired height of the output tensor, by default None
                (keep the height of the image).
            to_w: The desired width of the output tensor, by default None
                (keep the width of the image).

        Returns:
            The processed image as an array of shape (1, to_h, to_w).

        Raises:
            FileNotFoundError: If the path does not point to an existing file
                with a supported extension, or the image cannot be decoded.
            ImageProcessingError: If the decoded image has an unsupported
                number of dimensions or channels.
        """
        if not self.validate_image_path(image_path):
            raise FileNotFoundError(f"Invalid image path: {image_path}")
        image_path = Path(image_path)

        # TIFFs are loaded as stored (IMREAD_UNCHANGED); every other format
        # goes through the default 3-channel BGR decoding.
        if image_path.suffix.lower() in {".tif", ".tiff"}:
            image = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
        else:
            image = cv2.imread(str(image_path))
        if image is None:
            raise FileNotFoundError(f"Failed to read image: {image_path}")

        # Validate the rank before unpacking: unchanged TIFFs may be 2D.
        if image.ndim not in (2, 3):
            raise ImageProcessingError(
                f"Image has unsupported shape: {image.shape}. "
                "Supported shapes are 2D and 3D."
            )
        h, w = image.shape[:2]
        if to_h is None:
            to_h = h
        if to_w is None:
            to_w = w

        # cv2.resize expects the target size as (width, height).
        image = cv2.resize(image, (to_w, to_h), interpolation=cv2.INTER_AREA)

        if image.ndim == 3:
            n_channels = image.shape[2]
            if n_channels == 3:
                conversion = cv2.COLOR_BGR2GRAY
            elif n_channels == 4:
                # Unchanged TIFFs can carry an alpha channel.
                conversion = cv2.COLOR_BGRA2GRAY
            else:
                raise ImageProcessingError(
                    f"Image has unsupported number of channels: {n_channels}."
                )
            image = cv2.cvtColor(image, conversion)

        # (H, W) -> (1, H, W)
        return np.expand_dims(image, axis=0)

    def read_all_images_from_dir(
        self,
        img_dir: str,
        to_h: Optional[int],
        to_w: Optional[int],
        annotation_df: pd.DataFrame,
        is_paired: Union[bool, None] = None,
    ) -> List[ImgData]:
        """Reads all annotated images from a directory as ImgData objects.

        Files are processed in sorted file-name order. Images without a row
        in ``annotation_df`` are skipped, because the sample id is taken from
        the index of the matching annotation row.

        Args:
            img_dir: The directory containing the images.
            to_h: The desired height of the output tensors.
            to_w: The desired width of the output tensors.
            annotation_df: DataFrame containing image annotations.
            is_paired: Whether the images are paired with annotations. If True
                or None, the file list is restricted to annotated images up
                front.

        Returns:
            List of processed image data objects.

        Raises:
            ValueError: If the annotation DataFrame is missing required columns.
        """
        path_col = self.config.img_path_col
        if path_col not in annotation_df.columns:
            raise ValueError(
                f" The defined column for image paths: {path_col} column is missing in the annotation_df, "
                "you can define this in the config via the param `img_path_col`"
            )

        # Sorted so the order of the result does not depend on the file system.
        file_names = sorted(
            f
            for f in os.listdir(img_dir)
            if Path(f).suffix.lower() in self._SUPPORTED_EXTENSIONS
        )
        if is_paired or is_paired is None:
            # Build the lookup once instead of once per file.
            annotated_names = set(annotation_df[path_col])
            file_names = [f for f in file_names if f in annotated_names]

        imgs: List[ImgData] = []
        for file_name in file_names:
            subset = annotation_df[annotation_df[path_col] == file_name]
            if subset.empty:
                # Checked before decoding so unannotated files cost nothing.
                continue
            img = self.parse_image_to_tensor(
                image_path=os.path.join(img_dir, file_name), to_h=to_h, to_w=to_w
            )
            imgs.append(
                ImgData(
                    img=img,
                    sample_id=str(subset.index[0]),
                    annotation=subset,
                )
            )
        return imgs

    def read_annotation_file(self, data_info: DataInfo) -> pd.DataFrame:
        """Reads annotation file and returns DataFrame with file contents.

        ``extra_anno_file`` is used when it is set, otherwise ``file_path``.
        The file type is chosen by suffix (independent of capitalization).

        Args:
            data_info: specific part of the Configuration object for input data

        Returns:
            DataFrame with annotation data.

        Raises:
            ValueError: If the file is neither parquet nor csv/txt/tsv.
        """
        source = (
            data_info.file_path
            if data_info.extra_anno_file is None
            else data_info.extra_anno_file
        )
        anno_file = os.fspath(source)
        lowered = anno_file.lower()
        if lowered.endswith(".parquet"):
            return pd.read_parquet(anno_file)
        if lowered.endswith((".csv", ".txt", ".tsv")):
            return pd.read_csv(
                anno_file, sep=data_info.sep, index_col=0, engine="python"
            )
        raise ValueError(f"Unsupported file type for: {anno_file}")

    def read_data(
        self, config: DefaultConfig
    ) -> Tuple[Dict[str, List[ImgData]], Dict[str, pd.DataFrame]]:
        """Read image data from the specified directory based on configuration.

        Every entry of the config with data_type "IMG" is loaded. A source
        that fails to load is reported on stdout and skipped, so its key is
        missing from both returned dicts.

        Args:
            config: The configuration object containing the data configuration.

        Returns:
            A Tuple of Dicts:
            1. Dict with type of image data as key and actual List of ImgData as value.
            2. Dict with type of image data as key and DataFrame of annotation data as value.

        Raises:
            ValueError: If no image data is found in the configuration.
        """
        image_sources = {
            key: info
            for key, info in config.data_config.data_info.items()
            if info.data_type == "IMG"
        }
        if not image_sources:
            raise ValueError("No image data found in the configuration.")

        result: Dict[str, List[ImgData]] = {}
        annotation: Dict[str, pd.DataFrame] = {}
        for key, img_info in image_sources.items():
            try:
                images, annotations = self._read_data(config, img_info)
            except Exception as e:
                # Best effort: report the failing source and go on.
                print(f"Error loading images for {key}: {str(e)}")
                continue
            result[key] = images
            annotation[key] = annotations
            print(f"Successfully loaded {len(images)} images for {key}")

        return result, annotation

    def _read_data(
        self, config: DefaultConfig, img_info: DataInfo
    ) -> Tuple[List[ImgData], pd.DataFrame]:
        """Read data for a specific image source.

        Args:
            config: The configuration object containing the data configuration.
            img_info: The specific image info configuration.

        Returns:
            A Tuple of:
            1. List of ImgData read for this source.
            2. DataFrame with the concatenated annotation rows of these images.

        Raises:
            ValueError: If no annotation source is configured or no annotated
                image is found for this source.
        """
        img_dir = img_info.file_path

        size_finder = ImageSizeFinder(config)
        # Size this source by its own resize settings, not by whichever IMG
        # entry the finder picked from the config.
        size_finder.width = img_info.img_width_resize
        size_finder.height = img_info.img_height_resize
        # The finder returns (width, height).
        to_w, to_h = size_finder.get_nearest_quadratic_image_size()

        if img_info.extra_anno_file is not None:
            # Use image-specific annotation file if provided
            annotation = self.read_annotation_file(img_info)
        else:
            # Otherwise use the global annotation file
            anno_info = next(
                (
                    info
                    for info in config.data_config.data_info.values()
                    if info.data_type == "ANNOTATION"
                ),
                None,
            )
            if anno_info is None:
                raise ValueError("No annotation data found in the configuration.")
            annotation = self.read_annotation_file(anno_info)

        images = self.read_all_images_from_dir(
            img_dir=img_dir,
            to_h=to_h,
            to_w=to_w,
            annotation_df=annotation,
            is_paired=config.requires_paired,
        )
        if not images:
            # pd.concat([]) would fail with an unspecific message.
            raise ValueError(f"No annotated images found in: {img_dir}")
        annotations: pd.DataFrame = pd.concat([img.annotation for img in images])

        return images, annotations
class ImageNormalizer:
    """Implements normalization analogous to the other data modalities for image data."""

    @staticmethod
    def normalize_image(
        image: np.ndarray, method: Literal["STANDARD", "MINMAX", "ROBUST", "NONE"]
    ) -> np.ndarray:
        """Performs image normalization.

        Supported methods are:
            - STANDARD: (analog to StandardScaler of sklearn).
            - MINMAX: (analog to MinMaxScaler of sklearn).
            - ROBUST: (analog to RobustScaler of sklearn).
            - NONE: no normalization.

        STANDARD and ROBUST compute their statistics over axes 1 and 2, i.e.
        separately for each index along axis 0. MINMAX rescales a float32
        copy of the array to the range [0, 1]. NONE returns the input
        array itself.

        Args:
            image: input image as array.
            method: indicator string of which method to use

        Returns:
            The normalized image as np.ndarray

        Raises:
            ValueError: if an unsupported normalization method is provided or
                normalization fails for any other reason.
        """
        if method == "NONE":
            return image
        # Reject unknown methods up front so the message is not re-wrapped below.
        if method not in ("MINMAX", "STANDARD", "ROBUST"):
            raise ValueError(f"Unsupported normalization method: {method}")

        try:
            if method == "MINMAX":
                # Work on a float32 copy; cv2.normalize writes into it in place.
                normalized = image.astype(np.float32)
                cv2.normalize(
                    normalized,
                    normalized,
                    alpha=0,
                    beta=1,
                    norm_type=cv2.NORM_MINMAX,
                    dtype=cv2.CV_32F,
                )
                return normalized

            if method == "STANDARD":
                mean = np.mean(image, axis=(1, 2), keepdims=True)
                std = np.std(image, axis=(1, 2), keepdims=True)
                # The small constant avoids a division by zero for flat images.
                return (image - mean) / (std + 1e-8)

            # method == "ROBUST"
            median = np.median(image, axis=(1, 2), keepdims=True)
            q75, q25 = np.percentile(image, [75, 25], axis=(1, 2), keepdims=True)
            iqr = q75 - q25
            return (image - median) / (iqr + 1e-8)
        except Exception as e:
            raise ValueError(f"Failed to normalize image: {str(e)}") from e