Source code for flametrack.gui.main_window

import glob
import logging
import os

import h5py
import numpy as np
import progressbar
from PySide6.QtCore import QThread, Signal
from PySide6.QtWidgets import (
    QApplication,
    QFileDialog,
    QMainWindow,
    QMessageBox,
    QSlider,
)

from flametrack.analysis.data_types import RceExperiment
from flametrack.analysis.edge_worker import EdgeDetectionWorker
from flametrack.analysis.flamespread import (
    EDGE_METHOD_CATALOG,
    EdgeMethodSpec,
    calculate_edge_data,
    calculate_edge_results_for_exp_name,
)
from flametrack.processing.dewarping import (
    DewarpConfig,
    dewarp_lateral_flame_spread,
    dewarp_room_corner_remap,
)
from flametrack.utils.math_utils import compute_target_ratio

from .ui_form import Ui_MainWindow

EXPERIMENT_CONFIG = {
    "Room Corner": {"required_points": 6},
    "Lateral Flame Spread": {"required_points": 4},
}


[docs] class MainWindow(QMainWindow): # Relay signal so handle_edge_result always runs on the main thread, # regardless of which worker thread emits it. _edge_result_ready = Signal(object, str) def __init__(self) -> None: super().__init__() self.ui = Ui_MainWindow() self.ui.setupUi(self) self.experiment: RceExperiment | None = None self.datatype: str = "IR" self.experiment_type: str = "Lateral Flame Spread" self.required_points: int = 4 self.rotation_index: int = 0 self.target_ratio: float | None = None self.target_pixels_width: int | None = None self.target_pixels_height: int | None = None self.edge_workers_done: int = 0 self._pending_edge_results: dict = {} # Active edge method key (short_id from EDGE_METHOD_CATALOG). # Currently the "correct" key for the flame direction is chosen automatically # in _edge_spec_for(); this attribute exists so a future UI combo box can # override it without changing any other code. self.edge_method_key: str | None = None # None → auto-select per direction # Initialize values self.plate_width_m: float | None = None self.plate_height_m: float | None = None self.console_bar: progressbar.ProgressBar | None = None self.console_bar_started: bool = False self.console_bar_left: progressbar.ProgressBar | None = None self.console_bar_right: progressbar.ProgressBar | None = None self.console_bar_right_started: bool = False self.thread: QThread | None = None self.worker: EdgeDetectionWorker | None = None self.thread_left: QThread | None = None self.worker_left: EdgeDetectionWorker | None = None self.thread_right: QThread | None = None self.worker_right: EdgeDetectionWorker | None = None self._setup_ui() self._setup_connections() self._initialize_defaults() def _setup_ui(self) -> None: """Initial UI setup: titles, visibility, and default states.""" self.setWindowTitle("Flamespread Analysis Tool") self.ui.plot_dewarping.parent = self self.ui.progress_edge_finding_plate2.hide() self.ui.checkBox_mulithread.hide() self.ui.slider_analysis_y.setMinimum(0) self.ui.slider_analysis_y.setMaximum(100) self.ui.slider_analysis_y.setTickPosition(QSlider.TicksBothSides) self.ui.slider_analysis_y.setTickInterval(10) def _setup_connections(self) -> None: """Connect UI signals to their corresponding slots.""" self.ui.button_open_folder.clicked.connect(self.load_file) self.ui.combo_rotation.currentIndexChanged.connect(self.set_rotation_index) self.ui.doubleSpinBox_plate_width.valueChanged.connect(self.update_target_ratio) self.ui.doubleSpinBox_plate_height.valueChanged.connect( self.update_target_ratio ) self.ui.comboBox_experiment_type.currentTextChanged.connect( self.set_experiment_type ) self.ui.comboBox_experiment_type.currentTextChanged.connect( self.update_flame_direction_visibility ) self.ui.comboBox_flame_direction.currentTextChanged.connect( self.update_edge_preview ) self.ui.slider_frame.sliderReleased.connect( lambda: self.update_plot( framenr=self.ui.slider_frame.value(), rotation_factor=self.rotation_index, ) ) self.ui.slider_scale_min.sliderReleased.connect( lambda: self.update_plot( cmin=self.ui.slider_scale_min.value(), rotation_factor=self.rotation_index, ) ) self.ui.slider_scale_max.sliderReleased.connect( lambda: self.update_plot( cmax=self.ui.slider_scale_max.value(), rotation_factor=self.rotation_index, ) ) self.ui.button_dewarp.clicked.connect(self.on_dewarp_clicked) self.ui.button_find_edge.clicked.connect(self.start_edge_detection) self.ui.slider_analysis_y.valueChanged.connect(self.update_analysis_plot) self._edge_result_ready.connect(self.handle_edge_result) def _initialize_defaults(self) -> None: """Set initial values for UI elements and internal parameters.""" self.ui.combo_rotation.setCurrentIndex(0) self.update_target_ratio() initial_type = self.ui.comboBox_experiment_type.currentText() self.set_experiment_type(initial_type) self.update_flame_direction_visibility()
[docs] def set_rotation_index(self, index: int) -> None: self.rotation_index = index self.update_plot(framenr=self.ui.slider_frame.value(), rotation_factor=index)
@staticmethod def _read_plate_mm_from_h5_root_first( h5, ) -> tuple[float | None, float | None]: """ Liefert (width_mm, height_mm). Reihenfolge: 1) Room-Corner Root: plate_width_mm_left/right, plate_height_mm_left/right 2) Gruppen: dewarped_data_left/right 3) LFS Root: plate_width_mm, plate_height_mm 4) Gruppe: dewarped_data Nimmt für die GUI (ein Paar Spinboxes) bevorzugt 'left', fällt sonst auf 'right' zurück. """ def _f(x) -> float | None: try: return float(x) except (TypeError, ValueError): return None # 1) Room-Corner Root (left preferred, fallback right) w_l = h5.attrs.get("plate_width_mm_left", None) h_l = h5.attrs.get("plate_height_mm_left", None) w_r = h5.attrs.get("plate_width_mm_right", None) h_r = h5.attrs.get("plate_height_mm_right", None) if w_l is not None or h_l is not None or w_r is not None or h_r is not None: w = w_l if w_l is not None else w_r h = h_l if h_l is not None else h_r return _f(w), _f(h) # 2) Gruppen (Room-Corner) if "dewarped_data_left" in h5 or "dewarped_data_right" in h5: w = h = None if "dewarped_data_left" in h5: g = h5["dewarped_data_left"].attrs w = g.get("plate_width_mm", None) h = g.get("plate_height_mm", None) if (w is None or h is None) and "dewarped_data_right" in h5: g = h5["dewarped_data_right"].attrs w = w if w is not None else g.get("plate_width_mm", None) h = h if h is not None else g.get("plate_height_mm", None) return _f(w), _f(h) # 3) LFS Root (einheitliche Platte) w = h5.attrs.get("plate_width_mm", None) h = h5.attrs.get("plate_height_mm", None) if w is not None or h is not None: return _f(w), _f(h) # 4) Gruppe dewarped_data (LFS) if "dewarped_data" in h5: g = h5["dewarped_data"].attrs w = g.get("plate_width_mm", None) h = g.get("plate_height_mm", None) return _f(w), _f(h) return None, None # ---- Helper inside MainWindow ------------------------------------------------ def _detect_experiment_type_from_h5(self, h5) -> None: """Setzt experiment_type und ComboBox anhand der HDF5-Gruppen.""" if "dewarped_data_left" in h5 and "dewarped_data_right" in h5: self.experiment_type = "Room Corner" self.ui.comboBox_experiment_type.setCurrentText("Room Corner") elif "dewarped_data" in h5: self.experiment_type = "Lateral Flame Spread" self.ui.comboBox_experiment_type.setCurrentText("Lateral Flame Spread") else: logging.debug("No dewarped data found in HDF5 - type unchanged.") def _read_plate_mm(self, h5) -> tuple[float | None, float | None]: """Liest Plattenmaße (mm) – Root bevorzugt, dann Gruppen (Room Corner/LFS).""" def _f(x) -> float | None: try: return float(x) except (TypeError, ValueError): return None # Room-Corner: Root (left/right) w_mm = h5.attrs.get("plate_width_mm_left") h_mm = h5.attrs.get("plate_height_mm_left") if w_mm is None: w_mm = h5.attrs.get("plate_width_mm_right") if h_mm is None: h_mm = h5.attrs.get("plate_height_mm_right") if w_mm is not None or h_mm is not None: return _f(w_mm), _f(h_mm) # Room-Corner: Gruppen if "dewarped_data_left" in h5: g = h5["dewarped_data_left"].attrs w_mm = g.get("plate_width_mm") h_mm = g.get("plate_height_mm") if (w_mm is None or h_mm is None) and "dewarped_data_right" in h5: g = h5["dewarped_data_right"].attrs w_mm = g.get("plate_width_mm") if w_mm is None else w_mm h_mm = g.get("plate_height_mm") if h_mm is None else h_mm if w_mm is not None or h_mm is not None: return _f(w_mm), _f(h_mm) # LFS: Root w_mm = h5.attrs.get("plate_width_mm") h_mm = h5.attrs.get("plate_height_mm") if w_mm is not None or h_mm is not None: return _f(w_mm), _f(h_mm) # LFS: Gruppe if "dewarped_data" in h5: g = h5["dewarped_data"].attrs return _f(g.get("plate_width_mm")), _f(g.get("plate_height_mm")) return None, None def _apply_plate_mm_to_spinboxes( self, w_mm: float | None, h_mm: float | None ) -> None: """Schreibt (falls vorhanden) die Plattenmaße in die SpinBoxes und aktualisiert Ratio.""" wrote_any = False if w_mm is not None: self.ui.doubleSpinBox_plate_width.setValue(w_mm) wrote_any = True if h_mm is not None: self.ui.doubleSpinBox_plate_height.setValue(h_mm) wrote_any = True if wrote_any: self.update_target_ratio() logging.debug("Loaded plate size (mm): %s x %s", w_mm, h_mm) else: logging.debug("Plate size (mm) not found in HDF5 (root or groups).") def _enable_controls_after_load(self) -> None: """Aktiviert Slider/Controls nach erfolgreichem Laden und setzt Bereiche.""" self.ui.slider_frame.setDisabled(False) self.ui.slider_scale_min.setDisabled(False) self.ui.slider_scale_max.setDisabled(False) frame_count = self.experiment.get_data(self.datatype).get_frame_count() self.ui.slider_frame.setMinimum(0) self.ui.slider_frame.setMaximum(frame_count) @staticmethod def _detect_datatype(folder: str) -> str: """ Infer the primary data type by scanning *folder* and all immediate subdirectories (one level deep). Rules (first match wins): *.csv → "IR" *.mp4 → "video" *.png / *.jpg / *.jpeg → "picture" fallback → "IR" """ search_dirs = [folder] + [ os.path.join(folder, d) for d in os.listdir(folder) if os.path.isdir(os.path.join(folder, d)) ] def _find(*exts: str) -> bool: return any( glob.glob(os.path.join(d, f"*.{ext}")) for d in search_dirs for ext in exts ) if _find("csv", "CSV"): return "IR" if _find("mp4", "MP4"): return "video" if _find("png", "PNG", "jpg", "JPG", "jpeg", "JPEG"): return "picture" return "IR" def _edge_threshold(self) -> float: """ Return the default intensity threshold for the active EdgeMethodSpec. Falls back to data-type defaults when no explicit method key is set. """ spec = self._edge_spec_for( "right_to_left" ) # threshold is direction-independent if self.datatype == "IR": return spec.default_threshold_ir return spec.default_threshold_image def _edge_spec_for(self, direction: str) -> EdgeMethodSpec: """ Return the EdgeMethodSpec for the given flame direction. If ``self.edge_method_key`` is set (e.g. by a future UI combo box), that entry from EDGE_METHOD_CATALOG is returned directly. Otherwise the appropriate default is chosen based on direction: right_to_left → ``leftmost_threshold`` left_to_right → ``rightmost_threshold`` Args: direction: ``"left_to_right"`` or ``"right_to_left"`` """ if self.edge_method_key is not None: return EDGE_METHOD_CATALOG[self.edge_method_key] key = ( "leftmost_threshold" if direction == "right_to_left" else "rightmost_threshold" ) return EDGE_METHOD_CATALOG[key] def _edge_method_for(self, direction: str): """Convenience wrapper — returns the ready EdgeFn for the given direction.""" spec = self._edge_spec_for(direction) thr = ( spec.default_threshold_ir if self.datatype == "IR" else spec.default_threshold_image ) return spec.make_fn(thr)
[docs] def load_file(self) -> None: """Open directory dialog, load experiment data, set UI elements.""" folder = QFileDialog.getExistingDirectory(self, "Select Directory") if not folder: return self.datatype = self._detect_datatype(folder) logging.info("Detected data type: %s", self.datatype) self.experiment = RceExperiment(folder) try: _ = self.experiment.get_data(self.datatype) # force load h5 = self.experiment.h5_file self.experiment.h5_path = h5.filename # Experiment-Typ & Plattenmaße ermitteln self._detect_experiment_type_from_h5(h5) w_mm, h_mm = self._read_plate_mm(h5) self._apply_plate_mm_to_spinboxes(w_mm, h_mm) except (AttributeError, KeyError, TypeError, ValueError) as exc: logging.debug("Error reading experiment type from HDF5: %s", exc) # UI & Plots aktualisieren self.update_plot(framenr=0) self._enable_controls_after_load() if self.experiment: self.experiment.experiment_type = self.experiment_type self.update_edge_preview() y_cutoff = self.ui.slider_analysis_y.value() / 100 self.ui.plot_analysis.plot_edge_results(self.experiment, y_cutoff=y_cutoff)
[docs] def update_target_ratio(self) -> None: """Compute and update target pixel size and ratio based on plate dimensions.""" width_mm = self.ui.doubleSpinBox_plate_width.value() height_mm = self.ui.doubleSpinBox_plate_height.value() self.target_ratio = compute_target_ratio(width_mm, height_mm) mm_per_pixel = 5 self.target_pixels_width = int(round(width_mm / mm_per_pixel)) self.target_pixels_height = int(round(height_mm / mm_per_pixel)) self.plate_width_m = width_mm / 1000.0 self.plate_height_m = height_mm / 1000.0 logging.debug("target_ratio: %s", self.target_ratio) logging.debug("target_pixels_width: %s", self.target_pixels_width) logging.debug("target_pixels_height: %s", self.target_pixels_height)
[docs] def set_experiment_type(self, experiment_type: str) -> None: """Set experiment type and adjust UI visibility and required points.""" self.experiment_type = experiment_type if self.experiment: self.experiment.experiment_type = experiment_type if experiment_type == "Room Corner": self.required_points = 6 self.ui.progress_edge_finding_plate1.show() self.ui.progress_edge_finding_plate2.show() self.ui.checkBox_mulithread.show() elif experiment_type == "Lateral Flame Spread": self.required_points = 4 self.ui.progress_edge_finding_plate1.show() self.ui.progress_edge_finding_plate2.hide() self.ui.checkBox_mulithread.hide() if hasattr(self.ui, "plot_dewarping"): self.ui.plot_dewarping.clear_points() logging.debug( "Set experiment type to %s - expecting %s points", experiment_type, self.required_points, )
[docs] def update_plot( self, framenr: int | None = None, rotation_factor: int | None = None, cmin: float | None = None, cmax: float | None = None, ) -> None: """Update image plot according to frame, rotation and color scaling.""" if not self.experiment: return rotation_factor = ( rotation_factor if rotation_factor is not None else self.ui.combo_rotation.currentIndex() ) frame = framenr if framenr is not None else self.ui.slider_frame.value() cmin = cmin if cmin is not None else self.ui.slider_scale_min.value() cmax = cmax if cmax is not None else self.ui.slider_scale_max.value() # Ensure cmin <= cmax and normalize to 0..1 range cmin = min(cmin, cmax) / 100 cmax = max(cmin, cmax) / 100 logging.debug( "update_plot: frame=%s, rotation=%s, cmin=%s, cmax=%s", frame, rotation_factor, cmin, cmax, ) img = self.experiment.get_data(self.datatype).get_frame(frame, rotation_factor) self.ui.plot_dewarping.plot(img, cmin, cmax)
[docs] def calculate_edge_results(self) -> None: """Calculate edge detection results and write to HDF5 datasets.""" if not self.experiment or not self.experiment.h5_file: QMessageBox.warning( self, "No file loaded", "Please load a file first and dewarp it" ) return # Left plate edge dewarped_data_left = self.experiment.h5_file["dewarped_data_left"]["data"] results_left = calculate_edge_results_for_exp_name( self.experiment.exp_name, left=True, dewarped_data=dewarped_data_left, save=False, ) grp_left = self.experiment.h5_file["edge_results_left"] if "data" in grp_left: del grp_left["data"] grp_left.create_dataset("data", data=results_left) logging.debug("Finished LEFT edge calculation") # Right plate edge dewarped_data_right = self.experiment.h5_file["dewarped_data_right"]["data"] results_right = calculate_edge_results_for_exp_name( self.experiment.exp_name, left=False, dewarped_data=dewarped_data_right, save=False, ) grp_right = self.experiment.h5_file["edge_results_right"] if "data" in grp_right: del grp_right["data"] grp_right.create_dataset("data", data=results_right) logging.debug("Finished RIGHT edge calculation") self.experiment.h5_file.close() self.ui.button_dewarp.setDisabled(False)
[docs] def on_dewarp_clicked(self) -> None: """Trigger dewarping based on user-set points and experiment type.""" points = [ (p.scatter_points[0].x(), p.scatter_points[0].y()) for p in self.ui.plot_dewarping.draggable_points ] if len(points) not in (4, 6): QMessageBox.warning( self, "Invalid point count", "Please set exactly 4 or 6 points." ) return experiment_type = self.experiment_type rotation_index = self.ui.combo_rotation.currentIndex() plate_width_mm = self.ui.doubleSpinBox_plate_width.value() plate_height_mm = self.ui.doubleSpinBox_plate_height.value() frame_shape = self.experiment.get_data(self.datatype).get_frame(0, 0).shape logging.debug( "[DEWARP] rotation_index=%d, original_frame_shape=%s, points_in_rotated_coords=%s", rotation_index, frame_shape, points, ) cfg = DewarpConfig( target_ratio=self.target_ratio or 1.0, target_pixels_width=self.target_pixels_width or 100, target_pixels_height=self.target_pixels_height or 100, plate_width_mm=plate_width_mm, plate_height_mm=plate_height_mm, rotation_index=rotation_index, filename=None, frequency=1, testing=False, datatype=self.datatype, ) try: if experiment_type == "Room Corner" and len(points) == 6: dewarp_generator = dewarp_room_corner_remap( experiment=self.experiment, points=points, config=cfg, ) elif experiment_type == "Lateral Flame Spread" and len(points) == 4: dewarp_generator = dewarp_lateral_flame_spread( experiment=self.experiment, points=points, config=cfg, ) else: QMessageBox.warning( self, "Invalid experiment type or point count", f"{experiment_type} requires exactly 4 or 6 points.", ) return self.ui.button_dewarp.setDisabled(True) frame_count = self.experiment.get_data(self.datatype).get_frame_count() # Range 0..frame_count so setValue(i+1) reaches 100% for any count ≥ 1 self.ui.progress_dewarping.setRange(0, frame_count) console_bar = progressbar.ProgressBar( max_value=frame_count, widgets=[ "Dewarping: ", progressbar.Percentage(), " ", progressbar.Bar(marker="█", left="[", right="]"), " ", progressbar.ETA(), ], ) console_bar.start() for progress in dewarp_generator: self.ui.progress_dewarping.setValue(progress + 1) if frame_count > 1: console_bar.update(progress) QApplication.processEvents() self.ui.progress_dewarping.setValue(frame_count) console_bar.finish() self.ui.button_dewarp.setDisabled(False) if hasattr(self.experiment, "h5_file"): self.experiment.h5_path = self.experiment.h5_file.filename self.update_edge_preview() except FileExistsError as err: choice = QMessageBox.question( self, "File exists", f"{err}\nOverwrite?", QMessageBox.Yes | QMessageBox.No, ) if choice == QMessageBox.Yes: os.remove(str(err)) self.on_dewarp_clicked() return except (OSError, ValueError, KeyError) as err: logging.error("Unexpected error during dewarping: %s", err) self.ui.button_dewarp.setDisabled(False) QMessageBox.critical(self, "Error", f"An unexpected error occurred:\n{err}") return
[docs] def start_edge_detection(self) -> None: """Start edge detection with multi-threading support.""" self._disable_ui_while_edge_detecting() frame_count = self.experiment.get_data(self.datatype).get_frame_count() if self.experiment_type == "Lateral Flame Spread": if not hasattr(self, "console_bar") or self.console_bar is None: frame_count = self.experiment.get_data(self.datatype).get_frame_count() logging.debug("Edge detection frame count: %d", frame_count) if frame_count <= 0: logging.error("Frame count is 0 – cannot start edge detection.") QMessageBox.critical( self, "Error", "No frames found for edge detection." ) return self.console_bar = self._create_progress_bar( "Edge Finding: ", max_value=frame_count ) self.console_bar_started = False logging.debug( "Created progress bar with max_value=%d", self.console_bar.max_value ) self.ui.progress_edge_finding_plate1.setRange(0, 100) self.ui.progress_edge_finding_plate1.setValue(0) self.thread = QThread() flame_dir = self.ui.comboBox_flame_direction.currentText() flame_dir_key = ( "right_to_left" if flame_dir == "Right -> Left" else "left_to_right" ) _spec_lfs = self._edge_spec_for(flame_dir_key) self.worker = EdgeDetectionWorker( h5_path=self.experiment.h5_path, dataset_key="dewarped_data/data", result_key="edge_results", threshold=self._edge_threshold(), method=_spec_lfs.make_fn(self._edge_threshold()), flame_direction=flame_dir_key, use_otsu_masking=_spec_lfs.use_otsu_masking, ) self._setup_edge_worker(self.thread, self.worker, "lfs") self.thread.start() return # Room Corner: separate threads for left and right if not hasattr(self, "console_bar_left") or self.console_bar_left is None: frame_count = self.experiment.get_data(self.datatype).get_frame_count() self.console_bar_left = self._create_progress_bar( "Edge Left: ", max_value=frame_count ) self.console_bar_right = self._create_progress_bar( "Edge Right: ", max_value=frame_count ) self.console_bar_right_started = False self.console_bar_left.start() self.ui.progress_edge_finding_plate1.setRange(0, 100) self.ui.progress_edge_finding_plate1.setValue(0) self.ui.progress_edge_finding_plate2.setRange(0, 100) self.ui.progress_edge_finding_plate2.setValue(0) # Left worker/thread # RCE: left plate flame spreads right→left (away from corner on right side) # right plate flame spreads left→right (away from corner on left side) self.thread_left = QThread() _spec_left = self._edge_spec_for("right_to_left") self.worker_left = EdgeDetectionWorker( h5_path=self.experiment.h5_path, dataset_key="dewarped_data_left/data", result_key="edge_results_left/data", threshold=self._edge_threshold(), method=_spec_left.make_fn(self._edge_threshold()), use_otsu_masking=_spec_left.use_otsu_masking, ) self._setup_edge_worker(self.thread_left, self.worker_left, "left") # Right worker/thread self.thread_right = QThread() _spec_right = self._edge_spec_for("left_to_right") self.worker_right = EdgeDetectionWorker( h5_path=self.experiment.h5_path, dataset_key="dewarped_data_right/data", result_key="edge_results_right/data", threshold=self._edge_threshold(), method=_spec_right.make_fn(self._edge_threshold()), use_otsu_masking=_spec_right.use_otsu_masking, ) self._setup_edge_worker(self.thread_right, self.worker_right, "right") if self.ui.checkBox_mulithread.isChecked(): self.thread_left.start() self.thread_right.start() else: self.worker_left.finished.connect(self.thread_right.start) self.thread_left.start()
def _disable_ui_while_edge_detecting(self) -> None: """Disable UI elements during edge detection to avoid user interference.""" self.ui.button_open_folder.setEnabled(False) self.ui.button_find_edge.setEnabled(False) self.ui.comboBox_experiment_type.setEnabled(False) self.ui.checkBox_mulithread.setEnabled(False) self.ui.slider_analysis_y.setEnabled(False) self.ui.comboBox_flame_direction.setEnabled(False) def _create_progress_bar( self, label: str, max_value: int ) -> progressbar.ProgressBar: widgets = [ label, progressbar.Percentage(), " ", progressbar.Bar(marker="█", left="[", right="]"), " ", progressbar.ETA(), ] return progressbar.ProgressBar(max_value=max_value, widgets=widgets) def _setup_edge_worker( self, thread: QThread, worker: EdgeDetectionWorker, side: str ) -> None: """Helper to set up worker-thread connections for edge detection.""" worker.moveToThread(thread) thread.started.connect(worker.run) worker.progress.connect(getattr(self, f"update_edge_progress_{side}")) worker.finished.connect( lambda result, _: self._edge_result_ready.emit(result, side) ) worker.finished.connect(thread.quit) worker.finished.connect(worker.deleteLater) thread.finished.connect(thread.deleteLater)
[docs] def handle_edge_result(self, result_array: np.ndarray, side: str) -> None: """Buffer edge result and update UI; write to HDF5 only when all workers are done.""" # Update progress UI if side == "lfs": if self.console_bar: self.console_bar.finish() self.console_bar = None self.console_bar_started = False self.ui.progress_edge_finding_plate1.setValue(100) elif side == "left": if self.console_bar_left: self.console_bar_left.finish() self.console_bar_left = None self.ui.progress_edge_finding_plate1.setValue(100) elif side == "right": if self.console_bar_right: self.console_bar_right.finish() self.console_bar_right = None self.ui.progress_edge_finding_plate2.setValue(100) # Buffer result – do NOT write to h5 yet (other workers may still be reading) self._pending_edge_results[side] = result_array logging.info("Edge detection finished for %s side.", side.upper()) self.edge_workers_done += 1 logging.debug( "handle_edge_result side=%s, edge_workers_done=%d, experiment_type=%s", side, self.edge_workers_done, self.experiment_type, ) all_done = ( self.experiment_type == "Lateral Flame Spread" and self.edge_workers_done == 1 ) or (self.experiment_type == "Room Corner" and self.edge_workers_done == 2) if all_done: self._write_pending_edge_results() self.enable_analysis_controls()
def _write_pending_edge_results(self) -> None: """Write all buffered edge results to HDF5 at once (all workers are done reading).""" try: with h5py.File(self.experiment.h5_path, "a") as f: for side, result_array in self._pending_edge_results.items(): group_key = ( "edge_results" if side == "lfs" else f"edge_results_{side}" ) group = f.require_group(group_key) if "data" in group: del group["data"] group.create_dataset("data", data=result_array) if side == "lfs" and hasattr(self, "worker"): flame_dir = getattr(self.worker, "flame_direction", None) if flame_dir in ["left_to_right", "right_to_left"]: group.attrs["flame_direction"] = flame_dir logging.debug("_write_pending_edge_results done") except Exception: # pylint: disable=broad-except logging.exception("Failed to write edge results to HDF5") self._pending_edge_results = {}
[docs] def enable_analysis_controls(self) -> None: """Enable UI controls after edge detection is complete.""" logging.debug("enable_analysis_controls called") self.ui.button_open_folder.setEnabled(True) self.ui.button_find_edge.setEnabled(True) self.ui.comboBox_experiment_type.setEnabled(True) self.ui.checkBox_mulithread.setEnabled(True) self.ui.slider_analysis_y.setEnabled(True) self.ui.comboBox_flame_direction.setEnabled(True) self.edge_workers_done = 0 y_cutoff = self.ui.slider_analysis_y.value() / 100 self.ui.plot_analysis.plot_edge_results(self.experiment, y_cutoff=y_cutoff)
[docs] def update_edge_progress_lfs(self, value: int) -> None: self.ui.progress_edge_finding_plate1.setValue(value) if hasattr(self, "console_bar") and self.console_bar: if not getattr(self, "console_bar_started", False): self.console_bar.start() self.console_bar_started = True self.console_bar.update(value)
[docs] def update_edge_progress_left(self, value: int) -> None: self.ui.progress_edge_finding_plate1.setValue(value) if hasattr(self, "console_bar_left") and self.console_bar_left: self.console_bar_left.update(value)
[docs] def update_edge_progress_right(self, value: int) -> None: self.ui.progress_edge_finding_plate2.setValue(value) if hasattr(self, "console_bar_right") and self.console_bar_right: if not getattr(self, "console_bar_right_started", False): self.console_bar_right.start() self.console_bar_right_started = True self.console_bar_right.update(value)
[docs] def update_edge_preview(self) -> None: """Update edge preview plot for current experiment and settings.""" if not self.experiment: return try: frame_count = self.experiment.get_data(self.datatype).get_frame_count() frame_index = frame_count // 2 h5 = self.experiment.h5_file if self.experiment_type == "Lateral Flame Spread": if "dewarped_data" not in h5: logging.debug("No dewarped_data for LFS - skipping preview.") return dataset = h5["dewarped_data"]["data"] is_left = False elif self.experiment_type == "Room Corner": if "dewarped_data_left" not in h5: logging.debug( "No dewarped_data_left for Room Corner - skipping preview." ) return dataset = h5["dewarped_data_left"]["data"] is_left = True else: logging.debug("Unknown experiment type: %s", self.experiment_type) return frame = dataset[:, :, frame_index] if self.experiment_type == "Lateral Flame Spread": flame_dir = self.ui.comboBox_flame_direction.currentText() dir_key = ( "right_to_left" if flame_dir == "Right -> Left" else "left_to_right" ) else: # Room Corner preview always shows left plate (flame right→left) dir_key = "right_to_left" _spec_preview = self._edge_spec_for(dir_key) _thr_preview = self._edge_threshold() edge = calculate_edge_data( data=dataset[:, :, frame_index : frame_index + 1], find_edge_point=_spec_preview.make_fn(_thr_preview), custom_filter=lambda x: x, use_otsu_masking=_spec_preview.use_otsu_masking, )[0] self.ui.plot_edge_preview.plot_with_edge(frame, edge, cmin=0.0, cmax=1.0) logging.debug( "Showing dataset shape: %s, from: %s", dataset.shape, "dewarped_data_left" if is_left else "dewarped_data", ) except ( KeyError, IndexError, AttributeError, TypeError, ValueError, OSError, ) as err: # HDF5-Schlüssel fehlt, Frameindex out-of-range, falsche Typen/Werte, IO logging.debug("Could not load edge preview: %s", err)
[docs] def update_flame_direction_visibility(self) -> None: """Show or hide flame direction controls based on experiment type.""" is_lfs = ( self.ui.comboBox_experiment_type.currentText() == "Lateral Flame Spread" ) self.ui.comboBox_flame_direction.setVisible(is_lfs)
[docs] def update_analysis_plot(self): if self.experiment is None: return y_cutoff = self.ui.slider_analysis_y.value() / 100 self.ui.plot_analysis.plot_edge_results(self.experiment, y_cutoff)