Source code for dran.fitting.plot_qc

# =========================================================================== #
# File: plot_qc.py                                                            #
# Author: Pfesesani V. van Zyl                                                #
# Email: pfesi24@gmail.com                                                    #
# =========================================================================== #


# Library imports
# --------------------------------------------------------------------------- #
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
# =========================================================================== #


[docs] @dataclass(slots=True) class ScanQualityResult: # Defaults represent "QC not computed yet". ok: bool = False flag: int = 0 message: str = "QC not computed" metrics: Dict[str, Any] = field(default_factory=dict)
[docs] def to_dict(self) -> dict: return { "flag": getattr(self, "flag", None), "message": getattr(self, "message", ""), "ok": getattr(self, "ok", None), "metrics": getattr(self, "metrics", {}), }
def _mad(x: np.ndarray) -> float: """ Compute the median absolute deviation (MAD) of a numeric array. MAD is defined as the median of the absolute differences between each value and the median of the data. It provides a robust measure of statistical dispersion and resists the influence of outliers. Non-finite values such as NaN and infinity are removed before computation. If no finite values remain, NaN is returned. Parameters ---------- x : np.ndarray Input array of numeric values. Returns ------- float The median absolute deviation of the input data, or NaN if the input contains no finite values. Notes ----- This function returns the raw MAD value. To approximate the standard deviation for normally distributed data, multiply the result by 1.4826. """ x = np.asarray(x, dtype=float) x = x[np.isfinite(x)] if x.size == 0: return float("nan") med = np.median(x) return float(np.median(np.abs(x - med))) def _local_maxima_indices(y: np.ndarray) -> np.ndarray: """ Return indices of local maxima in a one-dimensional numeric array. A local maximum is defined as a point whose value is greater than both its immediate neighbors. Endpoints are excluded, and flat plateaus are not considered maxima. Parameters ---------- y : np.ndarray Input array of numeric values. Returns ------- np.ndarray Array of integer indices where local maxima occur. Returns an empty array if fewer than three elements are provided. Notes ----- The implementation detects sign changes in the first derivative from positive to negative. This method is efficient and robust for smooth signals but does not detect flat or multi-point peaks. """ y = np.asarray(y, dtype=float) if y.size < 3: return np.array([], dtype=int) return (np.diff(np.sign(np.diff(y))) < 0).nonzero()[0] + 1
[docs] def check_scan_quality( x: np.ndarray, y_corrected: np.ndarray, y_spline: np.ndarray, hfnbw: float, hhpbw: float, peak_coeffs: Optional[np.ndarray], peak_x: Optional[np.ndarray], peak_y: Optional[np.ndarray], peak_model: Optional[np.ndarray], band: str, ) -> ScanQualityResult: x = np.asarray(x, dtype=float) y = np.asarray(y_corrected, dtype=float) ys = np.asarray(y_spline, dtype=float) metrics: Dict[str, Any] = {} if x.size < 20 or y.size != x.size: return ScanQualityResult(False, 34, "invalid input length", metrics) max_spline = np.nanmax(ys) peak_height = float(max_spline) if np.isfinite(max_spline) else float("nan") metrics["peak_height"] = peak_height # pmin=min(peak_y) # pmax=max(peak_y) # print(pmin,pmax, pmax-pmin) if not np.isfinite(peak_height) or peak_height <= 0: return ScanQualityResult(False, 35, "non-finite or non-positive peak height", metrics) # 1) Step detection dy = np.diff(y) mad_dy = _mad(dy) metrics["mad_dy"] = mad_dy if np.isfinite(mad_dy) and mad_dy > 0: step_k = 8.0 step_hits = int(np.count_nonzero(np.abs(dy) > step_k * mad_dy)) step_frac = step_hits / max(1, dy.size) metrics["step_hits"] = step_hits metrics["step_frac"] = step_frac if step_frac > 0.01: return ScanQualityResult(False, 36, "step-like jumps detected", metrics) # Regions main_mask = np.abs(x) <= hfnbw wing_mask = np.abs(x) >= hfnbw # 2) Multi-peak test inside FNBW main_max_idx = _local_maxima_indices(ys[main_mask]) if main_max_idx.size > 0: main_vals = ys[main_mask][main_max_idx] strong = main_vals >= 0.60 * float(np.nanmax(ys[main_mask])) strong_count = int(np.count_nonzero(strong)) metrics["main_local_max_count"] = int(main_max_idx.size) metrics["main_strong_max_count"] = strong_count if strong_count > 1: return ScanQualityResult(False, 37, "multiple strong peaks in main beam", metrics) # 3) Baseline flatness outside FNBW if int(np.count_nonzero(wing_mask)) >= 10: wing_y = y[wing_mask] wing_x = x[wing_mask] wing_mean = abs(float(np.nanmean(wing_y))) wing_std = float(np.nanstd(wing_y)) metrics["wing_mean"] = wing_mean metrics["wing_std"] = wing_std mean_lim = 0.3 * peak_height #0.05 std_lim = 0.3 * peak_height #0.10 if wing_std > 1.5*std_lim: return ScanQualityResult(False, 44, f"baseline too noisy or structured (>1.5*std), wing std: {wing_std:.2f}, peak std lim: {std_lim:.2f}", metrics) if wing_mean > mean_lim: return ScanQualityResult(False, 38, f"baseline mean too far from zero, wing mean: {wing_mean:.2f}, peak mean lim: {mean_lim:.2f} ", metrics) # if wing_std > std_lim: # return ScanQualityResult(False, 39, f"baseline too noisy or structured, wing std: {wing_std:.2f}, peak std lim: {std_lim:.2f}", metrics) # slope in wings try: coeff = np.polyfit(wing_x[np.isfinite(wing_y)], wing_y[np.isfinite(wing_y)], 1) wing_slope = float(coeff[0]) except Exception: wing_slope = float("nan") metrics["wing_slope"] = wing_slope slope_lim = (0.20 * peak_height) / max(hfnbw, 1e-6) if np.isfinite(wing_slope) and abs(wing_slope) > slope_lim: return ScanQualityResult(False, 40, "baseline slope too large", metrics) # 4) Peak concavity and vertex position if peak_coeffs is not None and len(peak_coeffs) >= 3: a = float(peak_coeffs[0]) b = float(peak_coeffs[1]) metrics["peak_coeff_a"] = a metrics["peak_coeff_b"] = b if not np.isfinite(a) or a >= 0.0: return ScanQualityResult(False, 41, "peak parabola not concave down", metrics) x0 = -b / (2.0 * a) metrics["peak_vertex_x"] = x0 if peak_x is not None and peak_x.size > 0: xmin = float(np.nanmin(peak_x)) xmax = float(np.nanmax(peak_x)) metrics["peak_fit_xmin"] = xmin metrics["peak_fit_xmax"] = xmax if not (xmin <= x0 <= xmax): return ScanQualityResult(False, 42, "peak vertex outside fit window", metrics) # 6) Peak residual quality if peak_y is not None and peak_model is not None and peak_y.size == peak_model.size and peak_y.size >= 10: resid = peak_y - peak_model peak_rms = float(np.sqrt(np.nanmean(resid**2))) rel = peak_rms / peak_height metrics["peak_rms"] = peak_rms metrics["peak_rel_rms"] = rel if np.isfinite(rel) and rel > 0.2: #0.08: # print(peak_rms) return ScanQualityResult(False, 43, f"peak fit residual too large, peak_rms: {peak_rms:.2f}, peak_rel_rms: {rel:.2f} ", metrics) # 7) Negative tail check # tail_min = float(np.nanmin(y[wing_mask])) if int(np.count_nonzero(wing_mask)) else float("nan") # metrics["tail_min"] = tail_min # if np.isfinite(tail_min) and tail_min < (-0.20 * peak_height): # return ScanQualityResult(False, 44, "negative tail too deep", metrics) return ScanQualityResult(True, 0, "OK", metrics)