Source code for polsartools.sensors.risat


import os,glob,re
import struct
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from osgeo import gdal
gdal.UseExceptions()
from polsartools.utils.utils import conv2d,time_it, mlook_arr

def get_bandmeta(filepath):
    metadata_dict = {}
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if '=' in line:
                key, value = line.split('=', 1) 
                metadata_dict[key.strip()] = value.strip()
    return metadata_dict

def extract_lengths(leader_file, data_file):
    def htonl(val):
        """Convert unsigned int from host to network byte order"""
        return struct.unpack(">I", struct.pack("<I", val))[0]
    tmp = bytearray(32768)

    # --- Read Leader File ---
    with open(leader_file, "rb") as f:
        for _ in range(9):  # Skipping 9 known records
            f.read(8)
            length = htonl(struct.unpack("<I", f.read(4))[0])
            f.read(length - 12)

        f.read(8332)
        calib_factor = f.read(16).decode().strip()

    # --- Read Image File ---
    with open(data_file, "rb") as f:
        f.seek(8)
        header_length = htonl(struct.unpack("<I", f.read(4))[0])

        f.seek(186)
        data_record_length = int(f.read(6).decode())

        f.seek(276)
        prefix_record_length = int(f.read(6).decode())

    # print("header_length:",header_length)
    # print("prefix_record_length:",prefix_record_length)
    # print("data_record_length:",data_record_length)

    return header_length,prefix_record_length,data_record_length

def load_data(filepath, NoScans, header_length, record_length, prefix_length,
                                 bytes_per_pixel=4, dtype=np.dtype('>i2')):
    
    prefix_length += 12
    
    pixels_per_line = (record_length - prefix_length) // bytes_per_pixel
    # image_lines = NoScans
    # payload_bytes_per_line = pixels_per_line * bytes_per_pixel

    with open(filepath, 'rb') as f:
        f.seek(header_length)
        full_data = np.frombuffer(f.read(NoScans * record_length), dtype=np.uint8)
    
    full_data = full_data.reshape((NoScans, record_length))
    image_payload = full_data[:, prefix_length:]


    raw_pixels = image_payload.reshape(-1).view(dtype)
    image = raw_pixels.reshape(NoScans, pixels_per_line, 2)

    return image

def write_rst(file,wdata,dtype,cog=False,ovr=[2,4,8,16],comp=False):
    [cols, rows] = wdata.shape
    if '.bin' in file:
        driver = gdal.GetDriverByName("ENVI")
        outdata = driver.Create(file, rows, cols, 1, dtype)
    else:
        driver = gdal.GetDriverByName("GTiff")
        
        options = ['BIGTIFF=IF_SAFER']
        if comp:
            options += ['COMPRESS=LZW']
        if cog:
            options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
            
        outdata = driver.Create(file, rows, cols, 1, dtype,options)
    
    outdata.SetDescription(file)
    outdata.GetRasterBand(1).WriteArray(wdata)
    outdata.FlushCache() 
    if cog:
        outdata.BuildOverviews("NEAREST", ovr)
    outdata = None


def inrp_inc(inc_array, target_shape):
    input_shape = inc_array.shape
    y = np.linspace(0, 1, input_shape[0])  # scan direction
    x = np.linspace(0, 1, input_shape[1])  # pixel direction

    # Create interpolator
    interpolator = RegularGridInterpolator((y, x), inc_array, bounds_error=False, fill_value=None)
    target_y = np.linspace(0, 1, target_shape[0])
    target_x = np.linspace(0, 1, target_shape[1])
    yy, xx = np.meshgrid(target_y, target_x, indexing='ij')

    # Stack points as (N, 2) for interpolation
    points = np.stack([yy.ravel(), xx.ravel()], axis=-1)
    interpolated = interpolator(points).reshape(target_shape)

    return interpolated.astype(np.float32)

