# =========================================================================== #
# File: pipeline.py #
# Author: Pfesesani V. van Zyl #
# Email: pfesi24@gmail.com #
# =========================================================================== #
# Library imports
# --------------------------------------------------------------------------- #
import logging
from typing import Optional,Any
import numpy as np
from dataclasses import dataclass
from dran.utils.config import ProjectPaths
from dran.fitting.config import DualBeamQCConfig
from dran.fitting.baseline_correction import correct_baseline_linear
from dran.fitting.baseline_windows import build_baseline_windows, filter_invalid_minima
from dran.fitting.gaussian_fit import fit_gaussian_test
from dran.fitting.models import BeamFitResult, DualBeamFitResult, ScanQualityResult, DualBeamQCResult
from dran.fitting.plotting import plot_diagnostics, plot_fail
from dran.fitting.peak_fitting import fit_quadratic_peak, calc_residual, peak_location_index, peak_window_indices
from dran.fitting.rfi_cleaning import clean_data
from dran.fitting.validation import validate_xy
from dran.fitting.plot_qc import check_scan_quality
# =========================================================================== #
[docs]
def find_step_locations(y: np.ndarray, sigma: float, sigma_mult: float
) -> np.ndarray:
"""Identify step change locations in a signal.
Finds indices where the absolute first difference exceeds a multiple
of the noise level.
"""
if y.size < 3 or sigma <= 0.0:
return np.array([], dtype=int)
dy = np.diff(y)
idx = np.where(np.abs(dy) > (sigma_mult * sigma))[0]
return idx
[docs]
def plot_dual_beam_final(
*,
x: np.ndarray,
y: np.ndarray,
y_corr: np.ndarray,
a_idx: np.ndarray,
b_idx: np.ndarray,
a_model_x: np.ndarray,
a_model_y: np.ndarray,
b_model_x: np.ndarray,
b_model_y: np.ndarray,
a_label: str,
b_label: str,
qc_flag: int,
qc_reasons: list[str],
base_sigma: float,
save: str,
paths: Any,
log: Any,
) -> None:
"""Generate and save the final dual-beam diagnostic plot.
Detects step edges in the corrected scan, builds plotting series for
the corrected data, beam windows, fitted beam models, and optional step
markers, then calls plot_diagnostics to render the final QC plot.
"""
sigma = base_sigma if base_sigma > 0 else float(np.std(y_corr))
step_idx = find_step_locations(y_corr, sigma, 12.0)
lines = []
lines.append(f"flag={qc_flag}")
if base_sigma > 0:
lines.append(f"base_rms={base_sigma:.4g}")
if step_idx.size:
lines.append(f"step_edges={step_idx.size}")
if qc_reasons:
lines.extend(qc_reasons[:6])
qc_text = "\n".join(lines)
series = [
{"x": x, "y": y_corr, "lab": "corrected", "fmt": "k"},
{"x": x[a_idx], "y": y_corr[a_idx], "lab": "beam A window", "fmt": ""},
{"x": x[b_idx], "y": y_corr[b_idx], "lab": "beam B window", "fmt": ""},
{"x": a_model_x, "y": a_model_y, "lab": a_label, "fmt": ""},
{"x": b_model_x, "y": b_model_y, "lab": b_label, "fmt": ""},
]
# Add step markers if present
if step_idx.size:
step_x = x[step_idx]
step_y = y_corr[step_idx]
series.append({"x": step_x, "y": step_y, "lab": "step", "fmt": "x"})
plot_diagnostics(
series,
paths,
log,
save,
plot_type="final",
suffix="_dual_beam_final.png",
# annotation=qc_text,
# annotation_loc="upper left",
)
[docs]
def robust_sigma(y: np.ndarray) -> float:
"""Estimate robust noise level using the median absolute deviation.
Falls back to standard deviation for small samples and returns zero for
empty input.
"""
if y.size < 5:
return float(np.std(y)) if y.size else 0.0
med = float(np.median(y))
mad = float(np.median(np.abs(y - med)))
return 1.4826 * mad
[docs]
def step_metrics(y: np.ndarray, sigma: float, sigma_mult: float
) -> tuple[float, int]:
"""Compute step change metrics for a signal.
Returns the maximum absolute step size and the count of steps exceeding
a noise-scaled threshold.
"""
if y.size < 3:
return 0.0, 0
dy = np.diff(y)
if sigma <= 0.0:
return float(np.max(np.abs(dy))) if dy.size else 0.0, 0
threshold = sigma_mult * sigma
step_count = int(np.sum(np.abs(dy) > threshold))
step_max = float(np.max(np.abs(dy))) if dy.size else 0.0
return step_max, step_count
[docs]
def max_gap_fraction(x: np.ndarray) -> float:
"""Compute the largest sampling gap relative to the median spacing.
Returns the ratio of the maximum gap to the median step size, or zero
for insufficient or degenerate input.
"""
if x.size < 3:
return 0.0
dx = np.diff(np.sort(x))
med = float(np.median(dx)) if dx.size else 0.0
if med <= 0.0:
return 0.0
max_gap = float(np.max(dx))
return max_gap / med
[docs]
def edge_distance_fraction(
peak_x: float,
window_left: float,
window_right: float,
) -> float:
"""Measure normalized distance of a peak from the window edges.
Returns the minimum distance to either edge divided by the window
width, with a fallback value for invalid windows.
"""
width = window_right - window_left
if width <= 0:
return 1.0
d_left = abs(peak_x - window_left)
d_right = abs(window_right - peak_x)
d_edge = min(d_left, d_right)
return d_edge / width
[docs]
def compute_dual_beam_qc(
x: np.ndarray,
y_corrected: np.ndarray,
baseline_sigma: float,
a_idx: np.ndarray,
b_idx: np.ndarray,
a_peak: float,
a_peak_x: float,
a_resid: np.ndarray,
b_peak: float,
b_peak_x: float,
b_resid: np.ndarray,
a_window: tuple[float, float],
b_window: tuple[float, float],
cfg: DualBeamQCConfig,
) -> DualBeamQCResult:
"""Evaluate dual-beam scan quality and return QC diagnostics.
Computes robust noise, step discontinuities, sampling gaps, per-beam
point counts, residual RMS, SNR, amplitude balance, edge proximity,
and peak sign consistency. Produces a QC flag and reason list, plus
summary metrics used for logging and plotting.
"""
reasons: list[str] = []
noise_sigma = robust_sigma(y_corrected) if baseline_sigma <= 0 else baseline_sigma
step_max, step_count = step_metrics(y_corrected, noise_sigma, cfg.step_sigma_mult)
gap_frac = max_gap_fraction(x)
if step_max > cfg.step_sigma_mult * noise_sigma:
reasons.append(f"large step discontinuity. step_max={step_max:.4g}, sigma={noise_sigma:.4g}")
if step_count > cfg.step_count_max:
reasons.append(f"too many step edges. step_count={step_count}")
if gap_frac > (1.0 + cfg.max_gap_frac):
reasons.append(f"large gap in x sampling. gap_frac={gap_frac:.3f}")
if a_idx.size < cfg.min_points_per_beam:
reasons.append(f"too few points in A beam. n={a_idx.size}")
if b_idx.size < cfg.min_points_per_beam:
reasons.append(f"too few points in B beam. n={b_idx.size}")
resid_rms_a = float(np.sqrt(np.mean(a_resid * a_resid))) if a_resid.size else float("inf")
resid_rms_b = float(np.sqrt(np.mean(b_resid * b_resid))) if b_resid.size else float("inf")
if baseline_sigma > 0.0:
if resid_rms_a / baseline_sigma > cfg.max_resid_to_baseline_rms:
reasons.append(f"A fit residual too large. resid_rms={resid_rms_a:.4g}, base_rms={baseline_sigma:.4g}")
if resid_rms_b / baseline_sigma > cfg.max_resid_to_baseline_rms:
reasons.append(f"B fit residual too large. resid_rms={resid_rms_b:.4g}, base_rms={baseline_sigma:.4g}")
snr_a = abs(a_peak) / baseline_sigma if baseline_sigma > 0 else 0.0
snr_b = abs(b_peak) / baseline_sigma if baseline_sigma > 0 else 0.0
if snr_a < cfg.min_snr:
reasons.append(f"low SNR on A. snr={snr_a:.2f}")
if snr_b < cfg.min_snr:
reasons.append(f"low SNR on B. snr={snr_b:.2f}")
if abs(a_peak) < 0.5*abs(b_peak):
reasons.append('Peak A is less than 50% peak B')
if abs(b_peak) < 0.5*abs(a_peak):
reasons.append('Peak B is less than 50% peak A')
amp_ratio = (abs(a_peak) / max(abs(b_peak), 1e-12)) if b_peak != 0 else float("inf")
if amp_ratio > cfg.max_amp_ratio or amp_ratio < (1.0 / cfg.max_amp_ratio):
reasons.append(f"amplitude imbalance. ratio={amp_ratio:.2f}")
# a_edge = edge_distance_fraction(a_peak_x, a_window[0], a_window[1])
# b_edge = edge_distance_fraction(b_peak_x, b_window[0], b_window[1])
# if a_edge < cfg.max_edge_frac:
# reasons.append("A peak too close to window edge")
# if b_edge < cfg.max_edge_frac:
# reasons.append("B peak too close to window edge")
# Opposite-sign requirement for dual beam
if np.sign(a_peak) == np.sign(b_peak):
reasons.append("A and B peaks have same sign")
is_bad = len(reasons) > 0
# Flag mapping. Keep your existing numeric scheme, but make it consistent.
# 0 ok
# 60 QC bad scan
# 61 strong step
flag = 0
if is_bad:
flag = 60
if any("step discontinuity" in r for r in reasons):
flag = 61
return DualBeamQCResult(
is_bad=is_bad,
flag=flag,
reasons=reasons,
step_max=step_max,
step_count=step_count,
max_gap_frac=gap_frac,
snr_a=float(snr_a),
snr_b=float(snr_b),
resid_rms_a=resid_rms_a,
resid_rms_b=resid_rms_b,
)
[docs]
def dual_beam_qc_to_scan_quality(qc: DualBeamQCResult) -> ScanQualityResult:
metrics = {
"step_max": qc.step_max,
"step_count": qc.step_count,
"max_gap_frac": qc.max_gap_frac,
"snr_a": qc.snr_a,
"snr_b": qc.snr_b,
"resid_rms_a": qc.resid_rms_a,
"resid_rms_b": qc.resid_rms_b,
"reasons": list(qc.reasons),
}
return ScanQualityResult(
ok=not qc.is_bad,
flag=qc.flag,
message="OK" if not qc.is_bad else " | ".join(qc.reasons),
metrics=metrics,
)
[docs]
def fit_scan(
x: np.ndarray,
y: np.ndarray,
band: str,
hfnbw: float,
hhpbw: float,
force: bool,
log: logging.Logger,
save: str,
theofit: str,
autofit: str,
paths:ProjectPaths,
) -> BeamFitResult:
"""
Refactored pipeline wrapper around the existing logic.
Long, scan-specific heuristics stay in this file, while reusable parts live in modules.
This version removes sys.exit calls. It returns BeamFitResult with flag and message.
"""
result = BeamFitResult()
# print('validate: ',x,y)
try:
validate_xy(x, y, log)
except ValueError as exc:
flag=1
msg='Failed to validate data'
result.flag = flag
result.message = str(exc)
if len(x)!=len(y):
if len(x)>len(y):
y = np.full_like(x, y)
else:
x = np.full_like(y, x)
plot_fail(x,y , paths, msg, log, save, flag, fmt="red")
else:
plot_fail(x, y, paths, msg, log, save, flag, fmt="red")
return result
plot_diagnostics([{"x": x, "y": y, "lab": "raw", "fmt": ""}],
paths,
log,
save,
suffix="_raw.png")
# Quick Gaussian gate
p0 = [float(np.max(y)), float(np.mean(x)), float(hhpbw * 2.0), 0.01, 0.0]
_, _, gauss_flag = fit_gaussian_test(x, y, p0, log)
if gauss_flag is not None:
result.flag = 8
result.message = "Gaussian test fit failed"
plot_fail(x, y,paths, result.message, log, save, flag=result.flag, fmt="orange")
return result
# RFI cleaning
clean = clean_data(x, y, log)
result.clean_rms = clean.rms
# Derivative step masking (kept from your flow, packaged here)
dy = np.diff(clean.y)
std = float(np.std(dy)) if dy.size else 0.0
threshold = 2.0 * std
step_idx = np.where(np.abs(dy) >= threshold)[0]
mask = np.zeros(len(dy), dtype=bool)
mask[step_idx] = True
clean2 = clean_data(clean.x[:-1][~mask], clean.y[:-1][~mask], log)
plot_diagnostics(
[
{"x": clean.x, "y": clean.y, "lab": "cleaned", "fmt": ""},
{"x": clean2.x, "y": clean2.y, "lab": "cleaned_new", "fmt": ""},
],
paths,
log,
save,
suffix="_cleaned_new.png",
)
clean = clean2
# # Quick Gaussian gate after clean
# p0 = [float(np.max(clean.y)), float(np.mean(clean.x)), float(hhpbw * 2.0), 0.01, 0.0]
# _, _, gauss_flag = fit_gaussian_test(clean.x, clean.y, p0, log)
# if gauss_flag is not None:
# plot_fail(clean.x, clean.y,paths, "Gaussian test fit failed", log, save, flag=5, fmt="orange")
# result.flag = 5
# result.message = "Gaussian test fit failed"
# return result
# Baseline point discovery via spline extrema
if clean.spline is None:
msg= "Failed to spline data"
result.message =msg
result.flag = 11
plot_fail(x, y, paths,msg, log, save, flag=result.flag, fmt="tomato")
return result
# print(result)
# sys.exit()
else:
local_min = (np.diff(np.sign(np.diff(clean.spline))) > 0).nonzero()[0] + 1
local_max = (np.diff(np.sign(np.diff(clean.spline))) < 0).nonzero()[0] + 1
max_points = 50
local_min = filter_invalid_minima(local_max, local_min, max_points=max_points, log=log)
if local_min.size == 0 and local_max.size == 0:
result.flag = 12
result.message = "Failed to locate minima and maxima"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="coral")
return result
# Spline peak and beam markers
spline_peak_idx = int(np.argmax(clean.spline))
x_spline_peak = float(clean.x[spline_peak_idx])
xhpbw_left = x_spline_peak - hhpbw
xhpbw_right = x_spline_peak + hhpbw
xfnbw_left = x_spline_peak - hfnbw
xfnbw_right = x_spline_peak + hfnbw
if not (-hfnbw <= x_spline_peak <= hfnbw):
result.flag = 13
result.message = "Peak not within expected half-first-null width"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="m")
return result
# print(clean.x, xhpbw_left, xhpbw_right,xfnbw_left )
left_hpbw_idx = int(np.where(clean.x >= xhpbw_left)[0][0])
try:
right_hpbw_idx = int(np.where(clean.x >= xhpbw_right)[0][0])
except:
log.debug('Right hpbw couldnt be estimated')
result.flag = 14
result.message = "Right hpbw estimation failed"
plot_fail(x, y, paths,"Right hpbw estimation failed", log, save, flag=result.flag, fmt="m")
return result
left_fnbw_idx = int(np.where(clean.x >= xfnbw_left)[0][0])
right_fnbw_idx: Optional[int]
rr = np.where(clean.x >= xfnbw_right)[0]
right_fnbw_idx = int(rr[0]) if rr.size else None
# Split minima by side
left_mins = [int(v) for v in local_min if clean.x[int(v)] < xhpbw_left]
right_mins = [int(v) for v in local_min if clean.x[int(v)] > xhpbw_right]
baseline_indices, base_flag = build_baseline_windows(
x=clean.x,
left_mins=left_mins,
right_mins=right_mins,
left_hpbw_idx=left_hpbw_idx,
right_hpbw_idx=right_hpbw_idx,
left_fnbw_idx=left_fnbw_idx,
right_fnbw_idx=right_fnbw_idx,
max_points=max_points,
log=log,
)
y_corrected, y_corr_spline, base_coeffs, base_res, base_rms = correct_baseline_linear(
x=clean.x,
y=clean.y,
baseline_indices=baseline_indices,
log=log,
)
result.baseline_coeffs = base_coeffs
result.baseline_residual = base_res
result.baseline_rms = float(base_rms)
result.baseline_window_indices = baseline_indices
result.y_corrected = y_corrected
result.y_corrected_spline = y_corr_spline
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": ""},
{"x": clean.x[baseline_indices], "y": y_corrected[baseline_indices], "lab": "base locs", "fmt": "."},
{"x": clean.x, "y": y_corr_spline, "lab": "splined", "fmt": ""},
],
paths,
log,
save,
suffix="_corrected.png",
)
# Peak selection and quadratic fit
# max_spline = float(np.max(y_corr_spline))
# spline_peak2_idx = int(np.argmax(y_corr_spline))
# x_spline_peak2 = float(clean.x[spline_peak2_idx])
# Cut fraction heuristic preserved
cut_fraction = 0.7
fx_idx = peak_window_indices(clean.x, y_corr_spline, hfnbw=hfnbw, cut_fraction=cut_fraction)
if fx_idx.size == 0:
result.flag = 24
result.message = "No peak points selected"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="darkorange")
return result
px = clean.x[fx_idx]
py = y_corrected[fx_idx]
peak_coeffs, peak_model, peak_rms = fit_quadratic_peak(px, py, log)
# print('coeffs: ', peak_coeffs)
if len(peak_coeffs)==0:
result.flag = 60
result.message = "Peak coeffs = 0, failed peak fit"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="r")
return result
is_concave_down = np.isfinite(peak_coeffs[0]) and (peak_coeffs[0] < 0.0)
if not is_concave_down:
result.flag = 25
result.message = "Failed to fit peak accurately"
plot_fail(x, y, paths,result.message, log, save, flag=5, fmt="sienna")
return result
peak_loc = peak_location_index(peak_model)
ta_peak = float(np.max(peak_model)) if peak_model.size else float("nan")
rel_error = (peak_rms / ta_peak) * 100.0 if ta_peak and np.isfinite(ta_peak) else float("inf")
if not np.isfinite(rel_error) or rel_error >= 100.0:
result.flag = 23
result.message = "Peak fit relative error too high"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="r")
# result.qc={}
return result
result.peak_coeffs = peak_coeffs
result.peak_model = peak_model
result.ta_peak = ta_peak
result.ta_peak_err = float(peak_rms)
result.peak_loc_index = int(peak_loc)
result.flag = 0
result.message = "OK"
# apply qc check only if you want strict outlier detection
qc=check_scan_quality(
x=clean.x,
y_corrected=y_corrected,
y_spline=y_corr_spline,
hfnbw=hfnbw,
hhpbw=hhpbw,
peak_coeffs=peak_coeffs,
peak_x=px,
peak_y=py,
peak_model=peak_model,
band=band
)
result.qc=qc
log.info(f"T$_A$ [K]: {ta_peak:.3f} +- {peak_rms:.3f}")
if qc.ok==False:
log.warning("QC failed: %s", qc.message)
result.flag = qc.flag
result.message = qc.message
if qc.flag==34:
c = "darkmagenta"
elif qc.flag==35:
c = "darkorange"
# elif qc.flag==102:
# c = "indigo"
# elif qc.flag==103:
# c = "indigo"
elif qc.flag==36:
c = "darkblue"
elif qc.flag==37:
c = "darkgreen"
elif qc.flag==38:
c = "crimson"
elif qc.flag==39:
c = "navy"
elif qc.flag==40:
c = "r"
elif qc.flag==41:
c = "steelblue"
elif qc.flag==42:
c = "salmon"
elif qc.flag==43:
c = "violet"
elif qc.flag==44:
c = "darkolivegreen"
# elif qc.flag==211:
# c = "y"
# elif qc.flag==212:
# c = "grey"
# elif qc.flag==213:
# c = "grey"
plot_fail(clean.x,y_corrected, paths,qc.message, log, save, flag=5, fmt=c)
result.qc = qc
return result
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
{"x": px, "y": py, "lab": "peak", "fmt": ""},
{"x": px, "y": peak_model, "lab": f"T$_A$ [K]: {ta_peak:.3f} +- {peak_rms:.3f}", "fmt": ""},
],
paths,
log,
save,
suffix="_corrected_final.png",
)
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
{"x": px, "y": py, "lab": "peak", "fmt": ""},
{"x": px, "y": peak_model, "lab": f"T$_A$ [K]: {ta_peak:.3f} +- {peak_rms:.3f}", "fmt": ""},
],
paths,
log,
save,
plot_type="final",
suffix="_corrected_final.png",
)
return result
[docs]
def fit_scan_db(
x: np.ndarray,
y: np.ndarray,
band: str,
hfnbw: float,
hhpbw: float,
force: bool,
log: logging.Logger,
save: str,
theofit: str,
autofit: str,
factor:int,
paths:ProjectPaths,
) -> DualBeamFitResult:
"""
Refactored pipeline wrapper around the existing logic.
Long, scan-specific heuristics stay in this file, while reusable parts live in modules.
This version removes sys.exit calls. It returns DualBeamFitResult with flag and message.
"""
result = DualBeamFitResult()
cfg = DualBeamQCConfig(
step_sigma_mult=12.0,
step_count_max=6,
min_points_per_beam=20,
min_snr=2.0 ,#3.0,
max_resid_to_baseline_rms=2.5,
)
# try:
# print(len(x),len(y))
# validate_xy(x, y, log)
# except ValueError as exc:
# result.flag = 1
# result.message = str(exc)
# return result
plot_diagnostics([{"x": x, "y": y, "lab": "raw", "fmt": ""}],
paths,log, save, suffix="_raw.png")
# RFI cleaning
clean = clean_data(x, y, log)
result.clean_rms = clean.rms
# from matplotlib import pyplot as plt
# plt.plot(clean.x,clean.y)
# plt.show()
# sys.exit()
# Derivative step masking (kept from your flow, packaged here)
dy = np.diff(clean.y)
std = float(np.std(dy)) if dy.size else 0.0
threshold = 2.0 * std
step_idx = np.where(np.abs(dy) >= threshold)[0]
mask = np.zeros(len(dy), dtype=bool)
mask[step_idx] = True
if clean.rms > 1:
log.warning("High rms: %s", clean.rms)
msg=f"High rms: {clean.rms}"
result.flag=2
plot_fail(x, y, paths,msg, log, save, flag=result.flag, fmt="r")
result.message = msg
log.debug(msg)
# sys.exit()
return result
clean2 = clean_data(clean.x[:-1][~mask], clean.y[:-1][~mask], log)
plot_diagnostics(
[
{"x": clean.x, "y": clean.y, "lab": "cleaned", "fmt": ""},
{"x": clean2.x, "y": clean2.y, "lab": "cleaned_new", "fmt": ""},
],
paths,
log,
save,
suffix="_cleaned_new.png",
)
clean = clean2
# Baseline point discovery via spline extrema
if clean.spline is None:
msg="Missing spline"
result.flag=3
plot_fail(x, y, paths,msg, log, save, flag=result.flag, fmt="tomato")
result.message = msg
log.debug(msg)
# sys.exit()
return result
scanLen = len(clean.x)
midLeftLoc = int(scanLen/4) # estimated location of peak on left beam
midRightLoc = midLeftLoc * 3 # estimated location of peak on right beam
fl = 0 # failed left gaussian fit
fr = 0 # failed right gaussian fit
# LOCATE BASELINE BLOCKS
# we don't worry about sidelobes here so baseline
# blocks are set to edges or fnbw points
ptLimit = int(scanLen*0.04) # Point limit, number of allowed points per base block
baseLocsLeft = np.arange(0,ptLimit,1)
baseLocsRight = np.arange(scanLen-ptLimit,scanLen,1)
baseline_indices = list(baseLocsLeft)+list(baseLocsRight)
if len(baseLocsLeft) == 0 or len(baseLocsRight) == 0:
msg = "failed to locate base locs"
log.error(msg)
result.flag = 4
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
result.message = msg
return result
log.debug(f'BaseLocsLeft: {baseLocsLeft[0]} to {baseLocsLeft[-1]} = {len(baseLocsLeft)} pts')
log.debug(f'BaseLocsLeft: {baseLocsRight[0]} to {baseLocsRight[-1]} = {len(baseLocsRight)} pts')
# correct drift
y_corrected, y_corr_spline, base_coeffs, base_res, base_rms = correct_baseline_linear(
x=clean.x,
y=clean.y,
baseline_indices=baseline_indices,
log=log,
)
result.baseline_coeffs = base_coeffs
result.baseline_residuals = base_res
result.baseline_rms = float(base_rms)
result.baseline_window_indices = baseline_indices
result.y_corrected = y_corrected
result.y_corrected_spline = y_corr_spline
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": ""},
{"x": clean.x[baseline_indices], "y": y_corrected[baseline_indices], "lab": "base locs", "fmt": "."},
{"x": clean.x, "y": y_corr_spline, "lab": "splined", "fmt": ""},
],
paths,
log,
save,
suffix="_corrected.png",
)
# Spline the data and get global max/min
ysplmaxloc = np.argmax(y_corr_spline)
ysplminloc = np.argmin(y_corr_spline)
ysplmax = y_corr_spline[ysplmaxloc]
ysplmin = y_corr_spline[ysplminloc]
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": ""},
{"x": clean.x[baseline_indices], "y": y_corrected[baseline_indices], "lab": "base locs", "fmt": "."},
{"x": clean.x, "y": y_corr_spline, "lab": "splined", "fmt": ""},
{"x": clean.x[ysplmaxloc], "y": ysplmax, "lab": "top peak", "fmt": "*"},
{"x": clean.x[ysplminloc], "y": ysplmin, "lab": "bottom peak", "fmt": "*"},
],
paths,
log,
save,
suffix="_corrected_peaks.png",
)
# A/B BEAM DATA PROCESSING
factoredfnbw = (hfnbw*2)*factor # fnbw multiplied by factor
AbeamScan = np.where(np.logical_and(clean.x > -factoredfnbw, clean.x < 0))[0]
BbeamScan = np.where(np.logical_and(clean.x > 0, clean.x < factoredfnbw))[0]
if len(AbeamScan) == 0 or len(BbeamScan) == 0:
msg="A/B beam scan data == 0, no data"
log.error(msg)
result.flag = 5
result.message = msg
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
return result
log.debug("- Beam seperation:")
log.debug("Left beam indeces: {} to {}".format(AbeamScan[0], AbeamScan[-1]))
log.debug("Right beam indeces: {} to {}\n".format(
BbeamScan[0], BbeamScan[-1]))
log.debug("- Data Limits")
log.debug("base left: {}, drift A left: {}, peak A: {}, drift A right: {}".format(baseLocsLeft[-1], AbeamScan[0], ysplminloc, AbeamScan[-1]))
log.debug("drift B left: {}, peak B: {}, drift B right: {}, base right: {}\n".format(
BbeamScan[0], ysplmaxloc, BbeamScan[-1], baseLocsRight[0]))
# figure out orientation of beam. With some scans the beams
# are flipped e.g, pks2326-502, A beam is positive, whereas
# j0450-81 A beam is negative.
# find value closest to zero and use the other value to determine
# which side to fit positive/negative peak
lstA=[min(y_corr_spline[AbeamScan]), max(y_corr_spline[AbeamScan])]
minA = min(lstA,key=abs)
fa=""
if minA==min(y_corr_spline[AbeamScan]):
# fit A beam positive B beam negative
log.debug("Fitting positive")
fa="p"
else:
# fit A beam negative B beam positive
log.debug("Fitting negative")
fa="n"
# TODO: should make this an option
# Decided to change this because the software now treats both
# target sources and calibrators the same way
beamCut = 0.6 # fitting top 40%, 0.7 (30% for cals) or 0.5 (50% for targets)
# Ensure peak is within accepted limits
if fa=="p":
if ysplmaxloc > AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
log.error(msg)
result.flag = 6
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
result.message = msg
return result
if ysplminloc < BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
log.error(msg)
result.flag = 7
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
result.message = msg
return result
# Try to fit a gaussian to determine peak parameters
# try:
# Quick Gaussian gate
# set initial parameters for data fitting
p0 = [float(np.max(y_corrected)), float(clean.x[midLeftLoc]), float(hhpbw * 2.0), 0.01, 0.0]
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
_, _, gauss_flag = fit_gaussian_test(clean.x[AbeamScan], y_corrected[AbeamScan], p0, log)
if gauss_flag is not None:
result.flag = 8
result.message = "Gaussian test fit failed"
plot_fail(x, y, paths,result.message , log, save, flag=result.flag, fmt="orange")
fitLeft = y[AbeamScan]
fl=1
# return result
# set initial parameters for data fitting
p = [float(min(y_corrected)), float(clean.x[midRightLoc]), float(hhpbw * 2.0), 0.01, 0.0]
_, _, gauss_flag = fit_gaussian_test(clean.x[BbeamScan], y_corrected[BbeamScan], p, log)
# coeffr, fitr, flagr = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
if gauss_flag is not None:
# fr = 1
# fitRight = y[BbeamScan]
result.flag = 8
result.message = "gaussian curve_fit algorithm failed"
plot_fail(x, y, paths,result.message , log, save, flag=result.flag, fmt="tomato")
# fitLeft = y[AbeamScan]
# fl=1
# return result
# Determine peak fitting location
BbeamLeftLimit = clean.x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = clean.x[ysplminloc]+hhpbw
AbeamLeftLimit = clean.x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = clean.x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
clean.x >= AbeamLeftLimit, clean.x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
clean.x >= BbeamLeftLimit, clean.x <= BbeamRightLimit))[0]
# plt.plot(clean.x,y_corrected)
# plt.plot(clean.x[leftlocs],y_corrected[leftlocs])
# plt.plot(clean.x[rightlocs],y_corrected[rightlocs])
# plt.show()
# sys.exit()
# select part of beam to fit
if ysplmax < 0.1:
flag = 9
msg = "fit entire left beam, max yspl < 0.1"
log.warning(msg)
leftMainBeamLocs = leftlocs
else:
topCut = np.where(y_corr_spline[leftlocs] >= ysplmax*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(topCut)
if ysplmin > -0.1:
flag = 10
msg = "fit entire right beam, min yspl > -0.1"
log.warning(msg)
rightMainBeamLocs = rightlocs
else:
# try:
bottomCut = np.where(y_corr_spline[rightlocs] <= ysplmin*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(bottomCut)
# except IndexError:
# msg="Index out of bound error, bottomCut"
# log.warning(msg)
# flag = 3
# result.flag = flag
# result.message = msg
# plot_fail(x, y, msg, log, save, flag, fmt="tomato")
# return result
if fa=="n":
if ysplmaxloc < AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
log.error(msg)
result.flag = 26
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
result.message = msg
return result
if ysplminloc > BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
log.error(msg)
result.flag = 27
plot_fail(x, y,paths,msg, log, save, flag=result.flag, fmt="orange")
result.message = msg
return result
# Try to fit a gaussian to determine peak parameters
# try:
# Quick Gaussian gate
# set initial parameters for data fitting
p0 = [float(np.min(y_corrected)), float(clean.x[midLeftLoc]), float(hhpbw * 2.0), 0.01, 0.0]
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
_, _, gauss_flag = fit_gaussian_test(clean.x[AbeamScan], y_corrected[AbeamScan], p0, log)
if gauss_flag is not None:
result.flag = 8
result.message = "Gaussian test fit failed"
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="orange")
# fitLeft = y[AbeamScan]
# fl=1
return result
# set initial parameters for data fitting
p = [float(max(y_corrected)), float(clean.x[midRightLoc]), float(hhpbw * 2.0), 0.01, 0.0]
_, _, gauss_flag = fit_gaussian_test(clean.x[BbeamScan], y_corrected[BbeamScan], p, log)
# coeffr, fitr, flagr = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
if gauss_flag is not None:
# fr = 1
msg = "gaussian curve_fit algorithm failed"
# msg_wrapper("debug", log.debug, msg)
# fitRight = y[BbeamScan]
result.flag = 8
result.message = "Gaussian test fit failed"
# fitLeft = y[AbeamScan]
# fl=1
plot_fail(x, y, paths,result.message, log, save, flag=result.flag, fmt="tomato")
return result
# Determine peak fitting location
AbeamLeftLimit = clean.x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = clean.x[ysplminloc]+hhpbw
BbeamLeftLimit = clean.x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = clean.x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
clean.x >= AbeamLeftLimit, clean.x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
clean.x >= BbeamLeftLimit, clean.x <= BbeamRightLimit))[0]
# plt.plot(clean.x,y_corrected)
# plt.plot(clean.x[leftlocs],y_corrected[leftlocs])
# plt.plot(clean.x[rightlocs],y_corrected[rightlocs])
# plt.show()
# print('testing')
# sys.exit()
# select part of beam to fit
if ysplmax < 0.1:
flag = 9
msg = "fit entire left beam, max yspl < 0.1"
log.warning(msg)
rightMainBeamLocs = rightlocs
else:
# try:
topCut = np.where(y_corr_spline[rightlocs] >= ysplmax*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(topCut)
# except IndexError:
# msg="Index out of bound error, topCut"
# log.warning(msg)
# flag = 3
# result.flag = flag
# result.message = msg
# plot_fail(x, y, paths,msg, log, save, flag, fmt="tomato")
# return result
if ysplmin > -0.1:
flag = 10
msg = "fit entire right beam, min yspl > -0.1"
log.warning(msg)
leftMainBeamLocs = leftlocs
else:
# try:
bottomCut = np.where(y_corr_spline[rightlocs] <= ysplmin*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(bottomCut)
# except IndexError:
# msg="Index out of bound error, bottomCut"
# log.warning(msg)
# flag = 3
# result.flag = flag
# result.message = msg
# plot_fail(x, y, paths,msg, log, save, flag, fmt="tomato")
# return result
if len(clean.x[leftMainBeamLocs]) == 0:
# fr = 1
# # msg_wrapper("debug", log.debug, msg)
# fitRight = y[BbeamScan]
result.flag = 21
result.message = "Failed to find left main beam locs"
# fitLeft = y[AbeamScan]
# fl=1
plot_fail(x, y, paths,result.message, log, save, result.flag, fmt="tomato")
return result
# fit left peak
ypeakl = np.polyval(
np.polyfit(clean.x[leftMainBeamLocs], y_corrected[leftMainBeamLocs], 2), clean.x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(y_corrected[leftMainBeamLocs], ypeakl)
# fit right peak
# try:
ypeakr = np.polyval(np.polyfit(
clean.x[rightMainBeamLocs], y_corrected[rightMainBeamLocs], 2), clean.x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(y_corrected[rightMainBeamLocs], ypeakr)
# except:
# msg="Failed to fit right peak"
# log.warning(msg)
# flag = 3
# result.flag = flag
# result.message = msg
# plot_fail(x, y, paths,msg, log, save, flag, fmt="tomato")
# return result
if fa=="p":
ymin = min(ypeakr)
ymax = max(ypeakl)
else:
ymin = min(ypeakl)
ymax = max(ypeakr)
log.debug("A/B beam peak")
if fa=="p":
log.debug( "left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1]))
log.debug("left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1]))
# ypeakrdata = clean.x[rightMainBeamLocs]
# ypeakldata = clean.x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
log.debug(msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
log.debug(msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 20
log.warning(msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
clean.x[leftMainBeamLocs], y_corrected[leftMainBeamLocs], 2), clean.x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
y_corrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
log.debug(msg)
if(ypeakl[0] >= ymax or ypeakl[-1] >= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 22
msg = "fit entire left beam"
log.debug(msg)
# msg = "failed to locate base locs"
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 19
log.warning(msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
log.debug(msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
clean.x[rightMainBeamLocs], y_corrected[rightMainBeamLocs], 2), clean.x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
y_corrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
log.debug(msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 18
msg = "fit entire right beam"
log.debug(msg)
# ypeakrdata = clean.x[rightMainBeamLocs]
# msg = "failed to locate base locs"
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 17
msg = "left beam data goes beyond midpoint"
# msg = "failed to locate base locs"
log.warning(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
# msg_wrapper("warning", log.warning, msg)
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 16
msg = "right beam data goes beyond midpoint"
# msg_wrapper("warning", log.warning, msg)
# msg = "failed to locate base locs"
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
log.info("\n")
log.info("-"*30)
log.info("Fit the peaks")
log.info("-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymax, err_peakl, ymin, err_peakr)
log.info(msg)
# plt.plot(clean.x,y_corrected)
# plt.plot(clean.x[leftlocs],y_corrected[leftlocs])
# plt.plot(clean.x[rightMainBeamLocs],y_corrected[rightMainBeamLocs])
# plt.show()
# sys.exit()
# find final peak loc
ploca = np.where(ypeakl == ymax)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (clean.x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymin)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (clean.x[rightMainBeamLocs])[plocb[0]]
log.debug('fit passed')
# flag=0
# plot_fail(x, y,paths,msg, log, save, flag=30, fmt="orange")
# result.apeak_coeffs =
result.apeak_residuals = fitResl
result.ata_peak = ymax
result.ata_peak_err = err_peakl
result.apeak_loc_index = peakLoca
# result.bpeak_coeffs =
result.bpeak_residuals = fitResr
result.bta_peak = ymin
result.bta_peak_err = err_peakr
result.bpeak_loc_index = peakLocb
result.baseline_coeffs = base_coeffs
result.baseline_residuals = base_res
result.baseline_rms = float(base_rms)
result.baseline_window_indices = baseline_indices
result.y_corrected = y_corrected
result.y_corrected_spline = y_corr_spline
result.clean_rms = clean.rms
# result.flag = flag
result.message = msg
a_idx = AbeamScan #np.where((x > -factored_fnbw) & (x < 0.0))[0]
b_idx = BbeamScan #np.where((x > 0.0) & (x < factored_fnbw))[0]
a_window = (float(x[a_idx[0]]), float(x[a_idx[-1]]))
b_window = (float(x[b_idx[0]]), float(x[b_idx[-1]]))
qc = compute_dual_beam_qc(
x=x,
y_corrected=y_corrected,
baseline_sigma=float(base_rms),
a_idx=a_idx,
b_idx=b_idx,
a_peak=float(result.ata_peak),
a_peak_x=float(result.apeak_loc_index),
a_resid=np.asarray(result.apeak_residuals) if result.apeak_residuals is not None else np.array([]),
b_peak=float(result.bta_peak),
b_peak_x=float(result.bpeak_loc_index),
b_resid=np.asarray(result.bpeak_residuals) if result.bpeak_residuals is not None else np.array([]),
a_window=a_window,
b_window=b_window,
cfg=cfg,
)
scan_qc = dual_beam_qc_to_scan_quality(qc)
result.qc=scan_qc
# print(qc)#;sys.exit()
if qc.is_bad:
result.flag = qc.flag
result.message = "bad scan. " + " | ".join(qc.reasons)
if qc.flag==60:
c = "fuchsia"
elif qc.flag==61:
c = "r"
log.error(msg)
plot_fail(x, y,paths,result.message , log, save, flag=qc.flag, fmt=c)
# result.flag = qc.flag
# result.message = msg
return result
else:
result.flag = 0
result.message = "ok"
# # print(save);sys.exit()
# plot_dual_beam_final(
# x=x,
# y=y,
# y_corr=y_corrected,
# a_idx=a_idx,
# b_idx=b_idx,
# a_model_x=x[leftMainBeamLocs],
# a_model_y=ypeakl,
# b_model_x=x[rightMainBeamLocs],
# b_model_y=ypeakr,
# a_label=f"A T_A [K]: {result.ata_peak:.3f} +- {result.ata_peak_err:.3f}",
# b_label=f"B T_A [K]: {result.bta_peak:.3f} +- {result.bta_peak_err:.3f}",
# qc_flag=result.flag,
# qc_reasons=qc.reasons,
# base_sigma=float(base_rms),
# save=save,
# paths=paths,
# log=log,
# )
# sys.exit()
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
{"x": clean.x[leftMainBeamLocs], "y": y_corrected[leftMainBeamLocs], "lab": "peak left", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": "peak left", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": y_corrected[rightMainBeamLocs], "lab": "peak right", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": "peak right", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": f"T$_A$ [K]: {ymax:.3f} +- {err_peakl:.3f}", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": f"T$_A$ [K]: {ymin:.3f} +- {err_peakr:.3f}", "fmt": ""},
# "\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
# ymax, err_peakl, ymin, err_peakr)
],
paths,
log,
save,
suffix="_corrected_final.png",
)
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
# {"x": clean.x[leftMainBeamLocs], "y": y_corrected[leftMainBeamLocs], "lab": "", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": "peak left", "fmt": ""},
# {"x": clean.x[rightMainBeamLocs], "y": y_corrected[rightMainBeamLocs], "lab": "", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": "peak right", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": f"T$_A$ [K]: {ymax:.3f} +- {err_peakl:.3f}", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": f"T$_A$ [K]: {ymin:.3f} +- {err_peakr:.3f}", "fmt": ""},
],
paths,
log,
save,
plot_type="final",
suffix="_corrected_final.png",
)
# sys.exit()
# result.qc=qc
return result
else:
log.debug("left: {:.3f}, min: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymin, ypeakl[-1]))
log.debug("left: {:.3f}, max: {:.3f}, right{:.3f}".format(
ypeakr[0], ymax, ypeakr[-1]))
ypeakrdata = clean.x[rightMainBeamLocs]
ypeakldata = clean.x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
log.debug(msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
log.debug(msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 20
log.warning(msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
clean.x[leftMainBeamLocs], y_corrected[leftMainBeamLocs], 2), clean.x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
y_corrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
log.debug(msg)
if(ypeakl[0] <= ymax or ypeakl[-1] <= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 22
msg = "fit entire left beam"
log.debug(msg)
ypeakldata = clean.x[leftMainBeamLocs]
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 19
log.warning( msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
log.debug(msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
clean.x[rightMainBeamLocs], y_corrected[rightMainBeamLocs], 2), clean.x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
y_corrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
log.debug(msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 18
msg = "fit entire right beam"
log.debug(msg)
# ypeakrdata = clean.x[rightMainBeamLocs]
# return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
# "driftCoeffs":[], "baseLocsCombined":[],
# "baseLocsLeft":[],"baseLocsRight":[],
# "leftPeakData":[],"leftPeakModelData":[],
# "leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
# "rightPeakData":[],"rightPeakModelData":[],
# "rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
# "msg":"","midXValueLeft":[],"midXValueRight":[],
# "flag":flag
# }
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 17
msg = "left beam data goes beyond midpoint"
log.warning(msg)
# return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
# "driftCoeffs":[], "baseLocsCombined":[],
# "baseLocsLeft":[],"baseLocsRight":[],
# "leftPeakData":[],"leftPeakModelData":[],
# "leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
# "rightPeakData":[],"rightPeakModelData":[],
# "rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
# "msg":"","midXValueLeft":[],"midXValueRight":[],
# "flag":flag
# }
# log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 16
msg = "right beam data goes beyond midpoint"
log.warning(msg)
# return [], [], [], np.nan, [], \
# [], [], np.nan, np.nan, [], \
# [], [], np.nan, np.nan, [],\
# msg, flag, np.nan, np.nan
# return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
# "driftCoeffs":[], "baseLocsCombined":[],
# "baseLocsLeft":[],"baseLocsRight":[],
# "leftPeakData":[],"leftPeakModelData":[],
# "leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
# "rightPeakData":[],"rightPeakModelData":[],
# "rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
# "msg":"","midXValueLeft":[],"midXValueRight":[],
# "flag":flag
# }
log.error(msg)
plot_fail(x, y,paths,msg, log, save, flag=flag, fmt="orange")
result.flag = flag
result.message = msg
return result
log.info("\n")
log.info("-"*30)
log.info("Fit the peaks.")
log.info("-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymin, err_peakl, ymax, err_peakr)
log.info(msg)
# find final peak loc
ploca = np.where(ypeakl == ymin)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymax)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (x[rightMainBeamLocs])[plocb[0]]
log.debug('fit passed')
# plot_fail(x, y,paths,msg, log, save, flag=30, fmt="orange")
# result.apeak_coeffs =
flag=0
result.apeak_residuals = fitResl
result.ata_peak = ymax
result.ata_peak_err = err_peakl
result.apeak_loc_index = peakLoca
# result.bpeak_coeffs =
result.bpeak_residuals = fitResr
result.bta_peak = ymin
result.bta_peak_err = err_peakr
result.bpeak_loc_index = peakLocb
result.baseline_coeffs = base_coeffs
result.baseline_residuals = base_res
result.baseline_rms = float(base_rms)
result.baseline_window_indices = baseline_indices
result.y_corrected = y_corrected
result.y_corrected_spline = y_corr_spline
result.clean_rms = clean.rms
result.flag = flag
result.message = msg
a_idx = AbeamScan #np.where((x > -factored_fnbw) & (x < 0.0))[0]
b_idx = BbeamScan #np.where((x > 0.0) & (x < factored_fnbw))[0]
a_window = (float(x[a_idx[0]]), float(x[a_idx[-1]]))
b_window = (float(x[b_idx[0]]), float(x[b_idx[-1]]))
qc = compute_dual_beam_qc(
x=x,
y_corrected=y_corrected,
baseline_sigma=float(base_rms),
a_idx=a_idx,
b_idx=b_idx,
a_peak=float(result.ata_peak),
a_peak_x=float(result.apeak_loc_index),
a_resid=np.asarray(result.apeak_residuals) if result.apeak_residuals is not None else np.array([]),
b_peak=float(result.bta_peak),
b_peak_x=float(result.bpeak_loc_index),
b_resid=np.asarray(result.bpeak_residuals) if result.bpeak_residuals is not None else np.array([]),
a_window=a_window,
b_window=b_window,
cfg=cfg,
)
scan_qc = dual_beam_qc_to_scan_quality(qc)
result.qc=scan_qc
# print(qc);sys.exit()
if qc.is_bad:
result.flag = qc.flag
result.message = "bad scan. " + " | ".join(qc.reasons)
if qc.flag==60:
c = "fuchsia"
elif qc.flag==61:
c = "r"
# msg = "Failed QC"
# print(qc.reasons)
# sys.exit()
# msg_wrapper("warning", log.warning, msg)
# msg = "failed to locate base locs"
log.error(msg)
plot_fail(x, y,paths,result.message , log, save, flag=qc.flag, fmt=c)
# result.flag = flag
# result.message = msg
return result
else:
result.flag = 0
result.message = "ok"
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
{"x": clean.x[leftMainBeamLocs], "y": y_corrected[leftMainBeamLocs], "lab": "peak left", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": "peak left", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": y_corrected[rightMainBeamLocs], "lab": "peak right", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": "peak right", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": f"T$_A$ [K]: {ymax:.3f} +- {err_peakl:.3f}", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": f"T$_A$ [K]: {ymin:.3f} +- {err_peakr:.3f}", "fmt": ""},
# "\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
# ymax, err_peakl, ymin, err_peakr)
],
paths,
log,
save,
suffix="_corrected_final.png",
)
plot_diagnostics(
[
{"x": clean.x, "y": y_corrected, "lab": "corrected", "fmt": "k"},
# {"x": clean.x[leftMainBeamLocs], "y": y_corrected[leftMainBeamLocs], "lab": "", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": "peak left", "fmt": ""},
# {"x": clean.x[rightMainBeamLocs], "y": y_corrected[rightMainBeamLocs], "lab": "", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": "peak right", "fmt": ""},
{"x": clean.x[leftMainBeamLocs], "y": ypeakl, "lab": f"T$_A$ [K]: {ymax:.3f} +- {err_peakl:.3f}", "fmt": ""},
{"x": clean.x[rightMainBeamLocs], "y": ypeakr, "lab": f"T$_A$ [K]: {ymin:.3f} +- {err_peakr:.3f}", "fmt": ""},
],
paths,
log,
save,
plot_type="final",
suffix="_corrected_final.png",
)
return result