# =========================================================================== #
# File: frequency_utils.py #
# Author: Pfesesani V. van Zyl #
# Email: pfesi24@gmail.com #
# =|========================================================================= #
# Library imports
# --------------------------------------------------------------------------- #
import logging
import sys
from pathlib import Path
from typing import Optional
from dran.utils.config import ObservationPathError
from dran.config.constants import (
FREQUENCY_BANDS_MHZ, BAND_ALIASES, FREQ_ALIASES,
_WAVELENGTH_BEAM_RE,_WAVELENGTH_ONLY_RE)
# =========================================================================== #
def _normalize_band(band: str) -> str:
"""Normalize band identifiers to canonical uppercase (e.g., Ku -> KU)."""
if band in BAND_ALIASES:
return BAND_ALIASES[band]
return band.strip().upper()
[docs]
def get_band_from_frequency(frequency: float | int, log: logging.Logger) -> str:
"""
Determine the satellite frequency band for a given frequency (in MHz).
Args:
frequency (float | int): Frequency in megahertz (MHz).
Returns:
str: The corresponding band identifier
(e.g., 'L', 'S', 'C', 'CM', 'X', 'Ku', 'K', 'Ka').
Raises:
ValueError: If the input frequency is invalid or outside known bands.
"""
log.debug('Getting frequency band from frequency in MHz')
# print('fr: ',frequency)
try:
frequency=int(FREQ_ALIASES[frequency])
except:
pass
# print(FREQ_ALIASES[frequency])
# sys.exit()
# some files need a helper
# if frequency==13:
# frequency=2270
# if frequency==35:
# frequency=8280
# if frequency==6:
# frequency=4800
# Validate input type
if not isinstance(frequency, (int, float)):
raise TypeError(f"Frequency must be a numeric value, got {type(frequency).__name__}.")
# Determine the corresponding frequency band
for band_name, freq_range in FREQUENCY_BANDS_MHZ.items():
if freq_range[0] <= frequency <= freq_range[1]:
# Return canonical band keys (uppercase).
return _normalize_band(band_name)
# Frequency not in any known band
valid_ranges = [
f"{band}({band_info[0]}–{band_info[1]} MHz)"
for band, band_info in FREQUENCY_BANDS_MHZ.items()
]
raise ValueError(
f"Frequency {frequency} MHz does not fall into any known band. "
f"Valid ranges: {', '.join(valid_ranges)}."
)
[docs]
def get_frequency_range_from_band(band: str, log: logging.Logger) -> tuple[int, int]:
"""
Return the frequency range in MHz for a given band identifier.
Args:
band: Band identifier such as 'L', 'S', 'C', 'CM', 'X', 'Ku', 'K', 'Ka'.
log: Logger instance.
Returns:
Tuple of start and end frequency in MHz.
Raises:
TypeError: Invalid band type.
ValueError: Unknown band.
"""
log.debug("Getting frequency range from band")
if not isinstance(band, str):
raise TypeError(
f"Band must be a string, got {type(band).__name__}."
)
# Accept legacy/user inputs but normalize to canonical keys.
band_normalised = _normalize_band(band)
if band_normalised not in FREQUENCY_BANDS_MHZ:
valid_bands = ", ".join(FREQUENCY_BANDS_MHZ.keys())
raise ValueError(
f"Unknown band '{band}'. Valid bands: {valid_bands}."
)
return FREQUENCY_BANDS_MHZ[band_normalised]
def _normalize_wavelength_key(value: float) -> str:
text = f"{value:.10f}".rstrip("0").rstrip(".")
return text
def _resolve_band_to_frequency_mhz(band: str, source_freq_folder: str, p: Path) -> tuple[int, Optional[float], Optional[str]]:
"""
Returns: (frequency_mhz, wavelength_cm, beam)
"""
# Case A: wavelength + beam like 13NB, 3.5WB, etc.
# print(band)
m_beam = _WAVELENGTH_BEAM_RE.match(band)
# print('beam: ',m_beam)
if m_beam:
wavelength_cm = float(m_beam.group("wavelength"))
beam = m_beam.group("beam").upper()
key = _normalize_wavelength_key(wavelength_cm)
# print('>>> key: ',key)
mhz_str = FREQ_ALIASES.get(key)
if mhz_str is None:
raise ObservationPathError(
f"Unknown wavelength '{key} cm' in folder '{source_freq_folder}'. "
f"Add it to FREQ_ALIASES: {p}"
)
return int(mhz_str), wavelength_cm, beam
# Case B: wavelength only like 35, 13, 3.5
m_wave = _WAVELENGTH_ONLY_RE.match(band)
# print('m_wave: ',m_wave,m_wave.group("wavelength"))
if m_wave:
wavelength_cm = float(m_wave.group("wavelength"))
key = _normalize_wavelength_key(wavelength_cm)
mhz_str = FREQ_ALIASES.get(key)
# print(mhz_str)
if mhz_str is not None:
return int(mhz_str), wavelength_cm, None
# Case C: treat as frequency folder (must be digits)
if band.isdigit():
return int(band), None, None
raise ObservationPathError(
f"Unrecognized band token '{band}' in '{source_freq_folder}': {p}"
)
def _read_frequency_from_fits_header(path: Path) -> tuple[str, str, str]:
"""
Read observing frequency from the FITS header and convert it to the
frequency, wavelength, and beam fields expected by DRAN.
Returns:
tuple[str, str, str]:
frequency_mhz, wavelength_cm, beam
"""
try:
with fits.open(path) as hdul:
header = hdul[2].header
# Try the most likely header keys first.
freq_mhz = (
header.get("CENTFREQ")
or header.get("FREQ")
or header.get("RESTFREQ")
# or header.get("CRVAL3")
)
if freq_mhz is None:
raise ObservationPathError(
f"Could not determine frequency from FITS header: {path}"
)
# Convert to MHz if the value looks like Hz.
freq_mhz = float(freq_mhz)
wavelength_cm = header.get("CENTFREQ")[:-1]
# You can refine beam logic if your project has exact rules.
if freq_mhz<4000:
beam = "SB"
elif freq_mhz>=4000 and freq_mhz<=8000:
beam="DB"
else:
beam="NB"
return (
freq_mhz,
wavelength_cm,
beam
)
except Exception as exc:
raise ObservationPathError(
f"Failed to read FITS header frequency from {path}: {exc}"
) from exc