Source code for dran.fits.populate

# =========================================================================== #
# File: populate.py                                                           #
# Author: Pfesesani V. van Zyl                                                #
# Email: pfesi24@gmail.com                                                    #
# =========================================================================== #


# Library imports
# --------------------------------------------------------------------------- #
import logging
import sys
from typing import Any, List, MutableMapping
import numpy as np
from dran.fits.conversion import counts_to_kelvin
# =========================================================================== #


def _as_1d(value: Any) -> np.ndarray:
    """Convert input to a one-dimensional NumPy array.
    Flattens multi-dimensional inputs and wraps scalar values into a 
    single-element array.
    """
    
    arr = np.asarray(value)
    if arr.ndim > 1:
        return arr.ravel()
    if arr.ndim == 0:
        return arr.reshape(1)
    return arr

def _get_col(table: Any, name: str, log: logging.Logger) -> np.ndarray:
    """Retrieve a column from a table as a one-dimensional NumPy array.
    Logs the column access and raises a clear KeyError if the column is 
    missing or inaccessible.
    """
    
    log.debug(f"getting column: {name}") # from {table}")
    try:
        return _as_1d(table[name])
    except Exception as exc:
        log.error(f"Missing column: {name}, -> {exc}")
        return np.array([0])

def _is_invalid_scalar(value: Any) -> bool:
    if value is None:
        return True
    try:
        arr = np.asarray(value)
    except Exception:
        return False
    if arr.shape == ():
        try:
            return not np.isfinite(float(arr))
        except Exception:
            return False
    return False

def _store_on_obs(obs: Any, key: str, value: Any) -> None:
    """Store a value on an observation object.
    Sets the value using an uppercase key for mappings, or as an attribute 
    for object-based observations.
    """
    if isinstance(obs, MutableMapping):
        obs[key.upper()] = value
        return
    setattr(obs, key, value)

[docs] def populate_scan_arrays( obs: Any, scans_table: Any, hdu_index: int, header_name: str, column_names: List, log: logging.Logger ) -> None: """Populate observation fields from a scans table. Extracts metadata and scan data for the given HDU, computes derived quantities for ZC headers, converts LCP and RCP counts to Kelvin, and stores results on the observation object. """ if scans_table is None: log.warning(f"No scans table in HDU {hdu_index} ({header_name})") return if not column_names: log.warning("No column names found for HDU %s (%s).", hdu_index, header_name) column_names = [] if 'ZC' in header_name: n = None for col in column_names: if 'Count' in col: pass else: data = _get_col(scans_table, col, log) n = len(data) mid = n // 2 value = data[mid] # Avoid clobbering valid header values with None/NaN from scan tables. if _is_invalid_scalar(value) and col in obs and obs[col] is not None: continue _store_on_obs(obs, col, value) log.debug(f"Added {col} = {value}, from HDU {hdu_index} ({header_name}).") centfreq = obs.get("CENTFREQ") try: centfreq_val = float(np.asarray(centfreq).item()) except Exception: log.warning("CENTFREQ missing/invalid; skipping LOGFREQ") else: obs["LOGFREQ"] = float(np.log10(centfreq_val)) obs['ZA']=90.0 - obs['ELEVATION'] # Offset array requires SCANDIST; guard if missing if n is None: try: n = len(_get_col(scans_table, "Count1", log)) except Exception: n = 0 try: scan_dist = _get_col(obs, "SCANDIST", log) except Exception: scan_dist = float(n) try: scan_dist = float(np.asarray(scan_dist).item()) # works for scalar/0-d arrays except Exception: scan_dist = float(n) # fallback: use number of samples offset = np.linspace(-scan_dist / 2.0, scan_dist / 2.0, n) _store_on_obs(obs,'OFFSET',offset) else: pass # get obs lcp_counts = _get_col(scans_table,"Count1",log) rcp_counts = _get_col(scans_table,"Count2",log) lcp_zeros=False rcp_zeros=False if not np.any(lcp_counts): lcp_zeros=True if not np.any(rcp_counts): rcp_zeros=True if lcp_zeros: _store_on_obs(obs, f"{header_name}_lcpdata",lcp_counts) else: try: conv_factor_lcp= _get_col(obs, "HZPERK1",log) lcp = counts_to_kelvin(lcp_counts, conv_factor_lcp) _store_on_obs(obs, f"{header_name}_lcpdata",lcp) except: _store_on_obs(obs, f"{header_name}_lcpdata",lcp_counts) if rcp_zeros: _store_on_obs(obs, f"{header_name}_rcpdata",rcp_counts) else: try: conv_factor_rcp= _get_col(obs, "HZPERK2",log) rcp = counts_to_kelvin(rcp_counts,conv_factor_rcp) _store_on_obs(obs, f"{header_name}_rcpdata",rcp) except: _store_on_obs(obs, f"{header_name}_rcpdata",rcp_counts)