import os,glob,re
import struct
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from osgeo import gdal
gdal.UseExceptions()
from polsartools.utils.utils import conv2d,time_it, mlook_arr
def get_bandmeta(filepath):
metadata_dict = {}
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if '=' in line:
key, value = line.split('=', 1)
metadata_dict[key.strip()] = value.strip()
return metadata_dict
def extract_lengths(leader_file, data_file):
def htonl(val):
"""Convert unsigned int from host to network byte order"""
return struct.unpack(">I", struct.pack("<I", val))[0]
tmp = bytearray(32768)
# --- Read Leader File ---
with open(leader_file, "rb") as f:
for _ in range(9): # Skipping 9 known records
f.read(8)
length = htonl(struct.unpack("<I", f.read(4))[0])
f.read(length - 12)
f.read(8332)
calib_factor = f.read(16).decode().strip()
# --- Read Image File ---
with open(data_file, "rb") as f:
f.seek(8)
header_length = htonl(struct.unpack("<I", f.read(4))[0])
f.seek(186)
data_record_length = int(f.read(6).decode())
f.seek(276)
prefix_record_length = int(f.read(6).decode())
# print("header_length:",header_length)
# print("prefix_record_length:",prefix_record_length)
# print("data_record_length:",data_record_length)
return header_length,prefix_record_length,data_record_length
def load_data(filepath, NoScans, header_length, record_length, prefix_length,
bytes_per_pixel=4, dtype=np.dtype('>i2')):
prefix_length += 12
pixels_per_line = (record_length - prefix_length) // bytes_per_pixel
# image_lines = NoScans
# payload_bytes_per_line = pixels_per_line * bytes_per_pixel
with open(filepath, 'rb') as f:
f.seek(header_length)
full_data = np.frombuffer(f.read(NoScans * record_length), dtype=np.uint8)
full_data = full_data.reshape((NoScans, record_length))
image_payload = full_data[:, prefix_length:]
raw_pixels = image_payload.reshape(-1).view(dtype)
image = raw_pixels.reshape(NoScans, pixels_per_line, 2)
return image
def write_rst(file,wdata,dtype,cog=False,ovr=[2,4,8,16],comp=False):
[cols, rows] = wdata.shape
if '.bin' in file:
driver = gdal.GetDriverByName("ENVI")
outdata = driver.Create(file, rows, cols, 1, dtype)
else:
driver = gdal.GetDriverByName("GTiff")
options = ['BIGTIFF=IF_SAFER']
if comp:
options += ['COMPRESS=LZW']
if cog:
options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
outdata = driver.Create(file, rows, cols, 1, dtype,options)
outdata.SetDescription(file)
outdata.GetRasterBand(1).WriteArray(wdata)
outdata.FlushCache()
if cog:
outdata.BuildOverviews("NEAREST", ovr)
outdata = None
def inrp_inc(inc_array, target_shape):
input_shape = inc_array.shape
y = np.linspace(0, 1, input_shape[0]) # scan direction
x = np.linspace(0, 1, input_shape[1]) # pixel direction
# Create interpolator
interpolator = RegularGridInterpolator((y, x), inc_array, bounds_error=False, fill_value=None)
target_y = np.linspace(0, 1, target_shape[0])
target_x = np.linspace(0, 1, target_shape[1])
yy, xx = np.meshgrid(target_y, target_x, indexing='ij')
# Stack points as (N, 2) for interpolation
points = np.stack([yy.ravel(), xx.ravel()], axis=-1)
interpolated = interpolator(points).reshape(target_shape)
return interpolated.astype(np.float32)
def get_inc(file_path):
inc_angles = []
num_records = None
num_samples = None
with open(file_path, 'r') as f:
for line in f:
# Extract shape from header lines
if "#Number of Records in Grid:" in line:
num_records = int(re.search(r'\d+', line).group())
elif "#Number of Samples in Grid:" in line:
num_samples = int(re.search(r'\d+', line).group())
elif line.strip().startswith("#") or line.strip() == "":
continue
else:
parts = line.strip().split()
if len(parts) >= 4:
try:
inc_angles.append(float(parts[3]))
except ValueError:
pass
inc_array = np.array(inc_angles)
if num_records and num_samples:
inc_array = inc_array.reshape((num_records, num_samples))
return inc_array.astype(np.float32)
[docs]
@time_it
def import_risat_l11(in_dir,mat='C2',
azlks=10,rglks=7,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir = None):
"""
Extracts Sxy or C2 matrix elements from RISAT-1A compact-pol SLC data and saves them as raster files.
This function performs multi-looking using specified azimuth and range looks, and outputs matrix elements
in either GeoTIFF or binary format. It also extracts the incidence angle map and saves it alongside the matrix outputs.
Optional support for Cloud Optimized GeoTIFFs (COGs), compression, and custom output directories.
Examples
--------
>>> import_risat_l11("path_to_folder", mat='C2', azlks=10, rglks=7, fmt='tif')
Extracts C2 matrix elements and the incidence angle map from RISAT-1A SLC data and saves them as GeoTIFFs.
Parameters
----------
in_dir : str
Path to the RISAT-1A SLC folder containing the data and metadata.
mat : str, optional (default='C2')
Type of matrix to extract. Valid options are:
- 'Sxy': Scattering matrix elements
- 'C2' : Covariance matrix for compact-pol
azlks : int, optional (default=10)
Number of azimuth looks for multi-looking.
rglks : int, optional (default=7)
Number of range looks for multi-looking.
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF in slant range
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input folder.
"""
band_meta = os.path.join(in_dir,'BAND_META.txt')
metadata_dict = get_bandmeta(band_meta)
ImagingMode = str(metadata_dict['ImagingMode'])
NoOfPolarizations=int(metadata_dict['NoOfPolarizations'])
polList = []
for ii in range(NoOfPolarizations):
polList.append(metadata_dict[f'TxRxPol{ii+1}'])
print(f"Detected {ImagingMode}: {polList} acquired on {metadata_dict['DateOfPass']}")
sr_grid = glob.glob(os.path.join(in_dir,f'*{polList[0]}_L1_SlantRange_grid.txt'))[0]
leader_file = glob.glob(os.path.join(in_dir,f'scene_{polList[0]}/lea_01.001'))[0]
data_file = glob.glob(os.path.join(in_dir,f'scene_{polList[0]}/dat_01.001'))[0]
header_length,prefix_length,record_length = extract_lengths(leader_file, data_file)
# print(header_length,prefix_length,record_length)
inc_array = get_inc(sr_grid)
target_shape = (int(metadata_dict['NoScans']), int(metadata_dict['NoPixels']))
resized_inc = inrp_inc(inc_array, target_shape)
if out_dir is None:
out_dir = in_dir
else:
os.makedirs(out_dir, exist_ok=True)
# print("Resized incidence angle shape:", resized_inc.shape)
if fmt=='bin':
write_rst(os.path.join(out_dir,'inc.bin'),resized_inc,gdal.GDT_Float32)
print(f"Saved file: {os.path.join(out_dir,'inc.bin')}")
else:
write_rst(os.path.join(out_dir,'inc.tif'),resized_inc,gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(out_dir,'inc.tif')}")
del inc_array
if 'RH' in polList and 'RV' in polList:
inFile = glob.glob(os.path.join(in_dir,'scene_RH/dat_01.001'))[0]
s11 = load_data(inFile, int(metadata_dict['NoScans']),
header_length, record_length, prefix_length,
bytes_per_pixel=int(metadata_dict['BytesPerPixel']), dtype=np.dtype('>i2'))
cc_dB = float(metadata_dict['Calibration_Constant_RH'])
cc_lin_rh = np.sqrt(10**(cc_dB/10))
inc_center = float(metadata_dict['IncidenceAngle'])
# print(cc_dB,inc_center)
s11 = ((s11[:,:,0]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rh)+\
1j*(s11[:,:,1]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rh)).astype(np.complex64)
inFile = glob.glob(os.path.join(in_dir,'scene_RV/dat_01.001'))[0]
s21 = load_data(inFile, int(metadata_dict['NoScans']),
header_length, record_length, prefix_length,
bytes_per_pixel=int(metadata_dict['BytesPerPixel']), dtype=np.dtype('>i2'))
cc_dB = float(metadata_dict['Calibration_Constant_RV'])
# print(cc_dB)
cc_lin_rv = np.sqrt(10**(cc_dB/10))
s21 = ((s21[:,:,0]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rv)+\
1j*(s21[:,:,1]*np.sqrt(np.sin(resized_inc*np.pi/180)/np.sin(inc_center*np.pi/180))/cc_lin_rv)).astype(np.complex64)
if mat == 'Sxy':
outFolder = os.path.join(out_dir,'Sxy')
os.makedirs(outFolder, exist_ok=True)
if fmt=='bin':
write_rst(os.path.join(outFolder,'s11.bin'),s11,gdal.GDT_CFloat32)
print(f"Saved file: {os.path.join(outFolder,'s11.bin')}")
write_rst(os.path.join(outFolder,'s21.bin'),s21,gdal.GDT_CFloat32)
print(f"Saved file: {os.path.join(outFolder,'s21.bin')}")
else:
write_rst(os.path.join(outFolder,'s11.tif'),s11,gdal.GDT_CFloat32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'s11.tif')}")
write_rst(os.path.join(outFolder,'s21.tif'),s21,gdal.GDT_CFloat32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'s21.tif')}")
elif mat == 'C2':
outFolder = os.path.join(out_dir,'C2')
os.makedirs(outFolder, exist_ok=True)
write_rst(os.path.join(outFolder,'inc.tif'),mlook_arr(resized_inc,azlks,rglks).astype(np.float32),
gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'inc.tif')}")
del resized_inc
if fmt=='bin':
write_rst(os.path.join(outFolder,'C11.bin'),mlook_arr(np.abs(s11)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32)
print(f"Saved file: {os.path.join(outFolder,'C11.bin')}")
write_rst(os.path.join(outFolder,'C22.bin'),mlook_arr(np.abs(s21)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32)
print(f"Saved file: {os.path.join(outFolder,'C22.bin')}")
else:
write_rst(os.path.join(outFolder,'C11.tif'),mlook_arr(np.abs(s11)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'C11.tif')}")
write_rst(os.path.join(outFolder,'C22.tif'),mlook_arr(np.abs(s21)**2,azlks,rglks).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'C22.tif')}")
C12 = mlook_arr(s11*np.conjugate(s21),azlks,rglks).astype(np.complex64)
rows,cols = C12.shape
if fmt=='bin':
write_rst(os.path.join(outFolder,'C12_real.bin'),np.real(C12).astype(np.float32),gdal.GDT_Float32)
print(f"Saved file: {os.path.join(outFolder,'C12_real.bin')}")
write_rst(os.path.join(outFolder,'C12_imag.bin'),np.imag(C12).astype(np.float32),gdal.GDT_Float32)
print(f"Saved file: {os.path.join(outFolder,'C12_imag.bin')}")
else:
write_rst(os.path.join(outFolder,'C12_real.tif'),np.real(C12).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'C12_real.tif')}")
write_rst(os.path.join(outFolder,'C12_imag.tif'),np.imag(C12).astype(np.float32),gdal.GDT_Float32,cog,ovr,comp)
print(f"Saved file: {os.path.join(outFolder,'C12_imag.tif')}")
file = open(outFolder +'/config.txt',"w+")
file.write('Nrow\n%d\n---------\nNcol\n%d\n---------\nPolarCase\nmonostatic\n---------\nPolarType\npp1'%(rows,cols))
file.close()
else:
print("Only Sxy, C2 matrices are supported for RISAT compact-pol data.\
Please check the mat argument.\
Example mat='C2' or 'Sxy'")