Source code for polsartools.preprocess.prepare_dem


#%%
import os
import requests
import gzip
import shutil
import zipfile
import numpy as np
from osgeo import gdal
from tqdm import tqdm
gdal.UseExceptions()

from polsartools.utils.utils import time_it


# === DEM Metadata Dictionary ===
DEM_SOURCES = {
    "SRTM1": {
        "url_template": "https://s3.amazonaws.com/elevation-tiles-prod/skadi/{prefix}/{name}.hgt.gz",
        "ext": ".hgt.gz",
        "extract": "gzip"
    },
    "COP30": {
        "api_template": "https://portal.opentopography.org/API/globaldem?demtype=COP30&south={south}&north={north}&west={west}&east={east}&outputFormat=GTiff&API_Key={api_key}",
        "ext": ".tif",
        "extract": "direct"
    },
    "COP90": {
        "api_template": "https://portal.opentopography.org/API/globaldem?demtype=COP90&south={south}&north={north}&west={west}&east={east}&outputFormat=GTiff&API_Key={api_key}",
        "ext": ".tif",
        "extract": "direct"
    },
    "SRTM3": {
        "url_template": "https://srtm.csi.cgiar.org/wp-content/uploads/files/srtm_5x5/TIFF/{name}.zip",
        "ext": ".zip",
        "extract": "zip"
    }
}

def tile_name(lat, lon):
    ns = 'N' if lat >= 0 else 'S'
    ew = 'E' if lon >= 0 else 'W'
    return f"{ns}{abs(int(lat)):02d}{ew}{abs(int(lon)):03d}"

