import glob,os,tempfile,shutil
import numpy as np
from osgeo import gdal
gdal.UseExceptions()
from polsartools.utils.utils import time_it
# from polsartools.utils.io_utils import write_T3, write_C3
from polsartools.preprocess.convert_S2 import convert_S
def read_rs2_tif(file):
ds = gdal.Open(file)
band1 = ds.GetRasterBand(1).ReadAsArray()
band2 = ds.GetRasterBand(2).ReadAsArray()
ds=None
return np.dstack((band1,band2))
def write_rst(out_file,data,
driver='GTiff', out_dtype=gdal.GDT_CFloat32,
mat=None,
cog=False, ovr=[2, 4, 8, 16], comp=False
):
if driver =='ENVI':
# Create GDAL dataset
driver = gdal.GetDriverByName(driver)
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype
)
else:
driver = gdal.GetDriverByName("GTiff")
options = ['BIGTIFF=IF_SAFER']
if comp:
# options += ['COMPRESS=DEFLATE', 'PREDICTOR=2', 'ZLEVEL=9']
options += ['COMPRESS=LZW']
if cog:
options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype,
options
)
if cog:
dataset.BuildOverviews("NEAREST", ovr)
dataset.GetRasterBand(1).WriteArray(data)
# outdata.GetRasterBand(1).SetNoDataValue(0)##if you want these values transparent
dataset.FlushCache() ##saves to disk!!
dataset = None
if mat == 'S2' or mat == 'Sxy':
print(f"Saved file: {out_file}")
def write_s2_bin(file,wdata):
[cols, rows] = wdata.shape
driver = gdal.GetDriverByName("ENVI")
outdata = driver.Create(file, rows, cols, 1, gdal.GDT_CFloat32)
outdata.SetDescription(file)
outdata.GetRasterBand(1).WriteArray(wdata)
outdata.FlushCache()
def read_bin(file):
ds = gdal.Open(file,gdal.GA_ReadOnly)
band = ds.GetRasterBand(1)
arr = band.ReadAsArray()
return arr
def write_bin_s2(file,wdata,refData):
# ds = gdal.Open(refData)
[cols, rows] = wdata.shape
driver = gdal.GetDriverByName("ENVI")
outdata = driver.Create(file, rows, cols, 1, gdal.GDT_Float32)
# outdata.SetGeoTransform(ds.GetGeoTransform())##sets same geotransform as input
# outdata.SetProjection(ds.GetProjection())##sets same projection as input
outdata.SetDescription(file)
outdata.GetRasterBand(1).WriteArray(wdata)
# outdata.GetRasterBand(1).SetNoDataValue(np.NaN)##if you want these values transparent
outdata.FlushCache() ##saves to disk!!
def write_bin(file,wdata):
# ds = gdal.Open(refData)
[cols, rows] = wdata.shape
driver = gdal.GetDriverByName("ENVI")
outdata = driver.Create(file, rows, cols, 1, gdal.GDT_Float32)
# outdata.SetGeoTransform(ds.GetGeoTransform())##sets same geotransform as input
# outdata.SetProjection(ds.GetProjection())##sets same projection as input
outdata.SetDescription(file)
outdata.GetRasterBand(1).WriteArray(wdata)
# outdata.GetRasterBand(1).SetNoDataValue(np.NaN)##if you want these values transparent
outdata.FlushCache() ##saves to disk!!
[docs]
@time_it
def import_chyaan2_fp(in_dir,mat='T3',azlks=None,rglks=None,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None,
recip=False):
"""
Extracts specified matrix elements (S2, T3, or C3) from Chandrayaan-II DFSAR Full-Pol data
and saves them into respective directories.
Example:
--------
>>> import_chyaan2_fp("path_to_folder", mat='T3', azlks=50, rglks=2)
This will extract the T3 matrix elements from the Chandrayaan-II DFSAR Full-Pol data
in the specified folder and save them in the 'T3' directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the Chandrayaan-II DFSAR Full-Pol data files.
mat : str, optional (default='T3')
Type of matrix to extract. Valid options: 'S2', 'C4, 'C3', 'T4',
'T3', 'C2HX', 'C2VX', 'C2HV','T2HV'
azlks : int, optional (default=None)
The number of azimuth looks for multi-looking. If not specified, the value is derived from
the ground range and output line spacing.
rglks : int, optional (default=None)
The number of range looks for multi-looking. If not specified, the value is set to 1.
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
recip : bool, optional (default=False)
If True, scattering matrix reciprocal symmetry is applied, i.e, S_HV = S_VH.
"""
#%%
valid_full_pol = ['S2', 'C4', 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV', 'T2HV']
# valid_dual_pol = ['Sxy', 'C2', 'T2']
valid_matrices = valid_full_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Full-pol: {sorted(valid_full_pol)}\n")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['S2', 'Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
xmlFile = glob.glob(os.path.join(in_dir,'data/calibrated/*/*sli*.xml'))[0]
fxml = open(xmlFile, 'r')
for line in fxml:
if "output_line_spacing" in line:
# print("output_line_spacing: ", line.split('>')[1].split('<')[0])
ols = float(line.split('>')[1].split('<')[0])
if "output_pixel_spacing" in line:
# print("output_pixel_spacing: ", line.split('>')[1].split('<')[0])
ops = float(line.split('>')[1].split('<')[0])
if "isda:incidence_angle" in line:
# print("incidence_angle: ", line.split('>')[1].split('<')[0])
inc= float( line.split('>')[1].split('<')[0])
if "isda:calibration_constant" in line:
cc = float( line.split('>')[1].split('<')[0])
if "isda:pulse_bandwidth" in line:
bw = float( line.split('>')[1].split('<')[0])/1000000
fxml.close()
gRange = ops/np.sin(inc*np.pi/180)
# multi-llok factor
mlf = int(np.round(gRange/ols,0))
ds = gdal.Open(glob.glob(in_dir+'/data/calibrated/*/*sli*_hh_*.tif')[0])
cols = ds.RasterXSize
rows = ds.RasterYSize
lines = ['output_line_spacing '+ str(ols)+'\n',
'output_pixel_spacing '+ str(ops)+'\n',
'ground_range '+ str(gRange)+'\n',
'mlook_factor '+ str(mlf)+'\n',
'incidence_angle '+ str(inc)+'\n',
'calibration_constant '+str(cc)+'\n',
'pulse_bandwidth '+str(bw)+'\n',
'lines '+ str(rows)+'\n',
'samples '+str(cols)+'\n'
]
calFactor = 1/np.sqrt(10**(cc/10))
# print(lines)
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_hh_*.tif'))[0]
S11 = read_rs2_tif(inFile)
write_rst(os.path.join(base_out_dir, f's11.{ext}'),
S11[:,:,0]*calFactor+1j*(S11[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S11
# write_s2_bin(out_file,data[:,:,0]*calFactor+1j*(data[:,:,1]*calFactor))
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_hv_*.tif'))[0]
# data_xy = read_rs2_tif(inFile)
S12 = read_rs2_tif(inFile)
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_vh_*.tif'))[0]
# data_yx = read_rs2_tif(inFile)
S21 = read_rs2_tif(inFile)
if recip:
S12 = (S12 + S21)/2
S21 = S12
write_rst(os.path.join(base_out_dir, f's12.{ext}'),
S12[:,:,0]*calFactor+1j*(S12[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S12
write_rst(os.path.join(base_out_dir, f's21.{ext}'),
S21[:,:,0]*calFactor+1j*(S21[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S21
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_vv_*.tif'))[0]
S22 = read_rs2_tif(inFile)
write_rst(os.path.join(base_out_dir, f's22.{ext}'),
S22[:,:,0]*calFactor+1j*(S22[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S22
with open(os.path.join(final_out_dir, 'multilook_info.txt'), 'w') as f:
f.writelines(lines)
f.close()
if azlks == None and rglks == None:
azlks = mlf
rglks = 1
# Matrix conversion if needed
if mat not in ['S2', 'Sxy']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")
#############################################################
[docs]
@time_it
def import_chyaan2_cp(in_dir,mat='C2',azlks=None,rglks=None,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None
):
"""
Extracts specified matrix elements (Sxy, C2) from Chandrayaan-II DFSAR Compact-Pol data
and saves them into respective directories.
Example:
--------
>>> import_chyaan2_cp("path_to_folder", mat='C2', azlks=50, rglks=2)
This will extract the C2 matrix elements from the Chandrayaan-II DFSAR Compact-Pol data
in the specified folder and save them in the 'C2' directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the Chandrayaan-II DFSAR Compact-Pol data files.
mat : str, optional (default='C2')
Type of matrix to extract. Valid options: 'Sxy', 'C2'
azlks : int, optional (default=None)
The number of azimuth looks for multi-looking. If not specified, the value is derived from
the ground range and output line spacing.
rglks : int, optional (default=None)
The number of range looks for multi-looking. If not specified, the value is set to 1.
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
"""
#%%
# valid_full_pol = ['S2', 'C4', 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV', 'T2HV']
valid_dual_pol = ['Sxy', 'C2', 'T2']
valid_matrices = valid_dual_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Full-pol: {sorted(valid_dual_pol)}\n")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
xmlFile = glob.glob(os.path.join(in_dir,'data/calibrated/*/ch2_sar_*sli*.xml'))[0]
fxml = open(xmlFile, 'r')
for line in fxml:
if "output_line_spacing" in line:
# print("output_line_spacing: ", line.split('>')[1].split('<')[0])
ols = float(line.split('>')[1].split('<')[0])
if "output_pixel_spacing" in line:
# print("output_pixel_spacing: ", line.split('>')[1].split('<')[0])
ops = float(line.split('>')[1].split('<')[0])
if "isda:incidence_angle" in line:
# print("incidence_angle: ", line.split('>')[1].split('<')[0])
inc= float( line.split('>')[1].split('<')[0])
if "isda:calibration_constant" in line:
cc = float( line.split('>')[1].split('<')[0])
if "isda:pulse_bandwidth" in line:
bw = float( line.split('>')[1].split('<')[0])/1000000
fxml.close()
gRange = ops/np.sin(inc*np.pi/180)
# multi-llok factor
mlf = int(np.round(gRange/ols,0))
ds = gdal.Open(glob.glob(in_dir+'/data/calibrated/*/*sli*_lh_*.tif')[0])
cols = ds.RasterXSize
rows = ds.RasterYSize
lines = ['output_line_spacing '+ str(ols)+'\n',
'output_pixel_spacing '+ str(ops)+'\n',
'ground_range '+ str(gRange)+'\n',
'mlook_factor '+ str(mlf)+'\n',
'incidence_angle '+ str(inc)+'\n',
'calibration_constant '+str(cc)+'\n',
'pulse_bandwidth '+str(bw)+'\n',
'lines '+ str(rows)+'\n',
'samples '+str(cols)+'\n'
]
calFactor = 1/np.sqrt(10**(cc/10))
# print(lines)
try:
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_lh_*.tif'))[0]
except:
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_rh_*.tif'))[0]
S11 = read_rs2_tif(inFile)
write_rst(os.path.join(base_out_dir, f's11.{ext}'),
S11[:,:,0]*calFactor+1j*(S11[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S11
try:
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_lv_*.tif'))[0]
except:
inFile = glob.glob(os.path.join(in_dir, 'data/calibrated/*/*sli*_rv_*.tif'))[0]
# data_xy = read_rs2_tif(inFile)
S12 = read_rs2_tif(inFile)
write_rst(os.path.join(base_out_dir, f's12.{ext}'),
S12[:,:,0]*calFactor+1j*(S12[:,:,1]*calFactor),
driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S12
with open(os.path.join(final_out_dir, 'multilook_info.txt'), 'w') as f:
f.writelines(lines)
f.close()
if azlks == None and rglks == None:
azlks = mlf
rglks = 1
# Matrix conversion if needed
if mat not in ['Sxy']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")