# =========================================================================== #
# File: rfi_cleaning.py #
# Author: Pfesesani V. van Zyl #
# Email: pfesi24@gmail.com #
# =========================================================================== #
# Library imports
# --------------------------------------------------------------------------- #
import logging
import numpy as np
from astropy.stats import sigma_clip
from typing import Sequence, Any
from typing import Optional, Union
from dran.fitting.models import (CleanedScan, ResidualFit,
IterativeCleaningResult)
from dran.fitting.spline_models import spline_fit_1d
# =========================================================================== #
[docs]
def clean_rfi_sigma_clip(x: np.ndarray, y: np.ndarray, log: logging.Logger
) -> CleanedScan:
"""
Sort x, build a spline, sigma-clip residuals, return inliers plus
artefacts.
"""
log.debug("RFI cleaning started")
order = np.argsort(x)
x_s = x[order]
y_s = y[order]
spl = spline_fit_1d(y_s, log=log)
resid = y_s - spl
clipped = sigma_clip(resid, sigma=2.5, cenfunc="median", stdfunc="mad_std", maxiters=10)
mask_inliers = ~clipped.mask
clean_x = x_s[mask_inliers]
clean_y = y_s[mask_inliers]
final_spl = spline_fit_1d(clean_y, log=log)
res = np.asarray(final_spl - clean_y, dtype=float)
clipped_rms = float(np.sqrt(np.nanmean(res ** 2)))
points_deleted = np.where(~mask_inliers)[0]
return CleanedScan(
x=clean_x,
y=clean_y,
rms=clipped_rms,
residual=np.asarray(clipped, dtype=float),
spline_max=float(np.max(final_spl)) if final_spl.size else float("nan"),
spline=np.asarray(final_spl, dtype=float),
points_deleted=np.asarray(points_deleted, dtype=int),
flag=0,
)
[docs]
def clean_data(
x: Union[np.ndarray, Sequence[float]],
y: Union[np.ndarray, Sequence[float]],
log: Optional[logging.Logger] = None,
) -> CleanedScan:
"""
Clean a drift scan using iterative RFI rejection and refitting.
Pipeline
1) Spline fit to y
2) Compute residuals and RMS
3) Iteratively reject outliers and refit until RMS no longer improves
Parameters
----------
x, y
1D arrays of equal length.
log
Optional logger.
Returns
-------
CleanedScan
Container holding cleaned arrays and diagnostic products.
"""
logger = log or logging.getLogger(__name__)
logger.debug("Iterative cleaning of RFI")
x_arr = np.asarray(x, dtype=float)
y_arr = np.asarray(y, dtype=float)
if x_arr.ndim != 1 or y_arr.ndim != 1:
# raise ValueError("x and y must be 1D arrays")
return CleanedScan(
x=x,
y=y,
rms=None,
residual=None,
spline_max=None,
spline=None,
points_deleted=None,
flag=32,
raw_rms=None,
raw_residuals=None,
message = "x and y must be 1D arrays"
)
if x_arr.size != y_arr.size:
# raise ValueError("x and y must have the same length")
return CleanedScan(
x=x,
y=y,
rms=None,
residual=None,
spline_max=None,
spline=None,
points_deleted=None,
flag=31,
raw_rms=None,
raw_residuals=None,
message = "x and y must have the same length"
)
if x_arr.size < 10:
# raise ValueError("scan is too short for cleaning")
return CleanedScan(
x=x,
y=y,
rms=None,
residual=None,
spline_max=None,
spline=None,
points_deleted=None,
flag=30,
raw_rms=None,
raw_residuals=None,
message = "scan is too short for cleaning"
)
# Initial spline and residuals on raw data
raw_spline = spline_fit_1d(y_arr,log=log)
raw_fit = calc_residual(raw_spline, y_arr, logger)
# Iterative cleaning
cleaning = clean_data_iterative_fitting(
x=x_arr,
y=y_arr,
res=raw_fit.residuals,
rms=raw_fit.rms,
log=logger,
)
return CleanedScan(
x=cleaning.x,
y=cleaning.y,
rms=cleaning.rms,
residual=cleaning.residual,
spline_max=cleaning.spline_max,
spline=cleaning.spline,
points_deleted=cleaning.points_deleted,
flag=cleaning.flag,
raw_rms=float(raw_fit.rms),
raw_residuals=np.asarray(raw_fit.residuals, dtype=float),
message = "Ok"
)
[docs]
def clean_data_iterative_fitting(
x: Union[np.ndarray, Sequence[float]],
y: Union[np.ndarray, Sequence[float]],
res: Union[np.ndarray, Sequence[float]],
rms: float,
log: logging.Logger,
x2: Optional[Sequence[Any]] = None,
cut: float = 3.0,
max_iters: int = 25,
) -> IterativeCleaningResult:
"""
Iteratively remove RFI-like outliers and refit a spline until RMS no longer
improves.
A point is kept if |residual| <= cut * rms.
Parameters
----------
x, y
1D arrays of equal length.
res
1D residual array aligned with x and y (for the current iteration).
rms
Current RMS estimate used for thresholding.
log
Logger instance.
x2
Optional aligned metadata (filenames, IDs). If provided, it is filtered too.
cut
Residual threshold multiplier.
max_iters
Safety cap on number of iterations.
Returns
-------
Without x2:
finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted
With x2:
finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted, finalNames
"""
log.debug("Performing iterative RFI cuts on data")
x_arr = np.asarray(x, dtype=float)
y_arr = np.asarray(y, dtype=float)
res_arr = np.asarray(res, dtype=float)
if x_arr.ndim != 1 or y_arr.ndim != 1 or res_arr.ndim != 1:
return IterativeCleaningResult(
x=x_arr.ravel(),
y=y_arr.ravel(),
rms=np.nan,
residual=res_arr.ravel(),
spline_max=np.nan,
spline=spline_fit_1d(y_arr.ravel(), log=log),
points_deleted=0,
flag=32,
names=list(x2) if x2 is not None else None,
message="x, y, and res must be 1D arrays",
)
n = x_arr.size
if y_arr.size != n or res_arr.size != n:
return IterativeCleaningResult(
x=x_arr,
y=y_arr,
rms=np.nan,
residual=res_arr,
spline_max=np.nan,
spline=spline_fit_1d(y_arr, log=log),
points_deleted=0,
flag=31,
names=list(x2) if x2 is not None else None,
message="x, y, and res must have the same length",
)
if x2 is not None:
names_arr = np.asarray(x2, dtype=object)
if names_arr.ndim != 1 or names_arr.size != n:
return IterativeCleaningResult(
x=x_arr,
y=y_arr,
rms=np.nan,
residual=res_arr,
spline_max=np.nan,
spline=spline_fit_1d(y_arr, log=log),
points_deleted=0,
flag=33,
names=list(x2),
message="x2 must be 1D and the same length as x",
)
else:
names_arr = None
if not np.isfinite(rms) or rms <= 0.0:
msg=f"RFI cleaning rms invalid (rms={rms}). Skipping further cleaning for this scan."
log.warning(msg)
spline_y = spline_fit_1d(y_arr, log=log)
spline_max = float(np.nanmax(spline_y)) if spline_y.size else float("nan")
return IterativeCleaningResult(
x=x_arr,
y=y_arr,
rms=np.nan,
residual=res_arr,
spline_max=spline_max,
spline=spline_y,
points_deleted=0,
flag=28,
names=names_arr.tolist() if names_arr is not None else None,
message=msg,
)
# Best state (start as original)
best_x = x_arr
best_y = y_arr
best_res = res_arr
best_rms = float(rms)
best_names = names_arr
best_spline = spline_fit_1d(best_y,log=log)
best_max_spline = float(np.nanmax(best_spline))
for it in range(1, max_iters + 1):
threshold = cut * best_rms
keep_mask = np.abs(best_res) <= threshold
# If nothing changes, stop.
kept_count = int(np.count_nonzero(keep_mask))
if kept_count == best_y.size:
log.debug(f"No additional points rejected at iteration {it}. Stopping.")
break
if kept_count < 10:
log.debug(f"Too few points remain ({kept_count}) at iteration {it}. Stopping.")
break
new_x = best_x[keep_mask]
new_y = best_y[keep_mask]
new_names = best_names[keep_mask] if best_names is not None else None
spline_y = spline_fit_1d(new_y,log=log)
max_spline = float(np.nanmax(spline_y))
residual_fit = calc_residual(spline_y, new_y, log)
new_res = np.asarray(residual_fit.residuals, dtype=float)
new_rms = float(residual_fit.rms)
log.debug(
f"Iter {it}: rejected={best_y.size - kept_count}, rms {best_rms:.6g} -> {new_rms:.6g}"
)
# Accept only if RMS improves.
if np.isfinite(new_rms) and new_rms < best_rms:
best_x = new_x
best_y = new_y
best_res = new_res
best_rms = new_rms
best_spline = spline_y
best_max_spline = max_spline
best_names = new_names
else:
break
points_deleted = int(n - best_y.size)
if best_names is None:
return IterativeCleaningResult(
x=best_x,
y=best_y,
rms=float(best_rms) if np.isfinite(best_rms) else float("nan"),
residual=best_res,
spline_max=float(best_max_spline),
spline=best_spline,
points_deleted=points_deleted,
flag=29,
names=best_names.tolist() if best_names is not None else None,
message="No names/dates given for iterative clean",
) #best_x, best_y, best_rms, best_res, best_max_spline, best_spline, points_deleted
log.debug(
f"Iterative cleaning removed {points_deleted} points. Remaining={best_names.size}"
)
return IterativeCleaningResult(
x=best_x,
y=best_y,
rms=float(best_rms) if np.isfinite(best_rms) else float("nan"),
residual=best_res,
spline_max=float(best_max_spline),
spline=best_spline,
points_deleted=points_deleted,
flag=0,
names=best_names.tolist() if best_names is not None else None,
message="OK",
)
[docs]
def calc_residual(model: np.ndarray,
data: np.ndarray,
log:logging.Logger) -> ResidualFit:
"""
Calculate the residual and rms between the model and the data.
Parameters:
model (array): 1D array containing the model data
data (array): 1D array containing the raw data
log (object): file logging object
Returns
-------
res: 1d array
the residual
rms: int
the rms value
"""
log.debug('Calculating residual error and rms between model and data')
res = (model - data)
res = np.asarray(model - data, dtype=float)
rms = float(np.sqrt(np.nanmean(res ** 2)))
return ResidualFit(residuals=res,
rms=rms)