Source code for polsartools.preprocess.filters

import os
import numpy as np
from polsartools.utils.proc_utils import process_chunks_parallel
from polsartools.utils.utils import conv2d,time_it
from polsartools.preprocess.pre_utils import get_filter_io_paths

from polsartools.preprocess.rflee_filter import process_chunk_refined_lee
from polsartools.rflee import process_chunk_rfleecpp
[docs] @time_it def filter_boxcar(in_dir, win=3, fmt="tif", sub_dir=True, cog=False, ovr = [2, 4, 8, 16], comp=False, max_workers=None,block_size=(512, 512), progress_callback=None, # for QGIS plugin ): """ Apply a Boxcar speckle filter to polarimetric SAR data to reduce speckle noise. The Boxcar filter performs a uniform mean filter over a square window, effectively smoothing the polarimetric matrix to enhance interpretability. It is a fast, straightforward speckle reduction technique suitable for preprocessing and visualization. Examples -------- >>> # Basic usage >>> filter_boxcar("/path/to/polSAR_data") >>> # With custom window size and output as GeoTIFF >>> filter_boxcar("/path/to/polSAR_data", win=5, cog=True) Parameters ---------- in_dir : str Input folder containing C4, T4, C3, T3, C2, or T2 matrix files (.bin or .tif). win : int, default=3 Size of the spatial smoothing window (e.g., 3x3). fmt : {'tif', 'bin'}, default='tif' Output file format type. cog : bool, default=False If True, generates a Cloud Optimized GeoTIFF. ovr : list[int], default=[2, 4, 8, 16] Overview levels for COG pyramids. sub_dir : bool, default=True If True, creates a subdirectory for the output files based on filter type and window size in the input folder. Else saves to a output folder at same level as input folder. comp : bool, default=False If True, applies LZW compression to the output GeoTIFF files. max_workers : int | None, default=None Maximum number of parallel workers. block_size : tuple[int, int], default=(512, 512) Block size for chunked processing. Returns ------- None Writes filtered output matrix files to disk. """ write_flag = True # Always write output files input_filepaths, output_filepaths = get_filter_io_paths(in_dir, [win, win], fmt=fmt, filter_type="boxcar", sub_dir=sub_dir) # Process chunks in parallel num_outputs = len(output_filepaths) process_chunks_parallel(input_filepaths, list(output_filepaths), window_size=win, write_flag=write_flag, processing_func=process_chunk_boxcar,block_size=block_size, max_workers=max_workers, num_outputs=num_outputs, cog=cog, ovr=ovr,comp=comp, progress_callback=progress_callback )
def process_chunk_boxcar(chunks, window_size, *args): filtered_chunks = [] for i in range(len(chunks)): img = np.array(chunks[i]) kernel = np.ones((window_size, window_size), np.float32) / (window_size * window_size) filtered_chunks.append(conv2d(img, kernel)) filtered_chunks = np.array(filtered_chunks).astype(np.float32) return filtered_chunks
[docs] @time_it def filter_refined_lee(in_dir, win=3, fmt="tif",sub_dir=True, cog=False, ovr = [2, 4, 8, 16], comp=False, max_workers=None,block_size=(512, 512), progress_callback=None, # for QGIS plugin ): """ Apply Refined Lee speckle filter to polarimetric SAR data. The Refined Lee filter is an adaptive speckle filter that preserves edges and structural details while reducing noise. Unlike Boxcar, RLee dynamically adjusts based on local statistics, making it more suitable for classification, segmentation, and biophysical retrieval tasks. Examples -------- >>> # Basic usage with default parameters >>> filter_refined_lee("/path/to/polSAR_data") >>> # Custom usage with large window and tiled COG output >>> filter_refined_lee("/path/to/polSAR_data", win=7, cog=True) Parameters ---------- in_dir : str Path to the folder with C4, T4, C3, T3, C2, or T2 matrix files. win : int, default=3 Size of the adaptive filtering window. fmt : {'tif', 'bin'}, default='tif' Desired output file format. sub_dir : bool, default=True If True, creates a subdirectory for the output files based on filter type and window size in the input folder. Else saves to a output folder at same level as input folder. cog : bool, default=False Create Cloud Optimized GeoTIFF with overviews and internal tiling. ovr : list[int], default=[2, 4, 8, 16] Overview pyramid levels for zoomable image access. comp : bool, default=False If True, applies LZW compression to the output GeoTIFF files. max_workers : int | None, default=None Number of threads for parallel processing. block_size : tuple[int, int], default=(512, 512) Block size used during chunked filtering. Returns ------- None Output files are written to disk with the applied RLee filter. """ write_flag=True input_filepaths, output_filepaths = get_filter_io_paths(in_dir, [win, win], fmt=fmt, filter_type="rlee", sub_dir=sub_dir) num_outputs = len(output_filepaths) ### Python implementation # process_chunks_parallel(input_filepaths, output_filepaths, win=win, write_flag=write_flag, # processing_func=process_chunk_refined_lee, block_size=(512, 512), max_workers=max_workers, # num_outputs=num_outputs) #### Uncomment below to use C++ implementation process_chunks_parallel(input_filepaths, list(output_filepaths), window_size=win, write_flag=write_flag, processing_func=process_chunk_rfl,block_size=block_size, max_workers=max_workers, num_outputs=num_outputs, cog=cog, ovr=ovr, progress_callback=progress_callback )
def process_chunk_rfl(chunks, window_size, *args): # print('before pad',np.shape(chunks[0])) for i in range(len(chunks)): pad_top_left = window_size // 2 pad_bottom_right = window_size // 2 +1 chunks[i] = np.pad(chunks[i], ((pad_top_left, pad_bottom_right), (pad_top_left, pad_bottom_right)), mode='constant', constant_values=0) # print("after pad",np.shape(chunks[0])) chunk_arrays = [np.array(ch) for ch in chunks] vi_c_raw = process_chunk_rfleecpp(chunk_arrays, window_size) proc_chunks=[] for chunk in vi_c_raw: filt_data = np.array(chunk) # filt_data[filt_data == 0] = np.nan proc_chunks.append(filt_data) # print('mean %0.3f'%np.nanmean(np.array(chunk)),'std %0.3f'%np.nanstd(np.array(chunk))) del vi_c_raw,filt_data,chunk # print("proc_chunks pad",np.shape(proc_chunks[0])) for i in range(len(proc_chunks)): # Calculate the padding size pad_top_left = window_size // 2 pad_bottom_right = window_size // 2 +1 proc_chunks[i] = proc_chunks[i][pad_top_left:-pad_bottom_right, pad_top_left:-pad_bottom_right] def shift_array(arr,shift): # Get the number of rows and columns in the array rows, cols = arr.shape # Step 1: Move the rightmost 3 columns to the left right_columns = arr[:, -shift:] # Last 3 columns remaining_columns = arr[:, :-shift] # All but last 3 columns # Step 2: Move the bottom 3 rows to the top bottom_rows = arr[-shift:, :] # Last 3 rows remaining_rows = arr[:-shift, :] # All but last 3 rows # Combine the shifted rows and columns shifted_array = np.vstack((bottom_rows, remaining_rows)) # Stack bottom rows to the top shifted_array = np.hstack((right_columns, shifted_array[:, :-shift])) # Stack right columns to the left return shifted_array for i in range(len(proc_chunks)): # Remove 'window_size' rows from the top and 'window_size' columns from the left proc_chunks[i] = shift_array(proc_chunks[i],window_size//2) # print("proc_chunks unpad",np.shape(proc_chunks[0])) num_chunks = len(proc_chunks) // 2 out_chunks = [] for i in range(num_chunks): real_part = proc_chunks[2 * i] # Get the real part from the even indices # real_part[real_part == 0] = np.nan imag_part = proc_chunks[2 * i + 1] # Get the imaginary part from the odd indices # imag_part[imag_part == 0] = np.nan complex_array = real_part + 1j * imag_part # Create a complex number out_chunks.append(complex_array) # print(np.nanmean(real_part),' ' ,np.nanmean(imag_part)) filtered_chunks = [] if len(chunks)==9: # print("out_chunks shape:", np.shape(out_chunks)) filtered_chunks.append(np.real(out_chunks[0])) filtered_chunks.append(np.real(out_chunks[1])) filtered_chunks.append(np.imag(out_chunks[1])) filtered_chunks.append(np.real(out_chunks[2])) filtered_chunks.append(np.imag(out_chunks[2])) filtered_chunks.append(np.real(out_chunks[4])) filtered_chunks.append(np.real(out_chunks[5])) filtered_chunks.append(np.imag(out_chunks[5])) filtered_chunks.append(np.real(out_chunks[8])) if len(chunks)==4: filtered_chunks.append(np.real(out_chunks[0])) filtered_chunks.append(np.real(out_chunks[1])) filtered_chunks.append(np.imag(out_chunks[1])) filtered_chunks.append(np.real(out_chunks[3])) filtered_chunks = np.array(filtered_chunks).astype(np.float32) return filtered_chunks