"""Source code for flametrack.processing.dewarping."""

import logging
import os
from collections.abc import Generator, Sequence
from dataclasses import dataclass
from datetime import datetime
from typing import Any, cast

import cv2
import h5py
import numpy as np
from numpy.typing import NDArray

from flametrack.analysis.dataset_handler import (
    assert_h5_schema,
    create_h5_file,
    init_h5_for_experiment,
)
from flametrack.analysis.ir_analysis import (
    compute_remap_from_homography,
    get_dewarp_parameters,
)
from flametrack.gui.plotting_utils import rotate_points, sort_corner_points
from flametrack.utils.math_utils import estimate_resolution_from_points


[docs] @dataclass(frozen=True) class DewarpConfig: target_ratio: float target_pixels_width: int target_pixels_height: int plate_width_mm: float | None = None plate_height_mm: float | None = None rotation_index: int = 0 frequency: int = 1 testing: bool = False filename: str | None = None # optional override output path datatype: str = "IR" # data source: "IR", "video", "picture"
@dataclass(frozen=True)
class CornerSets:
    """Immutable pair of corner-point arrays, one per plate."""

    left: NDArray[np.float32]
    # Used for Room Corner; for LFS only "left" is used.
    right: NDArray[np.float32]
DATATYPE = "IR"


# dewarping.py – helper functions


def _ensure_output_path(experiment: Any, filename: str | None) -> str:
    """Return the HDF5 output path, building the default one if needed.

    An explicit ``filename`` is returned untouched. Otherwise the folder
    ``<experiment.folder_path>/processed_data`` is created (if missing) and
    the path ``<exp_name>_results_RCE.h5`` inside it is returned.
    """
    if filename is not None:
        return filename

    target_dir = os.path.join(experiment.folder_path, "processed_data")
    os.makedirs(target_dir, exist_ok=True)
    default_name = f"{experiment.exp_name}_results_RCE.h5"
    return os.path.join(target_dir, default_name)


def _init_room_corner_schema(h5f: h5py.File) -> None:
    """Initialise ``h5f`` for a Room Corner experiment and check its schema."""
    experiment_type = "Room Corner"
    init_h5_for_experiment(h5f, experiment_type)
    assert_h5_schema(h5f, experiment_type)


def _init_lfs_schema(h5f: h5py.File) -> None:
    """Initialise ``h5f`` for Lateral Flame Spread and check its schema."""
    experiment_type = "Lateral Flame Spread"
    init_h5_for_experiment(h5f, experiment_type)
    assert_h5_schema(h5f, experiment_type)


def _write_root_plate_attrs(
    h5f: h5py.File, w_mm: float | None, h_mm: float | None, room_corner: bool
) -> None:
    """Store the physical plate size as root attributes of ``h5f``.

    Dimensions that are ``None`` are skipped. For Room Corner files each
    value is written twice, under a ``_left`` and a ``_right`` key; otherwise
    the plain ``plate_width_mm`` / ``plate_height_mm`` keys are used.
    """
    suffixes = ("_left", "_right") if room_corner else ("",)
    dimensions = (("plate_width_mm", w_mm), ("plate_height_mm", h_mm))
    for base_key, value in dimensions:
        if value is None:
            continue
        for suffix in suffixes:
            h5f.attrs[f"{base_key}{suffix}"] = float(value)


def _ensure_dataset(group: h5py.Group, shape_hw: tuple[int, int]) -> h5py.Dataset:
    """Recreate ``data`` in ``group`` (float32, chunked, growable along t).

    Any existing ``data`` entry is deleted first. The new dataset starts with
    a single frame slot; each chunk holds exactly one (h, w) frame and only
    the third axis is unlimited.
    """
    rows, cols = shape_hw
    if "data" in group:
        del group["data"]

    one_frame = (rows, cols, 1)
    return group.create_dataset(
        "data",
        one_frame,
        maxshape=(rows, cols, None),
        chunks=one_frame,
        dtype=np.float32,
    )


def _compute_remap_maps(
    homography: NDArray[np.float32], w: int, h: int
) -> tuple[NDArray[np.float32], NDArray[np.float32]]:
    """Build float32 x/y lookup grids from the inverse of ``homography``.

    The inverted matrix is handed to ``compute_remap_from_homography``
    together with the output width and height; both returned grids are cast
    to float32.
    """
    inverse = np.linalg.inv(homography)
    grid_x, grid_y = compute_remap_from_homography(inverse, w, h)
    return grid_x.astype(np.float32), grid_y.astype(np.float32)


