Source code for dran.fits.init_fits_processing

# =========================================================================== #
# File: init_fits_processing.py                                               #
# Author: Pfesesani V. van Zyl                                                #
# Email: pfesi24@gmail.com                                                    #
# =========================================================================== #


# Library imports
# --------------------------------------------------------------------------- #
import logging
import os
import argparse
from pathlib import Path
from typing import Any, Dict, List
from dran.storage.sqlite_connection import get_connection
from dran.fits.observation_extractor import extract_observation
from dran.utils.fs import (
    clear_diagnostics_dir,
    compute_file_hash,
    record_invalid_path_once,
    ProjectPaths,
    parse_source_frequency_band_from_path_if_folder,
    resolve_existing_path,
    parse_observation_path,
    _validate_symlink,
)
from dran.storage.db_introspection import (
    _ensure_and_insert,
    record_exists, 
    ensure_processed_files_table,
    processed_file_exists_by_path,
    processed_file_hashes_by_size,
    insert_processed_file,
)
from dran.fits.processing_fit import populate_row
from dran.utils.frequency_utils import get_band_from_frequency

# =========================================================================== #


def process_fits_path(
    root_path: Path,
    log: logging.Logger,
    paths: ProjectPaths,
    args: argparse.Namespace,
) -> List[Dict[str, Any]]:
    """
    Dispatch processing for a single FITS file or a directory of FITS files.

    Returns the extracted FITS records; directory results are concatenated.
    """
    resolved = resolve_existing_path(root_path, log, paths)
    if resolved is None:
        raise ValueError("root_path is None")
    if resolved.is_file():
        return _process_single_file(resolved, paths, log, args)
    if resolved.is_dir():
        return _process_directory(resolved, log, paths, args)
    raise ValueError(f"Invalid path type: {resolved}")
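# --------------------------------------------------------------------------- #
# Usage sketch (illustrative, not part of the pipeline): how a caller might
# dispatch a single file or a directory tree. The ProjectPaths construction,
# the argparse fields, and the example path are assumptions for illustration;
# only process_fits_path itself is defined in this module.
#
#   log = logging.getLogger("dran")
#   paths = ProjectPaths(...)              # project-specific directory layout
#   args = argparse.Namespace(...)         # CLI options forwarded to populate_row
#   records = process_fits_path(Path("data/HYDRAA/2280"), log, paths, args)
# --------------------------------------------------------------------------- #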
def _should_skip_by_registry(
    fits_path: Path,
    paths: ProjectPaths,
    log: logging.Logger,
) -> tuple[bool, str | None, int, float]:
    """
    Hybrid de-duplication:
      - Skip if FILEPATH is already registered.
      - If file_size matches existing entries, compute the hash and skip if
        the hash is already known.

    Returns (skip, file_hash_or_none, file_size, file_mtime).
    """
    stat = fits_path.stat()
    file_size = stat.st_size
    file_mtime = stat.st_mtime

    conn = get_connection(paths.db_path, log)
    try:
        ensure_processed_files_table(conn)
        if processed_file_exists_by_path(conn, str(fits_path)):
            return True, None, file_size, file_mtime

        known_hashes = set(processed_file_hashes_by_size(conn, file_size))
        file_hash: str | None = None
        if known_hashes:
            # Only hash when a size collision makes a content match possible.
            file_hash = compute_file_hash(fits_path)
            if file_hash in known_hashes:
                return True, file_hash, file_size, file_mtime
        return False, file_hash, file_size, file_mtime
    finally:
        conn.close()


def _record_processed_file(
    fits_path: Path,
    paths: ProjectPaths,
    log: logging.Logger,
    args: argparse.Namespace,
    *,
    file_hash: str | None,
    file_size: int,
    file_mtime: float,
) -> None:
    """Persist processed file metadata for path-independent de-duplication."""
    if file_hash is None:
        file_hash = compute_file_hash(fits_path)
    conn = get_connection(paths.db_path, log)
    try:
        insert_processed_file(
            conn,
            file_hash=file_hash,
            file_size=file_size,
            file_mtime=file_mtime,
            filepath=str(fits_path),
            filename=fits_path.name,
        )
    finally:
        conn.close()
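# --------------------------------------------------------------------------- #
# De-duplication sketch (illustrative): the registry check is hybrid. A file
# is skipped outright when its exact FILEPATH is already registered; a hash
# is computed only when another registered file has the same size, so moved
# or renamed duplicates are still caught without hashing every new file.
# Typical call pattern (as used by the processing functions below):
#
#   skip, file_hash, size, mtime = _should_skip_by_registry(path, paths, log)
#   if not skip:
#       ...  # extract and insert the observation
#       _record_processed_file(path, paths, log, args, file_hash=file_hash,
#                              file_size=size, file_mtime=mtime)
# --------------------------------------------------------------------------- #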
def _process_single_file(
    fits_path: Path,
    paths: ProjectPaths,
    log: logging.Logger,
    args: argparse.Namespace,
) -> List[Dict[str, Any]]:
    fits_path = Path(fits_path)
    if not _validate_symlink(fits_path, paths, log):
        return []
    if fits_path.stat().st_size == 0:
        record_invalid_path_once(fits_path, paths, log, "empty file")
        return []

    p = parse_observation_path(fits_path)
    if p.band_folder is None:
        # No band folder in the path: probe the file itself for the source
        # name, band and centre frequency.
        probe = extract_observation(fits_path, paths, p.band_folder, log)
        src = str(probe["OBJECT"]).replace(" ", "")
        band = probe["BAND"]
        freq_mhz = int(probe["CENTFREQ"])
    else:
        src = p.source
        freq_mhz = int(p.frequency)
        band = get_band_from_frequency(freq_mhz, log).upper()

    table_name = f"{src}_{freq_mhz}".upper()

    conn = get_connection(paths.db_path, log)
    already_done = record_exists(conn, table_name, "FILEPATH", str(fits_path))
    conn.close()
    if already_done:
        log.debug("Skipping already processed file: %s", fits_path)
        return []

    skip, file_hash, file_size, file_mtime = _should_skip_by_registry(
        fits_path, paths, log
    )
    if skip:
        log.debug("Skipping duplicate file by registry: %s", fits_path)
        return []

    record = extract_observation(fits_path, paths, band, log)
    scan = [record]
    row = populate_row(scan, band, paths, log, args)

    # Drop columns that must not be written to the database.
    disallowed_keys: set[str] = {
        "UISER_LONG", "GAIN1", "GAIN2", "ALTGAIN1", "ALTGAIN2", "ALTGAIN3",
    }
    row = {k: v for k, v in row.items() if k not in disallowed_keys}

    _ensure_and_insert(table_name, row, paths, log)
    _record_processed_file(
        fits_path, paths, log, args,
        file_hash=file_hash,
        file_size=file_size,
        file_mtime=file_mtime,
    )
    clear_diagnostics_dir(paths.diagnostics_dir, log)
    del row
    del scan
    return [record]


def _process_directory(
    root_dir: Path,
    log: logging.Logger,
    paths: ProjectPaths,
    args: argparse.Namespace,
) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    for dirpath, _dirnames, files in os.walk(root_dir):
        base = Path(dirpath)
        if not files:
            continue
        if len(files) == 1 and files[0] == ".DS_Store":
            continue
        parent_files = [base / name for name in files]

        fits_files = sorted(
            path for path in parent_files
            if path.name.lower().endswith(".fits")
        )
        if not fits_files:
            log.info(f"Directory {base} has no FITS files, skipping process")
            continue

        try:
            src, _freq_mhz, band = (
                parse_source_frequency_band_from_path_if_folder(base, log)
            )
        except Exception as exc:
            log.warning("Skipping directory %s: %s", base, exc)
            continue

        # De-dup is handled by the processed_files registry (path + hash).
        paths_to_process = fits_files
        paths_to_process.reverse()

        log.info("*" * 80)
        log.info("File stats")
        log.info("*" * 80)
        log.info("Directory: %s", base)
        log.info("Total files: %s", len(parent_files))
        log.info("New files: %s", len(paths_to_process))
        log.info("-" * 80)

        for fits_path in paths_to_process:
            if ".DS_Store" in str(fits_path):
                continue
            if not _validate_symlink(fits_path, paths, log):
                continue
            if fits_path.stat().st_size == 0:
                record_invalid_path_once(fits_path, paths, log, "empty file")
                continue

            skip, file_hash, file_size, file_mtime = _should_skip_by_registry(
                fits_path, paths, log
            )
            if skip:
                log.debug("Skipping duplicate file by registry: %s", fits_path)
                continue

            log.info(f"Working on path: {fits_path}")
            record = extract_observation(fits_path, paths, band, log)
            scan = [record]
            results.append(record)

            # Fall back to header values when the folder names did not
            # encode the source, band or frequency.
            if src is None:
                src = record["OBJECT"]
            if band is None:
                band = record["BAND"]
            if _freq_mhz is None:
                _freq_mhz = int(record["CENTFREQ"])

            row = populate_row(scan, band, paths, log, args)

            # Drop columns that must not be written to the database.
            disallowed_keys: set[str] = {
                "UISER_LONG", "GAIN1", "GAIN2",
                "ALTGAIN1", "ALTGAIN2", "ALTGAIN3",
            }
            row = {k: v for k, v in row.items() if k not in disallowed_keys}

            try:
                src = row["OBJECT"].replace(" ", "").upper()
            except (KeyError, AttributeError):
                pass

            table_name = f"{src}_{int(_freq_mhz)}".upper()
            _ensure_and_insert(table_name, row, paths, log)
            _record_processed_file(
                fits_path, paths, log, args,
                file_hash=file_hash,
                file_size=file_size,
                file_mtime=file_mtime,
            )
            clear_diagnostics_dir(paths.diagnostics_dir, log)
            del row
            del scan
    return results
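# --------------------------------------------------------------------------- #
# Table-naming convention (from the code above; example values hypothetical):
# each source/frequency pair gets its own table, named
# f"{OBJECT}_{CENTFREQ_MHz}".upper() with spaces stripped from OBJECT, e.g.
# an observation of "Hydra A" at a centre frequency of 2280 MHz is written
# to the table "HYDRAA_2280".
# --------------------------------------------------------------------------- #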