# =========================================================================== #
# File: processing_fit.py #
# Author: Pfesesani V. van Zyl #
# Email: pfesi24@gmail.com #
# =========================================================================== #
# Library imports
# --------------------------------------------------------------------------- #
from typing import Any, Dict, List, Mapping, Sequence
import os
import argparse
import logging
import json
from dataclasses import asdict, is_dataclass
import math
# from dran.config.constants import DIAGNOSTICS_DIRNAME
# from dran.utils.fs import clear_diagnostics_dir
from dran.utils.config import ProjectPaths
from dran.fitting.pipeline import fit_scan, fit_scan_db
from dran.fitting.models import sig_to_noise
from dran.calibration.calibrate import calibrate_pointing_corrected_ta
# =========================================================================== #
[docs]
def to_jsonable(value: Any) -> Any:
"""Convert a value into a JSON-serializable form.
Handles dataclasses, objects with to_dict, scalars, collections,
mappings, and plain objects, replacing non-finite floats with null
equivalents.
"""
if value is None:
return None
if is_dataclass(value):
return asdict(value)
to_dict = getattr(value, "to_dict", None)
if callable(to_dict):
return to_dict()
if isinstance(value, (str, int, bool)):
return value
if isinstance(value, float):
return value if math.isfinite(value) else None
if isinstance(value, (list, tuple)):
return [to_jsonable(v) for v in value]
if isinstance(value, dict):
return {str(k): to_jsonable(v) for k, v in value.items()}
# Plain Python objects, like ScanQualityResult
try:
return {k: to_jsonable(v) for k, v in vars(value).items()}
except TypeError:
return str(value)
[docs]
def dumps_json(value: Any) -> str:
"""JSON-dump with a safe default conversion."""
return json.dumps(to_jsonable(value), ensure_ascii=False)
def _fit_one_scan(
row: Mapping[str, Any],
y: Any,
band: str,
out_path: str,
paths:ProjectPaths,
log: logging.Logger,
) -> Any:
"""Fit a single scan using the band-appropriate fitting routine.
Derives scan geometry from OFFSET, FNBW, and HPBW, dispatches to
single-beam or dual-beam fitting based on band, and raises an error
for unsupported band values.
"""
try:
x = row["OFFSET"]
fnbw = row["FNBW"] / 2.0
hpbw = row["HPBW"] / 2.0
except KeyError as exc:
raise KeyError(f"Missing required key for fitting: {exc}") from exc
if band in {"L", "S", "CM", "KU", "K"}:
return fit_scan(x, y, band, fnbw, hpbw, False, log, out_path, "", "",paths)
if band == "C":
return fit_scan_db(x, y, band, fnbw, hpbw, False, log, out_path, "", "", 0.55,paths)
if band == "X":
return fit_scan_db(x, y, band, fnbw, hpbw, False, log, out_path, "", "", 0.60,paths)
raise ValueError(f"Invalid band type: {band}")
def _populate_fit_fields(
row: Dict[str, Any],
scan: Any,
pol_key: str,
band: str,
log: logging.Logger,
args:argparse.Namespace
) -> None:
"""
Write fit results into row fields that are currently None.
"""
try:
del row["ALT1"]
except:
pass
try:
del row["ALT2"]
except:
pass
try:
del row['UISER_LONG']
except:
pass
for field, current in list(row.items()):
if current is not None:
continue
# Dual beam fields for C/X.
if band in {"C", "X"}:
if field == f"{pol_key}RMS":
row[field] = scan.clean_rms
elif field == f"{pol_key}BSLOPE":
try:
row[field] = scan.baseline_coeffs[0]
except (IndexError, TypeError, AttributeError):
row[field] = None
elif field == f"{pol_key}BRMS":
row[field] = scan.baseline_rms
elif field == f"{pol_key}FLAG":
row[field] = scan.flag
elif field == f"A{pol_key}TA":
row[field] = scan.ata_peak
elif field == f"A{pol_key}TAERR":
row[field] = scan.ata_peak_err
elif field == f"A{pol_key}TAPEAKLOC":
row[field] = scan.apeak_loc_index
elif field == f"A{pol_key}S2N":
row[field] = sig_to_noise(scan.ata_peak, scan.baseline_residuals, log)
elif field == f'A{pol_key}QC':
try:
ok = scan.qc.is_bad
flag = scan.qc.flag
msg = scan.qc.reasons
except (AttributeError, TypeError):
ok = False
flag = scan.flag
msg = scan.message
qc={'is_bad':ok, 'flag':flag, 'message':msg}
row[field] = json.dumps(to_jsonable(qc), ensure_ascii=False)
elif field == f"B{pol_key}TA":
row[field] = scan.bta_peak
elif field == f"B{pol_key}TAERR":
row[field] = scan.bta_peak_err
elif field == f"B{pol_key}TAPEAKLOC":
row[field] = scan.bpeak_loc_index
elif field == f"B{pol_key}S2N":
row[field] = sig_to_noise(scan.bta_peak, scan.baseline_residuals, log)
elif field == f'B{pol_key}QC':
try:
ok = scan.qc.is_bad
flag = scan.qc.flag
msg = scan.qc.reasons
except (AttributeError, TypeError):
ok = False
flag = scan.flag
msg = scan.message
qc={'is_bad':ok, 'flag':flag, 'message':msg}
row[field] = json.dumps(to_jsonable(qc), ensure_ascii=False)
# Single beam fields.
if field == f"{pol_key}RMS":
row[field] = scan.clean_rms
elif field == f"{pol_key}BSLOPE":
try:
row[field] = scan.baseline_coeffs[0]
except (IndexError, TypeError, AttributeError):
row[field] = None
elif field == f"{pol_key}BRMS":
row[field] = scan.baseline_rms
elif field == f"{pol_key}FLAG":
row[field] = scan.flag
elif field == f"{pol_key}TA":
row[field] = scan.ta_peak
elif field == f"{pol_key}TAERR":
row[field] = scan.ta_peak_err
elif field == f"{pol_key}TAPEAKLOC":
row[field] = scan.peak_loc_index
elif field == f"{pol_key}S2N":
row[field] = sig_to_noise(scan.ta_peak, scan.baseline_residual, log)
elif field == f'{pol_key}QC':
qc={'ok':scan.qc.ok, 'flag':scan.qc.flag, 'message':scan.qc.message, 'metrics':scan.qc.metrics}
row[field] = json.dumps(to_jsonable(qc), ensure_ascii=False)
# print('made it')
def _populate_pointing_single_beam(row: Dict[str, Any], log: logging.Logger) -> None:
"""
Single beam pointing uses S, N, O triplet per pol.
Writes PC and corrected ON TA for the ON position only.
"""
# print('pointing')
for pol in ["L", "R"]:
tas: List[float] = []
for pos in ["S", "N", "O"]:
tas.append(row.get(f"{pos}{pol}TA"))
tas.append(row.get(f"{pos}{pol}TAERR"))
# print(f"{pos}{pol}TA", row.get(f"{pos}{pol}TA"))
# print(f"{pos}{pol}TAERR", row.get(f"{pos}{pol}TAERR"))
pc = calibrate_pointing_corrected_ta(
tas[0], tas[1],
tas[2], tas[3],
tas[4], tas[5],
log,
row,
)
# print('sigh')
# print('pc: ',pc)
on_pos = "O"
row[f"{on_pos}{pol}PC"] = pc.pc
row[f"C{on_pos}{pol}TA"] = pc.ta_corr
row[f"C{on_pos}{pol}TAERR"] = pc.ta_corr_err
# print('wows')
def _populate_pointing_dual_beam(row: Dict[str, Any], log: logging.Logger) -> None:
"""
Dual beam pointing uses b in {A,B} and S, N, O triplet per pol.
Writes PC and corrected ON TA for the ON position only.
"""
for beam in ["A", "B"]:
for pol in ["L", "R"]:
tas: List[float] = []
for pos in ["S", "N", "O"]:
tas.append(row.get(f"{beam}{pos}{pol}TA"))
tas.append(row.get(f"{beam}{pos}{pol}TAERR"))
pc = calibrate_pointing_corrected_ta(
tas[0], tas[1],
tas[2], tas[3],
tas[4], tas[5],
log,
row,
)
on_pos = "O"
row[f"{beam}{on_pos}{pol}PC"] = pc.pc
row[f"{beam}C{on_pos}{pol}TA"] = pc.ta_corr
row[f"{beam}C{on_pos}{pol}TAERR"] = pc.ta_corr_err
def _tag_from_data_key(key: str) -> str:
"""Derive a scan tag from a data key string.
Maps ZC, HPNZ, and HPSZ data keys to on-source, north, or south scan
tags and raises an error for unsupported formats.
"""
if "ZC" in key:
return f"O{key.split('_')[1]}"
if "HPNZ" in key:
return f"N{key.split('_')[1]}"
if "HPSZ" in key:
return f"S{key.split('_')[1]}"
raise ValueError(f"Unrecognized DATA key format: {key}")
def _plot_base_path(
row: Mapping[str, Any],
src: str,
fname: str,
paths: ProjectPaths) -> str:
"""Build and ensure the base path for scan plot output.
Creates the source and frequency-specific plot directory and returns the
full file path for the plot.
"""
centfreq_mhz = int(row["CENTFREQ"])
plot_path_dir = paths.plots_dir / src / str(centfreq_mhz)
os.makedirs(plot_path_dir, exist_ok=True)
return str(plot_path_dir/ fname)
[docs]
def populate_row(
file_data: Sequence[Dict[str, Any]],
band: str,
paths:ProjectPaths,
log: logging.Logger,
args:argparse.Namespace,
) -> Dict[str, Any]:
"""
Run fitting per DATA column and populate derived fields back
into the row dict. Returns the updated row (last row in
file_data, consistent with existing behavior).
"""
print(band)
band = band.upper()
for row in file_data:
# if row.get("SCAN_ERROR") is not None:
# # Skip fitting if scan extraction failed; keep header-level fields only.
# log.warning("Skipping fitting due to SCAN_ERROR: %s", row["SCAN_ERROR"])
# continue
# accommodate the new QC feature
if band=="C" or band == "X":
for b in ["A","B"]:
for s in ["N","S","O"]:
for p in ["L","R"]:
row[f'{b}{s}{p}QC']=None
else:
if band == "L" or band=="S":
for p in ["L","R"]:
row[f"O{p}QC"]=None
else:
for s in ["N","S","O"]:
for p in ["L","R"]:
row[f'{s}{p}QC']=None
# process data
data_keys = [k for k in row.keys() if "DATA" in k]
for key in data_keys:
# print(key)
value = row[key]
tag = _tag_from_data_key(key)
pol_key = tag[:2]
fname_stub = f"{row['FILENAME'][:18]}_{pol_key}"
src = str(row.get("OBJECT") or "UNKNOWN").replace(" ", "")
out_path = _plot_base_path(row, src, fname_stub, paths)
# print("here**")
# if "CIRX" in row.get("OBJECT") :
# print("---> ",row)
# try:
# print(row["ANLQC"])
# except:
# row["ANLQC"]=None
# print("ANLQC : ",row["ANLQC"])
scan = _fit_one_scan(row, value, band, out_path, paths, log)
_populate_fit_fields(row=row, scan=scan, pol_key=pol_key, band=band, log=log,args=args)
del scan
if band in {"CM", "KU", "K"}:
_populate_pointing_single_beam(row, log)
if band in {"X", "C"}:
_populate_pointing_dual_beam(row, log)
# print('wow')
return row