import numpy as np
from osgeo import gdal
import os, glob
import tempfile,shutil
from polsartools.utils.utils import time_it, mlook_arr
# from polsartools.utils.io_utils import write_T3, write_C3
from polsartools.preprocess.convert_S2 import convert_S
gdal.UseExceptions()
def read_bin(file):
ds = gdal.Open(file)
band = ds.GetRasterBand(1)
arr = band.ReadAsArray()
return arr
def read_a2(file):
fp = open(file,mode='rb')
fp.seek(232)
ch = int(fp.read(4))
# print(ch)
fp.seek(236)
nline = int(fp.read(8))
# print(nline)
fp.seek(248)
npixel = int(fp.read(8))
# print(npixel)
nrec = 544 + npixel*8
# print(nrec)
fp.seek(720)
data = np.frombuffer(fp.read(int(nrec * nline)), dtype='>f4')
data = np.array(data).reshape(-1,int(nrec/4))
# print(np.shape(data))
data = data[:,int(544/4):int(nrec/4)]
slc = data[:,::2] + 1j*data[:,1::2]
# print(np.shape(slc))
del data
return slc
def write_a2_rst(out_file,data,
driver='GTiff', out_dtype=gdal.GDT_CFloat32,
mat=None,
cog=False, ovr=[2, 4, 8, 16], comp=False
):
if driver =='ENVI':
# Create GDAL dataset
driver = gdal.GetDriverByName(driver)
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype
)
else:
driver = gdal.GetDriverByName("GTiff")
options = ['BIGTIFF=IF_SAFER']
if comp:
# options += ['COMPRESS=DEFLATE', 'PREDICTOR=2', 'ZLEVEL=9']
options += ['COMPRESS=LZW']
if cog:
options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype,
options
)
dataset.GetRasterBand(1).WriteArray(data)
# outdata.GetRasterBand(1).SetNoDataValue(0)##if you want these values transparent
dataset.FlushCache() ##saves to disk!!
if cog:
dataset.BuildOverviews("NEAREST", ovr)
dataset = None
if mat == 'S2' or mat == 'Sxy':
print(f"Saved file: {out_file}")
[docs]
@time_it
def import_alos2_fbd_l11(in_dir,mat='C2', azlks=3,rglks=2,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None,
cf_dB=-83):
"""
Extracts the C2 matrix elements (C11, C22, and C12) from ALOS-2 Fine Beam Dual-Pol (FBD) CEOS data
and saves them into respective binary files.
Example:
--------
>>> import_alos2_fbd_l11("path_to_folder", azlks=5, rglks=3)
This will extract the C2 matrix elements from the ALOS-2 Fine Beam Dual-Pol data
in the specified folder and save them in the 'C2' directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the ALOS-2 Fine Beam Dual-Pol CEOS data files.
mat : str, optional (default = 'S2' or 'Sxy)
Type of matrix to extract. Valid options: 'Sxy','C2', 'T2'.
azlks : int, optional (default=3)
The number of azimuth looks for multi-looking.
rglks : int, optional (default=2)
The number of range looks for multi-looking.
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
cf_dB : float, optional (default=-83)
The calibration factor in dB used to scale the raw radar data. It is applied to
the HH and HV polarization data before matrix computation.
Returns:
--------
None
The function does not return any value. Instead, it creates a folder named `C2`
(if not already present) and saves the following binary files:
- `C11.bin`: Contains the C11 matrix elements.
- `C22.bin`: Contains the C22 matrix elements.
- `C12_real.bin`: Contains the real part of the C12 matrix.
- `C12_imag.bin`: Contains the imaginary part of the C12 matrix.
- `config.txt`: A text file containing grid dimensions and polarimetric configuration.
Raises:
-------
FileNotFoundError
If the required ALOS-2 data files (e.g., `IMG-HH` and `IMG-HV`) cannot be found in the specified folder.
ValueError
If the calibration factor is invalid or if the files are not in the expected format.
"""
valid_dual_pol = ['Sxy', 'C2', 'T2']
valid_matrices = valid_dual_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Dual-pol: {sorted(valid_dual_pol)}")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
hh_file = list(glob.glob(os.path.join(in_dir,'IMG-HH-*-FBDR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-HH-*-FBDR1.1__D')))[0]
hv_file = list(glob.glob(os.path.join(in_dir,'IMG-HV-*-FBDR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-HV-*-FBDR1.1__D')))[0]
calfac_linear = np.sqrt(10 ** ((cf_dB - 32) / 10))
S11 = read_a2(hh_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's11.{ext}'),S11, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S11
S12 = read_a2(hv_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's12.{ext}'),S12, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S12
# Matrix conversion if needed
if mat in ['C2', 'T2']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")
#################################################################################################
[docs]
@time_it
def import_alos2_hbq_l11(in_dir,mat='T3', azlks=8,rglks=4,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None,
recip=False,cf_dB=-83):
"""
Extracts single look S2 or Multi-look T3/C3 matrix elements from ALOS-2 Quad-Pol (HBQ) CEOS data
and saves them into respective binary files.
Example:
--------
>>> import_alos2_hbq_l11("path_to_folder", azlks=5, rglks=3)
This will extract the T3 matrix elements from the ALOS-2 Full-pol data
in the specified folder and save them in the selected matrix directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the ALOS-2 Quad-Pol (HBQ) CEOS data folder.
mat : str, optional (default='T3')
Type of matrix to extract. Valid options: 'S2', 'C4, 'C3', 'T4',
'T3', 'C2HX', 'C2VX', 'C2HV','T2HV'
azlks : int, optional (default=8)
The number of azimuth looks for multi-looking.
rglks : int, optional (default=4)
The number of range looks for multi-looking.
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
recip : bool, optional (default=False)
If True, scattering matrix reciprocal symmetry is applied, i.e, S_HV = S_VH.
cf_dB : float, optional (default=-83)
The calibration factor in dB used to scale the raw radar data. It is applied to
the HH and HV polarization data before matrix computation.
Returns:
--------
None
The function does not return any value. Instead, it creates a folders named 'S2` or 'C3` or 'T3`
(depending on the chosen matrix) and saves the corresponding binary files.
"""
hh_file = list(glob.glob(os.path.join(in_dir,'IMG-HH-*-HBQR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-HH-*-HBQR1.1__D')))[0]
hv_file = list(glob.glob(os.path.join(in_dir,'IMG-HV-*-HBQR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-HV-*-HBQR1.1__D')))[0]
vh_file = list(glob.glob(os.path.join(in_dir,'IMG-VH-*-HBQR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-VH-*-HBQR1.1__D')))[0]
vv_file = list(glob.glob(os.path.join(in_dir,'IMG-VV-*-HBQR1.1__A')) + \
glob.glob(os.path.join(in_dir, 'IMG-VV-*-HBQR1.1__D')))[0]
valid_full_pol = ['S2', 'C4', 'C3', 'T4', 'T3', 'C2HX', 'C2VX', 'C2HV', 'T2HV']
valid_matrices = valid_full_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Full-pol: {sorted(valid_full_pol)}")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['S2', 'Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
calfac_linear = np.sqrt(10 ** ((cf_dB - 32) / 10))
S11 = read_a2(hh_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's11.{ext}'),S11, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S11
S21 = read_a2(hv_file).astype(np.complex64)*calfac_linear
S12 = read_a2(vh_file).astype(np.complex64)*calfac_linear
if recip:
S12 = (S12 + S21)/2
S21 = S12
write_a2_rst(os.path.join(base_out_dir, f's12.{ext}'),S12, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
write_a2_rst(os.path.join(base_out_dir, f's21.{ext}'),S21, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S12, S21
S22 = read_a2(vv_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's22.{ext}'),S22, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S22
# Matrix conversion if needed
if mat not in ['S2', 'Sxy']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")
[docs]
@time_it
def import_alos2_wbd_l11(in_dir,mat='C2', azlks=25,rglks=5, swath=1,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None,
cf_dB=-83):
"""
Extracts the C2 matrix elements (C11, C22, and C12) from ALOS-2 Wide Beam Dual-Pol (WBD) CEOS data
and saves them into respective binary files.
Example:
--------
>>> import_alos2_wbd_l11("path_to_folder", azlks=25, rglks=5)
This will extract the C2 matrix elements from the ALOS-2 Wide Beam Dual-Pol data
in the specified folder and save them in the 'C2' directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the ALOS-2 Wide Beam Dual-Pol CEOS data files.
mat : str, optional (default = 'S2' or 'Sxy)
Type of matrix to extract. Valid options: 'Sxy','C2', 'T2'.
azlks : int, optional (default=25)
The number of azimuth looks for multi-looking.
rglks : int, optional (default=5)
The number of range looks for multi-looking.
swath : int, optional (default=1)
The swath number [1,2,3,4,5].
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
cf_dB : float, optional (default=-83)
The calibration factor in dB used to scale the raw radar data. It is applied to
the HH and HV polarization data before matrix computation.
Returns:
--------
None
The function does not return any value. Instead, it creates a folder named `C2`
(if not already present) and saves the following binary files:
- `C11.bin`: Contains the C11 matrix elements.
- `C22.bin`: Contains the C22 matrix elements.
- `C12_real.bin`: Contains the real part of the C12 matrix.
- `C12_imag.bin`: Contains the imaginary part of the C12 matrix.
- `config.txt`: A text file containing grid dimensions and polarimetric configuration.
Raises:
-------
FileNotFoundError
If the required ALOS-2 data files (e.g., `IMG-HH` and `IMG-HV`) cannot be found in the specified folder.
ValueError
If the calibration factor is invalid or if the files are not in the expected format.
"""
valid_dual_pol = ['Sxy', 'C2', 'T2']
valid_matrices = valid_dual_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Dual-pol: {sorted(valid_dual_pol)}")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
print(f"Extracting swath {swath} ...")
hh_file = list(glob.glob(os.path.join(in_dir,f'IMG-HH-*-WBDR1.1__A-F{swath}')) + \
glob.glob(os.path.join(in_dir, f'IMG-HH-*-WBDR1.1__D-F{swath}')))[0]
hv_file = list(glob.glob(os.path.join(in_dir,f'IMG-HV-*-WBDR1.1__A-F{swath}')) + \
glob.glob(os.path.join(in_dir, f'IMG-HV-*-WBDR1.1__D-F{swath}')))[0]
calfac_linear = np.sqrt(10 ** ((cf_dB) / 10))
S11 = read_a2(hh_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's11.{ext}'),S11, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S11
S12 = read_a2(hv_file).astype(np.complex64)*calfac_linear
write_a2_rst(os.path.join(base_out_dir, f's12.{ext}'),S12, driver=driver, mat=mat, cog=cog, ovr=ovr, comp=comp)
del S12
# Matrix conversion if needed
if mat in ['C2', 'T2']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")