def _store_remap(
    group: h5py.Group, src_x: NDArray[np.float32], src_y: NDArray[np.float32]
) -> None:
    """Write the remap grids to ``group`` as ``src_x`` and ``src_y``.

    Existing entries with those names are removed before the new datasets
    are created.
    """
    grids = {"src_x": src_x, "src_y": src_y}
    for key in grids:
        if key in group:
            del group[key]
    for key, grid in grids.items():
        group.create_dataset(key, data=grid)
def dewarp_room_corner_remap(
    experiment: Any,
    points: NDArray[np.float32] | Sequence[tuple[float, float]],
    config: DewarpConfig,
) -> Generator[int, None, None]:
    """Dewarp both plates of a Room Corner experiment via precomputed remap grids.

    This is a generator: nothing happens until it is iterated. For every
    processed frame the left and right plate are remapped and appended to
    ``dewarped_data_left/data`` and ``dewarped_data_right/data`` of the output
    HDF5 file. Once all frames are written, the file is reopened in ``"r+"``
    mode and attached to ``experiment.h5_file`` / ``experiment.h5_path``.

    Args:
        experiment: Experiment object providing ``get_data``, ``folder_path``,
            ``exp_name`` and ``h5_file``.
        points: Six corner points. Indices ``[0, 1, 4, 5]`` form the left
            plate and ``[1, 2, 3, 4]`` the right plate.
        config: Dewarping settings.

    Yields:
        The zero-based index of each frame after both sides were written.

    Raises:
        ValueError: If not exactly six points are given or the target size is
            not greater than 10 pixels in both directions.
        FileExistsError: If the output file already contains a
            ``dewarped_data_left`` or ``dewarped_data_right`` group.
    """
    logging.info("[DEWARP] Room Corner (remap) – start")

    pts = np.asarray(points, dtype=np.float32)
    if pts.shape[0] != 6:
        raise ValueError("Expected exactly 6 points for room corner dewarping.")
    if config.target_pixels_width <= 10 or config.target_pixels_height <= 10:
        raise ValueError("Target image size too small for meaningful dewarping.")

    # Split into the left/right four-corner sets and optionally rotate them.
    frame_shape = experiment.get_data(config.datatype).get_frame(0, 0).shape
    pts_left = pts[[0, 1, 4, 5]]
    pts_right = pts[[1, 2, 3, 4]]
    sel_left = rotate_points(pts_left, frame_shape, config.rotation_index)
    sel_right = rotate_points(pts_right, frame_shape, config.rotation_index)

    params_left = get_dewarp_parameters(
        sel_left,
        config.target_pixels_width,
        config.target_pixels_height,
        config.target_ratio,
    )
    params_right = get_dewarp_parameters(
        sel_right,
        config.target_pixels_width,
        config.target_pixels_height,
        config.target_ratio,
    )

    out_path = _ensure_output_path(experiment, config.filename)

    # Only block when the file already holds dewarping groups. The probe and
    # the raise are kept apart on purpose: FileExistsError is a subclass of
    # OSError, so raising it inside the ``try`` would be swallowed by the
    # handler that tolerates a missing or unreadable file.
    already_dewarped = False
    try:
        with h5py.File(out_path, "r") as existing:
            already_dewarped = (
                "dewarped_data_left" in existing or "dewarped_data_right" in existing
            )
    except OSError as err:
        # Missing, unreadable or still-open file: nothing to protect, the
        # file is simply (re)created below.
        logging.debug("[DEWARP] Could not inspect %s: %s", out_path, err)
    if already_dewarped:
        raise FileExistsError(out_path)

    if experiment.h5_file is not None:
        experiment.h5_file.close()

    with create_h5_file(filename=out_path) as h5f:
        _write_root_plate_attrs(
            h5f, config.plate_width_mm, config.plate_height_mm, room_corner=True
        )
        _init_room_corner_schema(h5f)

        data = experiment.get_data(config.datatype)
        frame_count = data.get_frame_count()

        # side -> (output dataset, src_x, src_y). The grids stay in memory so
        # the frame loop does not have to re-read them from the file.
        targets = {}
        for side, params, sel in (
            ("left", params_left, sel_left),
            ("right", params_right, sel_right),
        ):
            grp = h5f.require_group(f"dewarped_data_{side}")
            side_attrs = {
                "transformation_matrix": params["transformation_matrix"],
                "target_pixels_width": params["target_pixels_width"],
                "target_pixels_height": params["target_pixels_height"],
                "target_ratio": params["target_ratio"],
                "selected_points": sel,
                "frame_range": [0, frame_count],
                "points_selection_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_unit": "pixels",
                "plate_width_mm": config.plate_width_mm,
                "plate_height_mm": config.plate_height_mm,
            }
            # HDF5 attributes cannot hold None (the plate sizes default to
            # None); skip such entries like _write_root_plate_attrs does.
            grp.attrs.update(
                {key: value for key, value in side_attrs.items() if value is not None}
            )

            # Best effort: estimate the resolution.
            try:
                p0, p1, p3 = sel[0], sel[1], sel[3]
                res = estimate_resolution_from_points(
                    p0, p1, p3, config.plate_width_mm, config.plate_height_mm
                )
                grp.attrs.update(res)
            except (ValueError, TypeError) as err:
                logging.warning(
                    "[DEWARP] Resolution estimation failed for %s: %s",
                    side,
                    err,
                    exc_info=False,
                )

            h_out = int(params["target_pixels_height"])
            w_out = int(params["target_pixels_width"])
            dset = _ensure_dataset(grp, (h_out, w_out))

            # Prepare the remap grids and keep a copy in the file.
            src_x, src_y = _compute_remap_maps(
                np.asarray(params["transformation_matrix"], dtype=np.float32),
                w_out,
                h_out,
            )
            _store_remap(grp, src_x, src_y)
            targets[side] = (dset, src_x, src_y)

        # Loop over the frames.
        frames = data.data_numbers
        start, end = (
            (len(frames) // 2 - 1, len(frames) // 2 + 1)
            if config.testing
            else (0, len(frames))
        )

        clipped_for_shape = None
        clipped = {}
        for i, idx in enumerate(frames[start : end : config.frequency]):
            raw = data.get_raw_frame(idx)

            # Clip x and y maps separately before remapping.
            # NOTE: do NOT use cv2.convertMaps here — it produces a 2-channel
            # CV_16SC2 map, and np.clip on that array would clip BOTH the x and
            # y channels to the same bound, incorrectly clamping y-coordinates
            # when the raw frame is non-square (e.g. after camera rotation).
            # The clipped grids only depend on the input frame size, so they
            # are rebuilt only when that size changes.
            in_shape = tuple(raw.shape[:2])
            if in_shape != clipped_for_shape:
                h_in, w_in = in_shape
                clipped = {
                    side: (np.clip(sx, 0, w_in - 1), np.clip(sy, 0, h_in - 1))
                    for side, (_, sx, sy) in targets.items()
                }
                clipped_for_shape = in_shape

            for side, (dset, _, _) in targets.items():
                map_x, map_y = clipped[side]
                remapped = cv2.remap(
                    raw, map_x, map_y, interpolation=cv2.INTER_LINEAR
                )
                # Grow only the time axis; height/width are the dataset's own.
                dset.resize(i + 1, axis=2)
                dset[:, :, i] = remapped.astype(np.float32, copy=False)

            yield i

    experiment.h5_file = h5py.File(out_path, "r+")
    experiment.h5_path = out_path
# pylint: disable=too-many-locals
def dewarp_lateral_flame_spread(
    experiment: Any,
    points: Sequence[tuple[float, float]],
    config: DewarpConfig,
) -> Generator[int, None, None]:
    """Dewarp a Lateral Flame Spread experiment with ``cv2.warpPerspective``.

    This is a generator: nothing happens until it is iterated. Every processed
    frame is warped with the homography derived from ``points`` and appended
    to ``dewarped_data/data`` of the output HDF5 file. Once all frames are
    written, the file is reopened in ``"r+"`` mode and attached to
    ``experiment.h5_file`` / ``experiment.h5_path``.

    Args:
        experiment: Experiment object providing ``get_data``, ``folder_path``,
            ``exp_name`` and ``h5_file``.
        points: The four plate corners, in any order (they are sorted here).
        config: Dewarping settings.

    Yields:
        The zero-based index of each frame after it was written.

    Raises:
        ValueError: If a target dimension is not greater than 10 pixels or
            ``points`` does not contain exactly four entries.
        FileExistsError: If the output file already contains a
            ``dewarped_data`` or ``dewarped_data_left`` group.
    """
    logging.info("[DEWARP] LFS – start")

    if config.target_pixels_width <= 10:
        raise ValueError("target_pixels_width must be greater than 10")
    if config.target_pixels_height <= 10:
        raise ValueError("target_pixels_height must be greater than 10")
    if len(points) != 4:
        raise ValueError("Exactly 4 corner points are required for LFS dewarping.")

    # "anticlockwise" → argsort(angles) ascending → [TL, TR, BR, BL] in image
    # coordinates (y-down), which is what get_dewarp_parameters expects.
    # The name is confusing because math angles increase CCW, but in image
    # coords (y-down) ascending angles traverse CW: TL→TR→BR→BL.
    sorted_pts = sort_corner_points(
        points, experiment_type="Lateral Flame Spread", direction="anticlockwise"
    )

    params = get_dewarp_parameters(
        sorted_pts,
        target_pixels_width=config.target_pixels_width,
        target_pixels_height=config.target_pixels_height,
        target_ratio=config.target_ratio,
    )

    out_path = _ensure_output_path(experiment, config.filename)

    # Only block when the file already holds dewarping groups. The probe and
    # the raise are kept apart on purpose: FileExistsError is a subclass of
    # OSError, so raising it inside the ``try`` would be swallowed by the
    # handler that tolerates a missing or unreadable file.
    already_dewarped = False
    try:
        with h5py.File(out_path, "r") as existing:
            already_dewarped = (
                "dewarped_data" in existing or "dewarped_data_left" in existing
            )
    except OSError as err:
        # Missing, unreadable or still-open file: nothing to protect, the
        # file is simply (re)created below.
        logging.debug("[DEWARP] Could not inspect %s: %s", out_path, err)
    if already_dewarped:
        raise FileExistsError(out_path)

    if experiment.h5_file:
        experiment.h5_file.close()

    with create_h5_file(filename=out_path) as h5f:
        _init_lfs_schema(h5f)
        _write_root_plate_attrs(
            h5f, config.plate_width_mm, config.plate_height_mm, room_corner=False
        )

        data = experiment.get_data(config.datatype)

        grp = h5f.require_group("dewarped_data")
        group_attrs = {
            "transformation_matrix": params["transformation_matrix"],
            "target_pixels_width": int(params["target_pixels_width"]),
            "target_pixels_height": int(params["target_pixels_height"]),
            "target_ratio": float(params["target_ratio"]),
            "selected_points": np.asarray(sorted_pts, dtype=np.float32),
            "frame_range": [0, data.get_frame_count()],
            "points_selection_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "plate_width_mm": config.plate_width_mm,
            "plate_height_mm": config.plate_height_mm,
        }
        # HDF5 attributes cannot hold None (the plate sizes default to None);
        # skip such entries like _write_root_plate_attrs does.
        grp.attrs.update(
            {key: value for key, value in group_attrs.items() if value is not None}
        )

        # Best effort: estimate the resolution.
        try:
            p0, p1, p3 = sorted_pts[0], sorted_pts[1], sorted_pts[3]
            res = estimate_resolution_from_points(
                p0, p1, p3, config.plate_width_mm, config.plate_height_mm
            )
            grp.attrs.update(res)
        except (ValueError, TypeError) as err:
            logging.warning(
                "[DEWARP] Resolution estimation failed: %s", err, exc_info=False
            )

        h_out = int(params["target_pixels_height"])
        w_out = int(params["target_pixels_width"])
        dset = _ensure_dataset(grp, (h_out, w_out))

        frames = data.data_numbers
        start, end = (
            (len(frames) // 2 - 1, len(frames) // 2 + 1)
            if config.testing
            else (0, len(frames))
        )

        homography_matrix = np.asarray(
            params["transformation_matrix"], dtype=np.float32
        )

        for i, idx in enumerate(frames[start : end : config.frequency]):
            frame = data.get_frame(idx, config.rotation_index)
            dewarped = cv2.warpPerspective(
                frame, homography_matrix, (w_out, h_out), flags=cv2.INTER_LINEAR
            )
            dset.resize((h_out, w_out, i + 1))
            dset[:, :, i] = dewarped.astype(np.float32, copy=False)
            yield i

    experiment.h5_file = h5py.File(out_path, "r+")
    experiment.h5_path = out_path
def rotate_image_and_points(
    image: NDArray[np.float32] | NDArray[np.uint8],
    points: NDArray[np.float32],
    angle_degrees: float,
) -> tuple[NDArray[np.float32], NDArray[np.float32]]:
    """
    Apply the same rotation to an image and to a set of points.

    The rotation is taken about the integer pixel ``(width // 2, height // 2)``
    with scale 1.0, and the output image keeps the input's width and height.

    Args:
        image: Input image.
        points: Nx2 array of (x, y) points.
        angle_degrees: Rotation angle in degrees.

    Returns:
        A tuple (rotated_image, rotated_points), both as float32.
    """
    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    affine = cv2.getRotationMatrix2D(pivot, angle_degrees, 1.0)

    turned = cv2.warpAffine(image, affine, (width, height))

    # Append a column of ones so the 2x3 affine matrix can be applied to all
    # points with a single matrix product.
    ones_column = np.ones((points.shape[0], 1), dtype=points.dtype)
    homogeneous = np.hstack([points, ones_column])
    transformed = affine @ homogeneous.T
    moved_points = transformed.T.astype(np.float32)

    return turned.astype(np.float32, copy=False), moved_points