def get_inc(file_path):
    inc_angles = []
    num_records = None
    num_samples = None

    with open(file_path, 'r') as f:
        for line in f:
            # Extract shape from header lines
            if "#Number of Records in Grid:" in line:
                num_records = int(re.search(r'\d+', line).group())
            elif "#Number of Samples in Grid:" in line:
                num_samples = int(re.search(r'\d+', line).group())

            elif line.strip().startswith("#") or line.strip() == "":
                continue  

            else:
                parts = line.strip().split()
                if len(parts) >= 4:
                    try:
                        inc_angles.append(float(parts[3]))
                    except ValueError:
                        pass

    inc_array = np.array(inc_angles)

    if num_records and num_samples:
        inc_array = inc_array.reshape((num_records, num_samples))


    return inc_array.astype(np.float32)

[docs] @time_it def import_risat_l11(in_dir,mat='C2', azlks=10,rglks=7, fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False, out_dir = None): """ Extracts Sxy or C2 matrix elements from RISAT-1A compact-pol SLC data and saves them as raster files. This function performs multi-looking using specified azimuth and range looks, and outputs matrix elements in either GeoTIFF or binary format. It also extracts the incidence angle map and saves it alongside the matrix outputs. Optional support for Cloud Optimized GeoTIFFs (COGs), compression, and custom output directories. Examples -------- >>> import_risat_l11("path_to_folder", mat='C2', azlks=10, rglks=7, fmt='tif') Extracts C2 matrix elements and the incidence angle map from RISAT-1A SLC data and saves them as GeoTIFFs. Parameters ---------- in_dir : str Path to the RISAT-1A SLC folder containing the data and metadata. mat : str, optional (default='C2') Type of matrix to extract. Valid options are: - 'Sxy': Scattering matrix elements - 'C2' : Covariance matrix for compact-pol azlks : int, optional (default=10) Number of azimuth looks for multi-looking. rglks : int, optional (default=7) Number of range looks for multi-looking. fmt : {'tif', 'bin'}, optional (default='tif') Output format: - 'tif': GeoTIFF in slant range - 'bin': Raw binary format cog : bool, optional (default=False) If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews. ovr : list of int, optional (default=[2, 4, 8, 16]) Overview levels for COG generation. Ignored if cog=False. comp : bool, optional (default=False) If True, applies LZW compression to GeoTIFF outputs. out_dir : str or None, optional (default=None) Directory to save output files. If None, a folder named after the matrix type will be created in the same location as the input folder. """ band_meta = os.path.join(in_dir,'BAND_META.txt') metadata_dict = get_bandmeta(band_meta) ImagingMode = str(metadata_dict['ImagingMode']) NoOfPolarizations=int(metadata_dict['NoOfPolarizations']) polList = [] for ii in range(NoOfPolarizations): polList.append(metadata_dict[f'TxRxPol{ii+1}']) print(f"Detected {ImagingMode}: {polList} acquired on {metadata_dict['DateOfPass']}") sr_grid = glob.glob(os.path.join(in_dir,f'*{polList[0]}_L1_SlantRange_grid.txt'))[0] leader_file = glob.glob(os.path.join(in_dir,f'scene_{polList[0]}/lea_01.001'))[0] data_file = glob.glob(os.path.join(in_dir,f'scene_{polList[0]}/dat_01.001'))[0] header_length,prefix_length,record_length = extract_lengths(leader_file, data_file) # print(header_length,prefix_length,record_length) inc_array = get_inc(sr_grid) target_shape = (int(metadata_dict['NoScans']), int(metadata_dict['NoPixels'])) resized_inc = inrp_inc(inc_array, target_shape) if out_dir is None: out_dir = in_dir else: os.makedirs(out_dir, exist_ok=True) # print("Resized incidence angle shape:", resized_inc.shape) if fmt=='bin': write_rst(os.path.join(out_dir,'inc.bin'),resized_inc,gdal.GDT_Float32) print(f"Saved file: {os.path.join(out_dir,'inc.bin')}") else: write_rst(os.path.join(out_dir,'inc.tif'),resized_inc,gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(out_dir,'inc.tif')}") del inc_array if 'RH' in polList and 'RV' in polList: inFile = glob.glob(os.path.join(in_dir,'scene_RH/dat_01.001'))[0] s11 = load_data(inFile, int(metadata_dict['NoScans']), header_length, record_length, prefix_length, bytes_per_pixel=int(metadata_dict['BytesPerPixel']), dtype=np.dtype('>i2')) cc_dB = float(metadata_dict['Calibration_Constant_RH']) cc_lin_rh = np.sqrt(10**(cc_dB/10)) inc_center = float(metadata_dict['IncidenceAngle']) # print(cc_dB,inc_center) s11 = ((s11[:,:,0]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rh)+\ 1j*(s11[:,:,1]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rh)).astype(np.complex64) inFile = glob.glob(os.path.join(in_dir,'scene_RV/dat_01.001'))[0] s21 = load_data(inFile, int(metadata_dict['NoScans']), header_length, record_length, prefix_length, bytes_per_pixel=int(metadata_dict['BytesPerPixel']), dtype=np.dtype('>i2')) cc_dB = float(metadata_dict['Calibration_Constant_RV']) # print(cc_dB) cc_lin_rv = np.sqrt(10**(cc_dB/10)) s21 = ((s21[:,:,0]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rv)+\ 1j*(s21[:,:,1]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rv)).astype(np.complex64) if mat == 'Sxy': outFolder = os.path.join(out_dir,'Sxy') os.makedirs(outFolder, exist_ok=True) if fmt=='bin': write_rst(os.path.join(outFolder,'s11.bin'),s11,gdal.GDT_CFloat32) print(f"Saved file: {os.path.join(outFolder,'s11.bin')}") write_rst(os.path.join(outFolder,'s21.bin'),s21,gdal.GDT_CFloat32) print(f"Saved file: {os.path.join(outFolder,'s21.bin')}") else: write_rst(os.path.join(outFolder,'s11.tif'),s11,gdal.GDT_CFloat32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'s11.tif')}") write_rst(os.path.join(outFolder,'s21.tif'),s21,gdal.GDT_CFloat32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'s21.tif')}") elif mat == 'C2': outFolder = os.path.join(out_dir,'C2') os.makedirs(outFolder, exist_ok=True) write_rst(os.path.join(outFolder,'inc.tif'),mlook_arr(resized_inc,azlks,rglks).astype(np.float32), gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'inc.tif')}") del resized_inc if fmt=='bin': write_rst(os.path.join(outFolder,'C11.bin'),mlook_arr(np.abs(s11)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32) print(f"Saved file: {os.path.join(outFolder,'C11.bin')}") write_rst(os.path.join(outFolder,'C22.bin'),mlook_arr(np.abs(s21)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32) print(f"Saved file: {os.path.join(outFolder,'C22.bin')}") else: write_rst(os.path.join(outFolder,'C11.tif'),mlook_arr(np.abs(s11)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'C11.tif')}") write_rst(os.path.join(outFolder,'C22.tif'),mlook_arr(np.abs(s21)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'C22.tif')}") C12 = mlook_arr(s11*np.conjugate(s21),azlks,rglks).astype(np.complex64) rows,cols = C12.shape if fmt=='bin': write_rst(os.path.join(outFolder,'C12_real.bin'),np.real(C12).astype(np.float32),gdal.GDT_Float32) print(f"Saved file: {os.path.join(outFolder,'C12_real.bin')}") write_rst(os.path.join(outFolder,'C12_imag.bin'),np.imag(C12).astype(np.float32),gdal.GDT_Float32) print(f"Saved file: {os.path.join(outFolder,'C12_imag.bin')}") else: write_rst(os.path.join(outFolder,'C12_real.tif'),np.real(C12).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'C12_real.tif')}") write_rst(os.path.join(outFolder,'C12_imag.tif'),np.imag(C12).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp) print(f"Saved file: {os.path.join(outFolder,'C12_imag.tif')}") file = open(outFolder +'/config.txt',"w+") file.write('Nrow\n%d\n---------\nNcol\n%d\n---------\nPolarCase\nmonostatic\n---------\nPolarType\npp1'%(rows,cols)) file.close() else: print("Only Sxy, C2 matrices are supported for RISAT compact-pol data.\ Please check the mat argument.\ Example mat='C2' or 'Sxy'")