Source code for alexandria.analyzers.high_contrast

"""
Unified High Contrast Analyzer for CatPhan Phantom Analysis

This module combines the high-contrast/resolution analysis functionality from both
catphan404 and XVI-CatPhan projects, providing a comprehensive analyzer for CTP528
line pair module with both single-image and DICOM-series modes.
"""

import numpy as np
from scipy.interpolate import interpn, interp1d
from scipy.signal import find_peaks
from typing import Optional, Tuple, List, Dict, Any


[docs] class HighContrastAnalyzer: """ Analyzer for CatPhan's CTP528 high-contrast (line pair) module. """ def __init__( self, image: Optional[np.ndarray] = None, pixel_spacing: Optional[float] = None, center: Optional[Tuple[float, float]] = None, t_offset_deg: float = 0.0, rotation_offset: Optional[float] = None, # Alias for t_offset_deg dicom_set: Optional[List] = None, slice_index: Optional[int] = None, lp_r_mm: float = 48.0, samples_per_segment: int = 50, center_threshold: float = -980, center_threshold_fallback: float = -900.0, ): # Mode detection self.dicom_mode = dicom_set is not None and slice_index is not None self.dicom_set = dicom_set self.slice_index = slice_index self.averaged_image = None if image is not None: self.image = np.array(image, dtype=float) else: self.image = None self.pixel_spacing = float(pixel_spacing) if pixel_spacing is not None else None self.center = center self.center_threshold = center_threshold self.center_threshold_fallback = center_threshold_fallback if rotation_offset is not None: self.t_offset_deg = float(rotation_offset) else: self.t_offset_deg = float(t_offset_deg) self.lp_r_mm = float(lp_r_mm) self.samples_per_segment = int(samples_per_segment) if center is not None: self.center_x = float(center[0]) self.center_y = float(center[1]) else: self.center_x = None self.center_y = None self.theta_deg = np.array([10, 40, 62, 85, 103, 121, 140, 157, 173, 186]) + self.t_offset_deg self.npeaks = [[1, 2], [2, 3], [3, 4], [4, 4], [5, 4], [6, 5], [7, 5], [8, 5], [9, 5], [10, 5]] self.lpx = None self.lpy = None self.per_pair_mtf = [] self.profiles = [] self.peaks_max = [] self.peaks_min = [] self.peaks_combined = [] self.lp_x = [] self.lp_y = [] self.nMTF = None self.lp_axis = None self.mtf_points = {} self.line_pair_profiles = None self.results = {} self.mtf = None self.lp_frequencies = None
[docs] def prepare_image(self): if self.dicom_mode: try: from ..utils.geometry import CatPhanGeometry im, means, z_mean = CatPhanGeometry.select_optimal_ctp528_slices( self.dicom_set, self.slice_index ) self.averaged_image = im self.image = im except Exception: idx = self.slice_index if idx <= 0: im1 = self.dicom_set[idx].pixel_array im2 = self.dicom_set[idx + 1].pixel_array im3 = self.dicom_set[idx + 1].pixel_array elif idx >= len(self.dicom_set) - 1: im1 = self.dicom_set[idx].pixel_array im2 = self.dicom_set[idx - 1].pixel_array im3 = self.dicom_set[idx - 1].pixel_array else: im1 = self.dicom_set[idx].pixel_array im2 = self.dicom_set[idx + 1].pixel_array im3 = self.dicom_set[idx - 1].pixel_array self.averaged_image = (im1.astype(float) + im2.astype(float) + im3.astype(float)) / 3 self.image = self.averaged_image if self.pixel_spacing is None: self.pixel_spacing = float(self.dicom_set[self.slice_index].PixelSpacing[0]) if self.center is not None: self.center_x = float(self.center[0]) self.center_y = float(self.center[1]) return self.averaged_image else: if self.image is None: raise ValueError("Image must be provided in single-image mode") return self.image
def _compute_centers(self): if self.center_x is None or self.center_y is None: raise ValueError("Center coordinates must be provided") if self.pixel_spacing is None: raise ValueError("Pixel spacing must be provided") r_pixels = self.lp_r_mm / self.pixel_spacing thetas_rad = np.deg2rad(self.theta_deg) self.lpx = r_pixels * np.cos(thetas_rad) + self.center_x self.lpy = r_pixels * np.sin(thetas_rad) + self.center_y def _get_MTF_for_pair(self, x_coords: Tuple[float, float], y_coords: Tuple[float, float], npeaks_expected: List[int]): x1 = np.linspace(x_coords[0], x_coords[1], self.samples_per_segment) y1 = np.linspace(y_coords[0], y_coords[1], self.samples_per_segment) f1 = np.zeros(len(x1)) ny, nx = self.image.shape x = np.linspace(0, nx - 1, nx) y = np.linspace(0, ny - 1, ny) for i in range(len(x1)): f1[i] = interpn((y, x), self.image, [[y1[i], x1[i]]], method='linear', bounds_error=False, fill_value=0.0)[0] df1 = np.diff(f1) h = 50 peaks_max1, _ = find_peaks(df1, height=h) peaks_min1, _ = find_peaks(-df1, height=h) while (len(peaks_max1) < npeaks_expected[1]) or (len(peaks_min1) < npeaks_expected[1]): if h <= 10: return 0.0, f1, peaks_max1, peaks_min1, np.array([], dtype=int), x_coords, y_coords h -= 1 peaks_max1, _ = find_peaks(df1, height=h) peaks_min1, _ = find_peaks(-df1, height=h) peaks1 = np.hstack((peaks_max1, peaks_min1)) peaks1 = np.array(sorted(peaks1)) idxmax = [] idxmin = [] Imax = [] Imin = [] offset = 1 for k in range(len(peaks1) - 1): if k % 2 == 0: tmp_idx = np.array(f1[peaks1[k] - offset:peaks1[k + 1] + offset]).argmax() idx_at = tmp_idx - offset + peaks1[k] idxmax.append(idx_at) Imax.append(f1[idx_at]) else: tmp_idx = np.array(f1[peaks1[k] - offset:peaks1[k + 1] + offset]).argmin() idx_at = tmp_idx - offset + peaks1[k] idxmin.append(idx_at) Imin.append(f1[idx_at]) if (len(Imax) == 0) or (len(Imin) == 0): MTF_value = 0.0 else: MTF_value = (np.mean(Imax) - np.mean(Imin)) / (np.mean(Imax) + np.mean(Imin)) return float(MTF_value), f1, peaks_max1, peaks_min1, peaks1, x_coords, y_coords
[docs] def analyze(self, write_log: bool = False, verbose: bool = True) -> Dict[str, Any]: if self.image is None: self.prepare_image() if self.center is None: from alexandria.utils import find_center_edge_detection, compute_phantom_boundary, draw_boundary center_row, center_col, diameter_y, diameter_x = find_center_edge_detection( self.image, threshold=self.center_threshold, fallback_threshold=self.center_threshold_fallback, return_diameters=True ) self.center = (center_col, center_row) self.center_x = float(self.center[0]) self.center_y = float(self.center[1]) boundary_x, boundary_y = draw_boundary(self.center, diameter_x, diameter_y) if len(boundary_x) == 0: _, (boundary_x, boundary_y) = compute_phantom_boundary( self.image, self.center, self.pixel_spacing, threshold=self.center_threshold, fallback_threshold=self.center_threshold_fallback ) self.boundary = { 'x': boundary_x.tolist() if len(boundary_x) > 0 else [], 'y': boundary_y.tolist() if len(boundary_y) > 0 else [] } self._compute_centers() n_pairs = len(self.lpx) - 1 per_pair_mtf = [] profiles = [] pmax_list = [] pmin_list = [] pcomb_list = [] lp_x_list = [] lp_y_list = [] for i in range(n_pairs): npeaks = self.npeaks[i] x_coords = (self.lpx[i], self.lpx[i + 1]) y_coords = (self.lpy[i], self.lpy[i + 1]) mtf_val, f1, pmax, pmin, pcomb, tmpx, tmpy = self._get_MTF_for_pair( x_coords, y_coords, npeaks ) per_pair_mtf.append(mtf_val) profiles.append(f1) pmax_list.append(pmax) pmin_list.append(pmin) pcomb_list.append(pcomb) lp_x_list.append(tmpx) lp_y_list.append(tmpy) self.per_pair_mtf = per_pair_mtf self.profiles = profiles self.line_pair_profiles = profiles self.peaks_max = pmax_list self.peaks_min = pmin_list self.peaks_combined = pcomb_list self.lp_x = lp_x_list self.lp_y = lp_y_list mtf_array = np.array(self.per_pair_mtf, dtype=float) max_val = mtf_array.max() if mtf_array.size > 0 else 0.0 if max_val > 0: self.nMTF = mtf_array / max_val else: self.nMTF = mtf_array.copy() if len(self.per_pair_mtf) > 0: self.lp_axis = np.linspace(1, len(self.per_pair_mtf), len(self.per_pair_mtf)) / 10.0 else: self.lp_axis = np.array([]) if self.nMTF.size > 0 and self.lp_axis.size > 0: targets = (0.8, 0.5, 0.3, 0.1) try: fMTF = np.interp(targets, self.nMTF[::-1], self.lp_axis[::-1]) self.mtf_points = { "MTF80": float(fMTF[0]), "MTF50": float(fMTF[1]), "MTF30": float(fMTF[2]), "MTF10": float(fMTF[3]), } except Exception: self.mtf_points = {"MTF80": np.nan, "MTF50": np.nan, "MTF30": np.nan, "MTF10": np.nan} else: self.mtf_points = {"MTF80": np.nan, "MTF50": np.nan, "MTF30": np.nan, "MTF10": np.nan} self.results = { 'mtf_80': self.mtf_points["MTF80"], 'mtf_50': self.mtf_points["MTF50"], 'mtf_30': self.mtf_points["MTF30"], 'mtf_10': self.mtf_points["MTF10"], 'mtf_array': self.nMTF, 'lp_frequencies': self.lp_axis, 'line_pair_x': self.lpx, 'line_pair_y': self.lpy } self.mtf = self.nMTF self.lp_frequencies = self.lp_axis return self.results
[docs] def to_dict(self) -> Dict[str, Any]: return { "pixel_spacing_mm": self.pixel_spacing, "image_shape": self.image.shape if self.image is not None else None, "centers_x": self.lpx.tolist() if self.lpx is not None else None, "centers_y": self.lpy.tolist() if self.lpy is not None else None, "per_pair_mtf": [float(x) for x in self.per_pair_mtf], "MTF10_lp_per_mm": self.mtf_points.get("MTF10"), "MTF30_lp_per_mm": self.mtf_points.get("MTF30"), "MTF50_lp_per_mm": self.mtf_points.get("MTF50"), "MTF80_lp_per_mm": self.mtf_points.get("MTF80"), "lp_mm": self.lp_axis.tolist() if self.lp_axis is not None else None, "nmtf": self.nMTF.tolist() if self.nMTF is not None else None, }