def cgiar_tile_name(lat, lon):
    
    lat = lat+5 #right bottom indexing 
    # print(lat,lon)
    
    lat_base = (lat // 5) * 5
    lon_base = (lon // 5) * 5
    lat_index = int((60 - lat_base) // 5) + 1  # Top-down indexing
    lon_index = int((lon_base + 180) // 5) + 1  # Left-right indexing
    return f"srtm_{lon_index:02d}_{lat_index:02d}"


def download_tile(name, hgt_dir, dem_type="SRTM1"):
    meta = DEM_SOURCES.get(dem_type)
    if not meta:
        raise ValueError(f"Unsupported DEM type: {dem_type}")

    hgt_path = os.path.join(hgt_dir, f"{name}.hgt")
    tif_path = os.path.join(hgt_dir, f"{name}.tif")

    # === Copernicus DEMs via OpenTopography API ===
    if dem_type.startswith("COP"):
        api_key = os.getenv("OPENTOPO_API_KEY")
        if not api_key:
            raise EnvironmentError("Missing OpenTopography API key. Set OPENTOPO_API_KEY.")

        lat = int(name[1:3]) * (1 if name[0] == 'N' else -1)
        lon = int(name[4:7]) * (1 if name[3] == 'E' else -1)
        south, north = lat, lat + 1
        west, east = lon, lon + 1

        url = meta["api_template"].format(south=south, north=north, west=west, east=east, api_key=api_key)

        try:
            r = requests.get(url, timeout=60)
            if r.status_code == 200:
                with open(tif_path, "wb") as f:
                    f.write(r.content)
                # print(f"Downloaded: {name} ({dem_type})")
                return tif_path
            else:
                print(f"Copernicus tile not found: {name}")
                return None
        except Exception as e:
            print(f"Error downloading Copernicus tile {name}: {e}")
            return None

    # === CGIAR SRTM90 ===
    if dem_type == "SRTM3":
        url = meta["url_template"].format(name=name)
        archive_path = os.path.join(hgt_dir, f"{name}.zip")

        # Skip download if any matching .tif already exists
        for file in os.listdir(hgt_dir):
            if file.startswith(name) and file.endswith(".tif"):
                # print(f"Already downloaded: {file}")
                return os.path.join(hgt_dir, file)

        try:
            r = requests.get(url, timeout=60)
            if r.status_code == 200:
                with open(archive_path, "wb") as f:
                    f.write(r.content)
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    zip_ref.extractall(hgt_dir)
                os.remove(archive_path)
                # print(f"Downloaded: {name} (SRTM3)")
                # Return first matching .tif file
                for file in os.listdir(hgt_dir):
                    if file.startswith(name) and file.endswith(".tif"):
                        return os.path.join(hgt_dir, file)
                print(f"No .tif file found after extracting {name}")
                return None
            else:
                print(f"CGIAR tile not found: {name}")
                return None
        except Exception as e:
            print(f"Error downloading CGIAR tile {name}: {e}")
            return None

    # === SRTM1 ===
    prefix = name[:3]
    url = meta["url_template"].format(prefix=prefix, name=name)
    archive_path = os.path.join(hgt_dir, f"{name}{meta['ext']}")

    if os.path.exists(hgt_path):
        # print(f"Already downloaded: {name}")
        return hgt_path

    try:
        r = requests.get(url, timeout=30)
        if r.status_code == 200:
            with open(archive_path, "wb") as f:
                f.write(r.content)
            if meta["extract"] == "gzip":
                with gzip.open(archive_path, 'rb') as f_in, open(hgt_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.remove(archive_path)
            # print(f"Downloaded: {name} ({dem_type})")
            return hgt_path
        else:
            print(f"Tile not found: {name}")
            return None
    except Exception as e:
        print(f"Error downloading {name}: {e}")
        return None

def create_empty_hgt(path):
    empty_data = np.full((3601, 3601), -32768, dtype=np.int16)
    with open(path, 'wb') as f:
        f.write(empty_data.tobytes())
    print(f"Created empty HGT tile: {os.path.basename(path)}")
[docs] @time_it def prepare_dem(bbox,out_path="dem.tif", dem_type="SRTM1", hgt_dir="hgt_tiles", apply_correction=True, clip=True, clean_tiles=True ): """ Prepares a Digital Elevation Model (DEM) from downloaded elevation tiles. This function downloads elevation tiles based on a bounding box, mosaics them into a single raster, optionally applies ellipsoid height correction, clips to the bounding box, and saves the result as a GeoTIFF. Temporary tile files can be cleaned up after processing. Examples -------- >>> bbox = (-70.2, 43.5, -66.2, 45.8) # xmin, ymin, xmax, ymax >>> prepare_dem(bbox, out_path="SRTM1_mosaic_clip.tif") >>> prepare_dem(bbox, out_path="SRTM3_mosaic_clip.tif", dem_type="SRTM3") Parameters ---------- bbox : tuple of float Bounding box in the format (xmin, ymin, xmax, ymax), in geographic coordinates (longitude, latitude). out_path : str, optional Path to save the output DEM GeoTIFF file. Defaults to "dem.tif". dem_type : str, optional Type of DEM source. Options include "SRTM1", "SRTM3", "COP30", "COP90". Defaults to "SRTM1". hgt_dir : str, optional Directory to store downloaded tiles. Defaults to "hgt_tiles". apply_correction : bool, optional Whether to apply WGS84ellipsoid height correction to SRTM tiles. Defaults to True. clip : bool, optional Whether to clip the output DEM to the bounding box. Defaults to False. clean_tiles : bool, optional Whether to delete the tile directory after processing. Defaults to True. Returns ------- None The function saves the DEM to the specified `out_path`. Notes ----- - Uses GDAL for mosaicking and warping. - Supports ellipsoid correction for SRTM1 and SRTM3. - Automatically creates empty tiles for missing SRTM1 data. """ tile_limit=9 os.makedirs(hgt_dir, exist_ok=True) xmin, ymin, xmax, ymax = bbox if dem_type == "SRTM3": lon_range = range(int(np.floor(xmin)), int(np.ceil(xmax))) lat_range = range(int(np.floor(ymin)), int(np.ceil(ymax))) tiles = set() for lon in lon_range: for lat in lat_range: tiles.add(cgiar_tile_name(lat, lon)) else: lon_range = range(int(np.floor(xmin)), int(np.ceil(xmax))) lat_range = range(int(np.floor(ymin)), int(np.ceil(ymax))) tiles = [tile_name(lat, lon) for lon in lon_range for lat in lat_range] # print(f"Downloading {len(tiles)} tiles...") # def chunk(lst, size): # for i in range(0, len(lst), size): # yield lst[i:i + size] # hgt_paths = [] # for batch in chunk(list(tiles), tile_limit): # for name in batch: # ext = ".tif" if dem_type in ["COP30", "COP90", "SRTM3"] else ".hgt" # hgt_path = os.path.join(hgt_dir, f"{name}{ext}") # if not os.path.exists(hgt_path): # downloaded = download_tile(name, hgt_dir=hgt_dir, dem_type=dem_type) # if not downloaded and dem_type == "SRTM1": # create_empty_hgt(hgt_path) # hgt_paths.append(hgt_path) def chunk(lst, size): for i in range(0, len(lst), size): yield lst[i:i + size] hgt_paths = [] # Create a single progress bar over all tiles with tqdm(total=len(tiles), desc="Downloading tiles", unit="tile") as pbar: for batch in chunk(list(tiles), tile_limit): for name in batch: ext = ".tif" if dem_type in ["COP30", "COP90", "SRTM3"] else ".hgt" hgt_path = os.path.join(hgt_dir, f"{name}{ext}") if os.path.exists(hgt_path): hgt_paths.append(hgt_path) pbar.update(1) continue downloaded = download_tile(name, hgt_dir=hgt_dir, dem_type=dem_type) if downloaded or (not downloaded and dem_type == "SRTM1"): if not downloaded: create_empty_hgt(hgt_path) hgt_paths.append(hgt_path) pbar.update(1) print(f"\nProcessing {len(hgt_paths)} tiles...") # === Mosaic and optional correction === vrt_path = os.path.splitext(out_path)[0] + ".vrt" gdal.BuildVRT(vrt_path, hgt_paths) warp_args = { "format": "GTiff" } if apply_correction and dem_type in ["SRTM1", "SRTM3"]: warp_args.update({ "srcSRS": "EPSG:4326+5773", "dstSRS": "EPSG:4326+4979", "warpOptions": ["SOURCE_EXTRA=1000"] }) if clip: warp_args["outputBounds"] = (xmin, ymin, xmax, ymax) gdal.Warp(out_path, vrt_path, **warp_args) label = "with ellipsoid correction" if apply_correction else "without ellipsoid correction" clip_label = "clipped to bbox" if clip else "full extent" print(f"\nDEM saved to: {out_path} ({label}, {clip_label})") # === Clean up === if clean_tiles: if os.path.exists(hgt_dir): shutil.rmtree(hgt_dir)
# bbox = (-70.2, 43.5, -66.2, 45.8) # xmin, ymin, xmax, ymax # prepare_dem(bbox, out_path="srtm1_mosaic_clip.tif", dem_type="SRTM1", clip=True) # prepare_dem(bbox, out_path="SRTM3_mosaic_clip.tif", dem_type="SRTM3", clip=True) # # prepare_dem(bbox, out_path="cop30_mosaic.tif", dem_type="COP30") # # prepare_dem(bbox, out_path="cop90_mosaic.tif", dem_type="COP90")