# src/Synaptipy/core/analysis/synaptic_events.py
# -*- coding: utf-8 -*-
"""
Core Protocol Module 4: Synaptic Events.
Consolidates all synaptic event detection methods (adaptive threshold,
template matching, baseline-peak-kinetics) from event_detection.py into
one self-contained module.
All registry wrapper functions return::
{
"module_used": "synaptic_events",
"metrics": { ... flat result keys ... }
}
Exports ``detect_minis_threshold`` as a backward-compatibility alias.
"""
import logging
from typing import Any, Dict, Optional, Tuple
import numpy as np
from scipy import signal
from scipy.optimize import curve_fit
from scipy.stats import median_abs_deviation
from Synaptipy.core.analysis.registry import AnalysisRegistry
from Synaptipy.core.results import EventDetectionResult
from Synaptipy.core.signal_processor import find_artifact_windows
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Quiescent Noise Floor
# ---------------------------------------------------------------------------
def find_quiescent_baseline_rms(
data: np.ndarray,
sample_rate: float,
window_ms: float = 20.0,
) -> Tuple[float, Tuple[int, int]]:
"""
Identify the quietest (minimum-variance) segment in a trace via a sliding
window and return its RMS as the noise floor.
    Unlike a fixed pre-trace window (e.g. ``trace[0:50]``), this approach is
    robust to recordings with spontaneous activity at the start: the search
    considers the *entire* trace, selecting the ``window_ms`` (default 20 ms)
    chunk with the smallest variance regardless of its position.
Args:
data: 1D signal array (mV or pA).
sample_rate: Sampling rate (Hz).
window_ms: Duration of the sliding window (ms, default 20).
Returns:
Tuple of (rms_noise_floor, (start_idx, end_idx)) where the indices
define the quiescent window used for the RMS calculation.
"""
n = len(data)
window_samples = max(2, int(window_ms / 1000.0 * sample_rate))
step_samples = max(1, window_samples // 2)
min_var = np.inf
best_start = 0
for i in range(0, n - window_samples + 1, step_samples):
chunk = data[i : i + window_samples]
# Detrend each chunk to remove slow drift before computing variance so
# that only high-frequency thermal noise is assessed.
chunk_detrended = signal.detrend(chunk, type="linear")
var = float(np.var(chunk_detrended))
if var < min_var:
min_var = var
best_start = i
best_end = best_start + window_samples
quiescent_chunk = signal.detrend(data[best_start:best_end], type="linear")
rms = float(np.sqrt(np.mean(quiescent_chunk**2)))
return rms, (best_start, best_end)
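# Illustrative usage (a sketch with a synthetic trace, not exercised by this
# module): estimate the noise floor of a 10 kHz recording. The returned RMS is
# what ``detect_events_threshold`` uses as ``noise_sd`` when the quiescent
# noise floor is enabled.
#
#   rng = np.random.default_rng(0)
#   trace = rng.normal(0.0, 0.5, size=100_000)   # 10 s of baseline noise
#   trace[20_000:20_050] -= 10.0                 # one synthetic negative event
#   rms, (i0, i1) = find_quiescent_baseline_rms(trace, 10_000.0)
#   # rms ~ 0.5; (i0, i1) brackets a quiet 20 ms window away from the event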
# ---------------------------------------------------------------------------
# Dynamic AUC / Charge Integration
# ---------------------------------------------------------------------------
def calculate_event_charge_dynamic(
data: np.ndarray,
event_index: int,
sample_rate: float,
local_baseline: float,
polarity: str = "negative",
max_duration_ms: float = 100.0,
) -> float:
"""
Integrate event charge (area under curve) with a dynamic boundary.
The integration ends at whichever comes first:
- The signal returns to ``local_baseline`` (event complete).
    - A large derivative transient indicates the onset of a subsequent
      summating event (onset detected when ``dV/dt`` exceeds 3x the noise of
      the early derivative, in the direction of the event polarity).
Args:
data: 1D signal array.
event_index: Sample index of the event peak.
sample_rate: Sampling rate (Hz).
local_baseline: Local baseline voltage/current level.
polarity: ``"negative"`` or ``"positive"``.
max_duration_ms: Hard cap on integration window (ms).
Returns:
Signed charge (area under curve relative to baseline, in units of
data * seconds, e.g. mV·s or pA·s).
"""
dt = 1.0 / sample_rate
max_samples = max(2, int(max_duration_ms / 1000.0 * sample_rate))
start = int(event_index)
end = min(start + max_samples, len(data))
segment = data[start:end]
if len(segment) < 2:
return 0.0
# Boundary 1: signal returns to local baseline
if polarity == "negative":
returned = np.where(segment >= local_baseline)[0]
else:
returned = np.where(segment <= local_baseline)[0]
baseline_return_idx = int(returned[0]) if len(returned) > 0 else len(segment)
# Boundary 2: onset of subsequent event (derivative transient)
dvdt_seg = np.diff(segment[:baseline_return_idx]) if baseline_return_idx > 1 else np.array([])
onset_idx = baseline_return_idx
if len(dvdt_seg) > 4:
# Estimate noise from the first quarter of the derivative
n_noise = max(2, len(dvdt_seg) // 4)
noise_std = float(np.std(dvdt_seg[:n_noise]))
if noise_std > 0:
min_pts = max(1, int(0.001 * sample_rate)) # skip first 1 ms
if polarity == "negative":
candidates = np.where(dvdt_seg < -3.0 * noise_std)[0]
else:
candidates = np.where(dvdt_seg > 3.0 * noise_std)[0]
candidates = candidates[candidates >= min_pts]
if len(candidates) > 0:
onset_idx = min(baseline_return_idx, int(candidates[0]))
integration_end = max(1, onset_idx)
t_seg = np.arange(integration_end) * dt
trace_slice = segment[:integration_end]
charge = float(np.trapezoid(trace_slice - local_baseline, t_seg))
return charge
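# Illustrative usage (a sketch; ``trace`` and ``peak_idx`` are hypothetical):
# integrate the charge of one detected event against its local baseline. Units
# follow the data, so a trace in pA yields pA·s, i.e. picocoulombs.
#
#   q = calculate_event_charge_dynamic(trace, peak_idx, 10_000.0,
#                                      local_baseline=-65.2,
#                                      polarity="negative")
#   # q < 0 for a negative-going event (signed area relative to baseline)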
# ---------------------------------------------------------------------------
# Bi-Exponential Decay Kinetics Fitting
# ---------------------------------------------------------------------------
def fit_biexponential_decay( # noqa: C901
data: np.ndarray,
event_index: int,
sample_rate: float,
local_baseline: float,
polarity: str = "negative",
fit_window_ms: float = 80.0,
) -> Dict[str, Any]:
"""Fit mono- and bi-exponential decays to a synaptic event.
Tries a bi-exponential fit first. Falls back to mono-exponential when the
bi-exp fit does not converge or yields non-physical parameters (negative
amplitudes or time constants).
Bi-exponential model (relative to baseline)::
f(t) = A_fast * exp(-t / tau_fast) + A_slow * exp(-t / tau_slow)
    where ``A_fast + A_slow`` approximates the event peak amplitude at t=0.
Args:
data: 1-D signal array.
event_index: Sample index of the event peak.
sample_rate: Sampling rate (Hz).
local_baseline: Local baseline level (same units as data).
polarity: ``"negative"`` or ``"positive"``.
fit_window_ms: Maximum duration of the decay segment to fit (ms).
Returns:
Dict with keys:
- ``tau_mono_ms`` – mono-exp time constant (ms)
- ``tau_fast_ms`` – fast component time constant (ms); ``None`` if
bi-exp did not converge
- ``tau_slow_ms`` – slow component time constant (ms); ``None`` if
bi-exp did not converge
- ``bi_exp_converged`` – True when bi-exp fit was accepted
- ``decay_fit_error`` – ``None`` when fit succeeded; error message
string otherwise
"""
result: Dict[str, Any] = {
"tau_mono_ms": None,
"tau_fast_ms": None,
"tau_slow_ms": None,
"bi_exp_converged": False,
"decay_fit_error": None,
}
dt = 1.0 / sample_rate
max_samples = max(4, int(fit_window_ms / 1000.0 * sample_rate))
start = int(event_index)
end = min(start + max_samples, len(data))
segment = data[start:end]
if len(segment) < 5:
result["decay_fit_error"] = "Segment too short"
return result
# Build relative amplitude trace (positive, regardless of polarity)
if polarity == "negative":
y = local_baseline - segment
else:
y = segment - local_baseline
peak_amp = float(y[0]) if y.size > 0 else 0.0
if peak_amp <= 0:
result["decay_fit_error"] = "Non-positive peak amplitude"
return result
    # Find where the signal returns to baseline (first non-positive sample of
    # the relative trace y)
returned = np.where(y <= 0)[0]
decay_end = int(returned[0]) if len(returned) > 0 else len(y)
y_fit = y[:decay_end]
t_fit = np.arange(len(y_fit)) * dt * 1000.0 # ms
if len(t_fit) < 4:
result["decay_fit_error"] = "Decay segment too short after baseline crossing"
return result
# --- Mono-exponential fit ---
def mono_exp(t: np.ndarray, a: float, tau: float) -> np.ndarray:
return a * np.exp(-t / tau)
tau_mono_ms = None
try:
p0_mono = [peak_amp, max(0.1, t_fit[-1] / 3.0)]
bounds_mono = ([0.0, 0.01], [peak_amp * 10, t_fit[-1] * 20 + 1.0])
popt_m, _ = curve_fit(mono_exp, t_fit, y_fit, p0=p0_mono, bounds=bounds_mono, maxfev=2000)
tau_mono_ms = float(popt_m[1])
result["tau_mono_ms"] = tau_mono_ms
except Exception as exc_m:
result["decay_fit_error"] = f"Mono-exp failed: {exc_m}"
return result
# --- Bi-exponential fit ---
def bi_exp(t: np.ndarray, a_f: float, tau_f: float, a_s: float, tau_s: float) -> np.ndarray:
return a_f * np.exp(-t / tau_f) + a_s * np.exp(-t / tau_s)
try:
        # Initialise: fast tau at 0.4x the mono tau, slow at 2x; amplitudes split 60/40
p0_bi = [peak_amp * 0.6, tau_mono_ms * 0.4, peak_amp * 0.4, tau_mono_ms * 2.0]
bounds_bi = (
[0.0, 0.01, 0.0, 0.01],
[peak_amp * 5, tau_mono_ms * 5, peak_amp * 5, tau_mono_ms * 50 + 1.0],
)
popt_b, _ = curve_fit(bi_exp, t_fit, y_fit, p0=p0_bi, bounds=bounds_bi, maxfev=4000)
a_f, tau_f, a_s, tau_s = popt_b
        # Sanity: both amplitudes and both taus must be positive and distinct;
        # components are then ordered so that tau_fast < tau_slow.
if a_f > 0 and a_s > 0 and tau_f > 0 and tau_s > 0 and tau_f != tau_s:
tau_fast = min(tau_f, tau_s)
tau_slow = max(tau_f, tau_s)
result["tau_fast_ms"] = float(tau_fast)
result["tau_slow_ms"] = float(tau_slow)
result["bi_exp_converged"] = True
except Exception:
# Bi-exp failed; mono-exp result is still valid
pass
return result
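# Illustrative usage (a sketch; ``trace`` and ``peak_idx`` are hypothetical):
# fit the decay of a single event. When the bi-exponential fit is rejected,
# only ``tau_mono_ms`` is populated.
#
#   kin = fit_biexponential_decay(trace, peak_idx, 10_000.0,
#                                 local_baseline=-65.2, polarity="negative")
#   if kin["bi_exp_converged"]:
#       print(kin["tau_fast_ms"], kin["tau_slow_ms"])
#   else:
#       print(kin["tau_mono_ms"], kin["decay_fit_error"])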
# ---------------------------------------------------------------------------
# 1. Adaptive Threshold Detection
# ---------------------------------------------------------------------------
def compute_local_pre_event_baseline(
data: np.ndarray,
event_indices: np.ndarray,
sample_rate: float,
pre_event_window_ms: float = 2.0,
polarity: str = "negative",
) -> np.ndarray:
"""
Compute a local pre-event baseline voltage for each detected event.
    For summating synaptic events that ride on the decay of a previous event,
    the global resting potential is a poor amplitude reference. This function
    searches the ``pre_event_window_ms`` immediately preceding each event peak
    and returns the local "foot" voltage:
- Negative polarity: the maximum (most depolarised) voltage in the search
window - i.e. the point before the hyperpolarising/inward current event
begins to deflect the trace.
- Positive polarity: the minimum (most hyperpolarised) voltage.
Args:
data: 1D voltage/current array.
event_indices: Integer indices of detected event peaks.
sample_rate: Sampling rate in Hz.
pre_event_window_ms: Duration (ms) of the search window before each
            peak (default 2.0 ms; values of 1-3 ms are recommended).
polarity: "negative" or "positive".
Returns:
1D float array of local baseline values, one per event.
"""
if len(event_indices) == 0:
return np.array([], dtype=float)
search_samples = max(1, int(pre_event_window_ms / 1000.0 * sample_rate))
local_baselines = np.empty(len(event_indices), dtype=float)
for k, idx in enumerate(event_indices):
win_start = max(0, int(idx) - search_samples)
win_end = max(win_start + 1, int(idx))
segment = data[win_start:win_end]
if segment.size == 0:
local_baselines[k] = float(data[max(0, int(idx))])
elif polarity == "negative":
# Foot of a negative event: highest voltage before the downswing.
local_baselines[k] = float(np.max(segment))
else:
# Foot of a positive event: lowest voltage before the upswing.
local_baselines[k] = float(np.min(segment))
return local_baselines
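# Illustrative usage (a sketch; ``trace`` and ``idx`` are hypothetical): measure
# amplitudes against the local foot rather than the global baseline, mirroring
# what the registry wrappers below do for summating events.
#
#   feet = compute_local_pre_event_baseline(trace, idx, 10_000.0,
#                                           polarity="negative")
#   local_amps = feet - trace[idx]   # positive magnitudes for negative events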
# ---------------------------------------------------------------------------
# Paired-Pulse Ratio with Residual Decay Subtraction
# ---------------------------------------------------------------------------
def _fit_p1_decay_residual(
data: np.ndarray,
time: np.ndarray,
peak1_idx: int,
s2_t: float,
global_baseline: float,
sample_rate: float,
) -> Tuple[float, float]:
"""Fit P1 decay tail; attempt bi-exponential first, fall back to mono-exp.
A bi-exponential model captures both fast and slow decay components
(e.g., AMPA fast-deactivation + NMDA slow-deactivation) and yields a
more accurate residual estimate at P2 onset when two components are
present. If bi-exp optimisation fails to converge, the function falls
back gracefully to a mono-exponential fit.
Returns:
Tuple of (residual_at_p2, tau_dominant_ms) where ``tau_dominant_ms``
is the amplitude-weighted mean time constant for the bi-exp case or
the single tau for the mono-exp case.
"""
nan = float("nan")
max_decay_samples = min(int(0.2 * sample_rate), int((s2_t - time[peak1_idx]) * sample_rate) - 1)
decay_end_idx = min(peak1_idx + max(4, max_decay_samples), len(data) - 1)
if decay_end_idx <= peak1_idx + 3:
return 0.0, nan
t_decay = time[peak1_idx:decay_end_idx] - time[peak1_idx]
i_decay = data[peak1_idx:decay_end_idx] - global_baseline
t_at_s2 = s2_t - time[peak1_idx]
A0 = float(i_decay[0]) if i_decay.size > 0 else 1.0
tau_guess = float(t_decay[-1] / 3.0) if t_decay[-1] > 0 else 0.01
# ------------------------------------------------------------------
# Attempt 1: bi-exponential A_f*exp(-t/tau_f) + A_s*exp(-t/tau_s)
# ------------------------------------------------------------------
def _bi_exp_ppr(t, A_f, tau_f, A_s, tau_s):
return A_f * np.exp(-t / tau_f) + A_s * np.exp(-t / tau_s)
if len(t_decay) >= 8:
tau_fast_guess = tau_guess * 0.2
tau_slow_guess = tau_guess
p0_bi = [A0 * 0.6, tau_fast_guess, A0 * 0.4, tau_slow_guess]
bounds_bi = (
[-np.inf, 1e-5, -np.inf, 1e-4],
[np.inf, tau_guess * 2.0, np.inf, 2.0],
)
try:
popt_bi, _ = curve_fit(
_bi_exp_ppr,
t_decay,
i_decay,
p0=p0_bi,
bounds=bounds_bi,
maxfev=4000,
)
A_f, tau_f, A_s, tau_s = popt_bi
residual_at_p2 = float(_bi_exp_ppr(t_at_s2, *popt_bi))
# Amplitude-weighted dominant tau for reporting
total_amp = abs(A_f) + abs(A_s)
if total_amp > 0:
tau_dominant_ms = (abs(A_f) * tau_f + abs(A_s) * tau_s) / total_amp * 1000.0
else:
tau_dominant_ms = tau_guess * 1000.0
log.debug("PPR: bi-exp decay fit converged (tau_f=%.2f ms, tau_s=%.2f ms).", tau_f * 1000, tau_s * 1000)
return residual_at_p2, float(tau_dominant_ms)
except (RuntimeError, ValueError):
log.debug("PPR: bi-exp decay fit did not converge; falling back to mono-exp.")
# ------------------------------------------------------------------
# Attempt 2 (fallback): mono-exponential A*exp(-t/tau)
# ------------------------------------------------------------------
def _mono_exp_ppr(t, A, tau):
return A * np.exp(-t / tau)
p0 = [A0, tau_guess]
try:
popt, _ = curve_fit(
_mono_exp_ppr,
t_decay,
i_decay,
p0=p0,
bounds=([-np.inf, 1e-4], [np.inf, 2.0]),
maxfev=2000,
)
tau_p1_ms = float(popt[1]) * 1000.0
residual_at_p2 = float(_mono_exp_ppr(t_at_s2, *popt))
return residual_at_p2, tau_p1_ms
except (RuntimeError, ValueError):
log.debug("PPR: mono-exp decay fit did not converge; using zero residual.")
return 0.0, nan
def _measure_ppr_peak(
data: np.ndarray,
time: np.ndarray,
onset_s: float,
baseline: float,
resp_samples: int,
polarity: str,
) -> Tuple[float, int]:
"""Return (amplitude, peak_index) relative to *baseline* in a post-onset window."""
nan = float("nan")
onset_idx = int(np.searchsorted(time, onset_s, side="left"))
end_idx = min(onset_idx + resp_samples, len(data))
if onset_idx >= end_idx:
return nan, onset_idx
segment = data[onset_idx:end_idx]
if polarity == "negative":
rel_idx = int(np.argmin(segment))
else:
rel_idx = int(np.argmax(segment))
peak_idx = onset_idx + rel_idx
amp = float(data[peak_idx]) - float(baseline)
return amp, peak_idx
def calculate_paired_pulse_ratio(
data: np.ndarray,
time: np.ndarray,
stimulus_onsets_s: np.ndarray,
sample_rate: float,
response_window_ms: float = 20.0,
polarity: str = "negative",
) -> Dict[str, Any]:
"""Calculate the Paired-Pulse Ratio (PPR) with residual decay correction.
A naive amplitude ratio (P2/P1) is contaminated by the unresolved decay
tail of the first response. This function:
    1. Fits a decay model to the P1 tail (bi-exponential first, with a
       mono-exponential fallback).
2. Evaluates the fitted tail at the P2 onset to obtain the *residual*
baseline offset.
3. Measures P2 amplitude relative to the corrected (residual-subtracted)
baseline instead of the global baseline.
PPR = A2_corrected / A1
Parameters
----------
data : np.ndarray
1-D signal array (pA or mV).
time : np.ndarray
1-D time array (seconds), same length as *data*.
stimulus_onsets_s : np.ndarray
1-D array of stimulus onset times (seconds). At least two are required.
sample_rate : float
Sampling rate (Hz).
response_window_ms : float
Duration (ms) of the post-stimulus window searched for the peak.
polarity : str
``"negative"`` for inward currents/hyperpolarisations;
``"positive"`` for depolarisations.
Returns
-------
dict
Keys:
- ``ppr`` – corrected P2/P1 amplitude ratio
- ``ppr_naive`` – uncorrected P2/P1 ratio (no residual subtraction)
- ``amplitude_p1`` – P1 amplitude relative to pre-stimulus baseline
- ``amplitude_p2_corrected`` – P2 amplitude after residual subtraction
- ``amplitude_p2_naive`` – P2 amplitude relative to global baseline
- ``residual_at_p2`` – predicted P1 tail amplitude at P2 onset
        - ``tau_p1_ms`` – time constant of the P1 decay fit (ms); the
          amplitude-weighted mean when the bi-exponential fit converges
- ``interpulse_interval_ms`` – interval between S1 and S2 (ms)
- ``error`` – error message string when calculation fails
"""
nan = float(np.nan)
result: Dict[str, Any] = {
"ppr": nan,
"ppr_naive": nan,
"amplitude_p1": nan,
"amplitude_p2_corrected": nan,
"amplitude_p2_naive": nan,
"residual_at_p2": nan,
"tau_p1_ms": nan,
"interpulse_interval_ms": nan,
"error": None,
}
if len(stimulus_onsets_s) < 2:
result["error"] = "Need at least 2 stimulus onsets for PPR."
return result
resp_samples = max(2, int(response_window_ms / 1000.0 * sample_rate))
def _measure_peak(onset_s: float, baseline: float) -> Tuple[float, int]:
return _measure_ppr_peak(data, time, onset_s, baseline, resp_samples, polarity)
try:
s1_t = float(stimulus_onsets_s[0])
s2_t = float(stimulus_onsets_s[1])
isi_ms = (s2_t - s1_t) * 1000.0
result["interpulse_interval_ms"] = isi_ms
# Pre-S1 baseline (5 ms before S1)
base_mask = (time >= s1_t - 0.005) & (time < s1_t)
if not np.any(base_mask):
result["error"] = "No pre-S1 baseline samples."
return result
global_baseline = float(np.mean(data[base_mask]))
# P1 amplitude
amp_p1, peak1_idx = _measure_peak(s1_t, global_baseline)
if np.isnan(amp_p1) or amp_p1 == 0.0:
result["error"] = "P1 amplitude is zero or NaN."
return result
result["amplitude_p1"] = amp_p1
# Naive P2 amplitude (no correction)
amp_p2_naive, peak2_idx = _measure_peak(s2_t, global_baseline)
result["amplitude_p2_naive"] = amp_p2_naive
if amp_p1 != 0.0:
result["ppr_naive"] = float(amp_p2_naive / amp_p1)
        # ------------------------------------------------------------------ #
        # Fit the P1 decay tail (bi-exp first, mono-exp fallback) to predict #
        # the residual at S2                                                 #
        # ------------------------------------------------------------------ #
residual_at_p2, tau_p1_ms = _fit_p1_decay_residual(data, time, peak1_idx, s2_t, global_baseline, sample_rate)
if not np.isnan(tau_p1_ms):
result["tau_p1_ms"] = tau_p1_ms
result["residual_at_p2"] = residual_at_p2
        # Corrected P2 amplitude: measure relative to the predicted P1 tail
        # level (global baseline plus residual) rather than the global baseline.
corrected_p2_baseline = global_baseline + residual_at_p2
amp_p2_corrected, _ = _measure_peak(s2_t, corrected_p2_baseline)
result["amplitude_p2_corrected"] = amp_p2_corrected
if amp_p1 != 0.0:
result["ppr"] = float(amp_p2_corrected / amp_p1)
except (ValueError, TypeError, IndexError) as e:
result["error"] = str(e)
log.warning("calculate_paired_pulse_ratio: %s", e)
return result
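# Illustrative usage (a sketch; ``trace``/``t`` and the onset times are
# hypothetical): PPR for two stimuli 50 ms apart. Comparing ``ppr`` with
# ``ppr_naive`` shows how much the P1 decay tail contaminated the naive ratio.
#
#   onsets = np.array([0.100, 0.150])   # seconds
#   res = calculate_paired_pulse_ratio(trace, t, onsets, 10_000.0,
#                                      response_window_ms=20.0,
#                                      polarity="negative")
#   if res["error"] is None:
#       print(res["ppr"], res["ppr_naive"], res["tau_p1_ms"])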
def detect_events_threshold( # noqa: C901
data: np.ndarray,
time: np.ndarray,
threshold: float,
polarity: str = "negative",
refractory_period: float = 0.002,
rolling_baseline_window_ms: Optional[float] = 100.0,
artifact_mask: Optional[np.ndarray] = None,
use_quiescent_noise_floor: bool = True,
quiescent_window_ms: float = 20.0,
) -> EventDetectionResult:
"""
Detect events using topological prominence to handle shifting baselines.
    By default a quiescent-noise-floor estimate is used: the RMS of the
    minimum-variance ``quiescent_window_ms`` (default 20 ms) chunk in the
    trace sets a dynamic noise threshold, preventing false positives even
    when spontaneous activity dominates the beginning of the recording.
"""
if data.size < 2 or time.shape != data.shape:
return EventDetectionResult(value=0, unit="Hz", is_valid=False, error_message="Invalid data/time shape")
try:
        fs = 1.0 / (time[1] - time[0]) if len(time) > 1 else 10000.0  # fall back to 10 kHz
if rolling_baseline_window_ms is not None and rolling_baseline_window_ms > 0:
from scipy.ndimage import median_filter
window_samples = int((rolling_baseline_window_ms / 1000.0) * fs)
if window_samples % 2 == 0:
window_samples += 1
if window_samples >= 3:
baseline = median_filter(data, size=window_samples)
baseline_corrected_data = data - baseline
else:
baseline_corrected_data = data
else:
baseline_corrected_data = data
is_negative = polarity == "negative"
work_data = -baseline_corrected_data if is_negative else baseline_corrected_data
if use_quiescent_noise_floor:
# Dynamic noise floor: RMS of the quietest window in the trace
quiescent_rms, _ = find_quiescent_baseline_rms(work_data, fs, window_ms=quiescent_window_ms)
noise_sd = quiescent_rms if quiescent_rms > 0 else 1e-12
else:
noise_sd = median_abs_deviation(work_data, scale="normal")
if noise_sd == 0:
noise_sd = 1e-12
abs_threshold = abs(threshold)
min_prominence = max(abs_threshold, 2.0 * noise_sd)
distance_samples = max(1, int(refractory_period * fs))
        min_width_samples = max(2, int(0.0002 * fs))  # require peaks >= 0.2 ms wide
peaks, _ = signal.find_peaks(
work_data,
prominence=min_prominence,
height=abs_threshold,
distance=distance_samples,
width=min_width_samples,
)
n_artifacts_rejected = 0
if artifact_mask is not None and len(peaks) > 0:
valid_idx_mask = peaks < len(artifact_mask)
peaks = peaks[valid_idx_mask]
not_artifact_mask = ~artifact_mask[peaks]
n_artifacts_rejected = int(np.sum(~not_artifact_mask))
peaks = peaks[not_artifact_mask]
event_indices = peaks.astype(int)
if len(event_indices) > 0:
event_times = time[event_indices]
event_amplitudes = baseline_corrected_data[event_indices]
else:
event_times = np.array([])
event_amplitudes = np.array([])
num_events = len(event_indices)
duration = time[-1] - time[0] if len(time) > 0 else 0
frequency = num_events / duration if duration > 0 else 0.0
mean_amplitude = float(np.mean(event_amplitudes)) if num_events > 0 else 0.0
std_amplitude = float(np.std(event_amplitudes)) if num_events > 0 else 0.0
return EventDetectionResult(
value=frequency,
unit="Hz",
is_valid=True,
event_count=num_events,
frequency_hz=frequency,
mean_amplitude=mean_amplitude,
amplitude_sd=std_amplitude,
event_indices=event_indices,
event_times=event_times,
event_amplitudes=event_amplitudes,
detection_method="threshold_adaptive_prominence",
threshold_value=threshold,
direction=polarity,
n_artifacts_rejected=n_artifacts_rejected,
artifact_mask=artifact_mask,
summary_stats={"noise_sd": float(noise_sd), "min_prominence_used": float(min_prominence)},
)
except Exception as e:
log.error(f"Error during adaptive threshold event detection: {e}", exc_info=True)
return EventDetectionResult(value=0, unit="Hz", is_valid=False, error_message=str(e))
# Backward compatibility alias
detect_minis_threshold = detect_events_threshold
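# Illustrative usage (a sketch; ``trace`` and ``t`` are hypothetical): run the
# adaptive-threshold detector directly, without going through the registry.
#
#   res = detect_events_threshold(trace, t, threshold=8.0,
#                                 polarity="negative",
#                                 refractory_period=0.002)
#   if res.is_valid:
#       print(res.event_count, res.frequency_hz, res.mean_amplitude)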
@AnalysisRegistry.register(
"event_detection_threshold",
label="Event Detection (Threshold)",
plots=[
{"name": "Trace", "type": "trace", "show_spikes": True},
{"type": "markers", "x": "_event_times", "y": "_event_peaks", "color": "r", "symbol": "o"},
{"type": "threshold_line", "param": "threshold"},
{"type": "artifact_overlay"},
],
ui_params=[
{
"name": "threshold",
"label": "Threshold (pA/mV):",
"type": "float",
"default": 5.0,
"min": -1e9,
"max": 1e9,
"decimals": 4,
},
{
"name": "direction",
"label": "Direction:",
"type": "choice",
"choices": ["negative", "positive"],
"default": "negative",
},
{
"name": "refractory_period",
"label": "Refractory (s):",
"type": "float",
"default": 0.005,
"min": 0.0,
"max": 1.0,
"decimals": 4,
},
{
"name": "rolling_baseline_window_ms",
"label": "Rolling Baseline (ms):",
"type": "float",
"default": 100.0,
"min": 0.0,
"max": 5000.0,
"decimals": 1,
},
{"name": "reject_artifacts", "label": "Reject Artifacts", "type": "bool", "default": False},
{
"name": "artifact_slope_threshold",
"label": "Artifact Slope Thresh:",
"type": "float",
"default": 20.0,
"min": 0.0,
},
{"name": "artifact_padding_ms", "label": "Artifact Padding (ms):", "type": "float", "default": 2.0},
{
"name": "use_quiescent_noise_floor",
"label": "Quiescent Noise Floor",
"type": "bool",
"default": True,
"tooltip": "Use minimum-variance sliding window to estimate noise, ignoring bursts at recording start.",
},
{
"name": "quiescent_window_ms",
"label": "Quiescent Window (ms):",
"type": "float",
"default": 20.0,
"min": 1.0,
"max": 500.0,
"decimals": 1,
"visible_when": {"param": "use_quiescent_noise_floor", "value": True},
},
],
)
def run_event_detection_threshold_wrapper(
data: np.ndarray, time: np.ndarray, sampling_rate: float, **kwargs
) -> Dict[str, Any]:
"""Wrapper for adaptive threshold event detection."""
threshold = kwargs.get("threshold", 5.0)
direction = kwargs.get("direction", "negative")
refractory_period = kwargs.get("refractory_period", 0.005)
rolling_baseline_window_ms = kwargs.get("rolling_baseline_window_ms", 100.0)
use_quiescent_noise_floor = kwargs.get("use_quiescent_noise_floor", True)
quiescent_window_ms = float(kwargs.get("quiescent_window_ms", 20.0))
reject_artifacts = kwargs.get("reject_artifacts", False)
artifact_mask = None
if reject_artifacts:
slope_thresh = kwargs.get("artifact_slope_threshold", 20.0)
padding_ms = kwargs.get("artifact_padding_ms", 2.0)
artifact_mask = find_artifact_windows(data, sampling_rate, slope_thresh, padding_ms)
result = detect_events_threshold(
data,
time,
threshold,
direction,
refractory_period,
rolling_baseline_window_ms=rolling_baseline_window_ms,
artifact_mask=artifact_mask,
use_quiescent_noise_floor=use_quiescent_noise_floor,
quiescent_window_ms=quiescent_window_ms,
)
if not result.is_valid:
return {"module_used": "synaptic_events", "metrics": {"event_error": result.error_message}}
_idx = np.asarray(result.event_indices if result.event_indices is not None else [], dtype=int)
# Compute local pre-event baseline for each detected event (handles summating events).
local_baselines = compute_local_pre_event_baseline(data, _idx, sampling_rate, polarity=direction)
if len(_idx) > 0:
if direction == "negative":
local_amplitudes = local_baselines - data[_idx]
else:
local_amplitudes = data[_idx] - local_baselines
else:
local_amplitudes = np.array([], dtype=float)
# Dynamic AUC: integrate each event charge with a dynamic boundary
event_charges = []
for k, idx in enumerate(_idx):
lb = float(local_baselines[k]) if k < len(local_baselines) else float(np.mean(data))
charge = calculate_event_charge_dynamic(data, idx, sampling_rate, lb, polarity=direction)
event_charges.append(charge)
mean_charge = float(np.mean(event_charges)) if event_charges else 0.0
# Bi-exponential decay kinetics (aggregate over all detected events)
tau_fast_list = []
tau_slow_list = []
tau_mono_list = []
for k, idx in enumerate(_idx):
lb = float(local_baselines[k]) if k < len(local_baselines) else float(np.mean(data))
kin = fit_biexponential_decay(data, idx, sampling_rate, lb, polarity=direction)
if kin["tau_mono_ms"] is not None:
tau_mono_list.append(kin["tau_mono_ms"])
if kin["bi_exp_converged"]:
tau_fast_list.append(kin["tau_fast_ms"])
tau_slow_list.append(kin["tau_slow_ms"])
mean_tau_mono = float(np.mean(tau_mono_list)) if tau_mono_list else float("nan")
mean_tau_fast = float(np.mean(tau_fast_list)) if tau_fast_list else None
mean_tau_slow = float(np.mean(tau_slow_list)) if tau_slow_list else None
return {
"module_used": "synaptic_events",
"metrics": {
"event_count": result.event_count,
"frequency_hz": result.frequency_hz,
"mean_amplitude": result.mean_amplitude,
"amplitude_sd": result.amplitude_sd,
"mean_local_amplitude": float(np.mean(local_amplitudes)) if local_amplitudes.size > 0 else 0.0,
"mean_event_charge": mean_charge,
"tau_mono_ms": mean_tau_mono,
"tau_fast_ms": mean_tau_fast,
"tau_slow_ms": mean_tau_slow,
"_event_charges": event_charges,
"_event_times": time[_idx].tolist() if len(_idx) > 0 else [],
"_event_peaks": data[_idx].tolist() if len(_idx) > 0 else [],
"_local_baselines": local_baselines.tolist() if local_baselines.size > 0 else [],
"_local_amplitudes": local_amplitudes.tolist() if local_amplitudes.size > 0 else [],
"_result_obj": result,
},
}
# ---------------------------------------------------------------------------
# 2. Parametric Template Matching
# ---------------------------------------------------------------------------
def detect_events_template( # noqa: C901
data: np.ndarray,
sampling_rate: float,
threshold_std: float,
tau_rise: float,
tau_decay: float,
polarity: str = "negative",
rolling_baseline_window_ms: Optional[float] = 100.0,
artifact_mask: Optional[np.ndarray] = None,
time: Optional[np.ndarray] = None,
min_event_distance_ms: float = 0.0,
) -> EventDetectionResult:
"""Detect events using a multi-kernel matched-filter bank.
    Three kernels are built from the specified ``tau_rise`` and ``tau_decay``
    scaled by ×1, ×2 and ×3, to tolerate dendritic filtering that prolongs
    event decay (cable theory predicts a ~2-3× slowdown for distal inputs).
    A combined z-score trace
(pointwise maximum across the three filtered traces) is used for peak
detection, improving sensitivity to both somatic and dendritic events.
"""
try:
dt = 1.0 / sampling_rate
n_points = len(data)
def _build_kernel(td: float) -> np.ndarray:
"""Alpha/bi-exponential kernel for a given tau_decay."""
kernel_duration = 5 * max(td, tau_rise)
t_k = np.arange(0, kernel_duration, dt)
if td == tau_rise:
k = t_k * np.exp(-t_k / td)
else:
k = np.exp(-t_k / td) - np.exp(-t_k / tau_rise)
max_abs = np.max(np.abs(k))
if max_abs > 0:
k /= max_abs
return k
kernels = [_build_kernel(tau_decay * scale) for scale in (1.0, 2.0, 3.0)]
if rolling_baseline_window_ms is not None and rolling_baseline_window_ms > 0:
from scipy.ndimage import median_filter
window_samples = int((rolling_baseline_window_ms / 1000.0) * sampling_rate)
if window_samples % 2 == 0:
window_samples += 1
if window_samples >= 3:
baseline = median_filter(data, size=window_samples)
baseline_corrected_data = data - baseline
else:
baseline_corrected_data = data
else:
baseline_corrected_data = data
is_negative = polarity == "negative"
work_data = -baseline_corrected_data if is_negative else baseline_corrected_data
# Compute filtered traces and z-scores for each kernel.
# Each kernel's matched-filter peak is shifted from the true event time by
# (kernel_peak_idx - kernel_center) samples; align all z-score traces to
# the primary (1x) kernel reference so that the max-combination produces a
# single sharp peak per event rather than multiple spread-out humps.
primary_kernel = kernels[0]
kernel_center_0 = (len(primary_kernel) - 1) // 2
ref_offset = int(np.argmax(primary_kernel)) - kernel_center_0 # 1x shift
z_traces = []
for k in kernels:
matched_k = k[::-1]
filtered = signal.fftconvolve(work_data, matched_k, mode="same")
mad = median_abs_deviation(filtered, scale="normal")
if mad == 0:
mad = 1e-12
z = (filtered - np.median(filtered)) / mad
# Align this kernel's peak to the primary kernel's reference
k_offset = int(np.argmax(k)) - (len(k) - 1) // 2
relative_shift = k_offset - ref_offset
if relative_shift != 0:
z = np.roll(z, relative_shift)
if relative_shift > 0:
z[:relative_shift] = 0.0
else:
z[relative_shift:] = 0.0
z_traces.append(z)
z_score_trace = np.max(np.stack(z_traces, axis=0), axis=0)
# Noise estimate from the primary (unscaled) kernel for return metadata
mad = float(median_abs_deviation(signal.fftconvolve(work_data, kernels[0][::-1], mode="same"), scale="normal"))
if mad == 0:
mad = 1e-12
if min_event_distance_ms > 0:
min_dist_samples = int((min_event_distance_ms / 1000.0) * sampling_rate)
else:
min_dist_samples = int(tau_decay * sampling_rate)
if min_dist_samples < 1:
min_dist_samples = 1
peak_indices, _ = signal.find_peaks(z_score_trace, height=threshold_std, distance=min_dist_samples)
# Peak refinement: z_score peaks are aligned to the primary kernel reference,
# so apply only the primary kernel's offset when searching for the raw data peak.
kernel_peak_idx = int(np.argmax(primary_kernel))
template_offset = kernel_peak_idx - kernel_center_0
refine_window = max(3, int(tau_rise * sampling_rate))
if len(peak_indices) > 0:
corrected_indices = np.empty_like(peak_indices)
for i, idx in enumerate(peak_indices):
shifted = idx + template_offset
win_start = max(0, shifted - refine_window)
win_end = min(n_points, shifted + refine_window + 1)
local_peak = np.argmax(work_data[win_start:win_end])
corrected_indices[i] = win_start + local_peak
peak_indices = np.unique(corrected_indices)
if artifact_mask is not None and len(peak_indices) > 0:
n_mask = len(artifact_mask)
valid_mask = peak_indices < n_mask
not_artifact = ~artifact_mask[peak_indices[valid_mask]]
peak_indices = peak_indices[valid_mask][not_artifact]
event_count = len(peak_indices)
event_indices = peak_indices.astype(int)
event_amplitudes = baseline_corrected_data[event_indices] if event_count > 0 else np.array([])
if time is not None and len(time) == n_points:
time_axis = time
else:
time_axis = np.arange(n_points) * dt
event_times = time_axis[event_indices] if event_count > 0 else np.array([])
return EventDetectionResult(
value=event_count,
unit="counts",
is_valid=True,
event_count=event_count,
event_indices=event_indices,
event_times=event_times,
event_amplitudes=event_amplitudes,
detection_method="template_matching",
tau_rise_ms=tau_rise * 1000.0,
tau_decay_ms=tau_decay * 1000.0,
threshold_sd=threshold_std,
summary_stats={"noise_mad": mad},
direction=polarity,
artifact_mask=artifact_mask,
)
except Exception as e:
log.error(f"Error during template event detection: {e}", exc_info=True)
        return EventDetectionResult(value=0, unit="counts", is_valid=False, error_message=str(e))
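# Illustrative usage (a sketch; ``trace`` is hypothetical): matched-filter
# detection with a 0.5 ms rise / 5 ms decay template bank. Note that tau values
# are in seconds here, unlike the ms-based UI parameters of the wrapper below.
#
#   res = detect_events_template(trace, 10_000.0, threshold_std=4.0,
#                                tau_rise=0.0005, tau_decay=0.005,
#                                polarity="negative")
#   if res.is_valid:
#       print(res.event_count, res.summary_stats["noise_mad"])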
@AnalysisRegistry.register(
"event_detection_deconvolution",
label="Event (Template Match)",
plots=[
{"name": "Trace", "type": "trace", "show_spikes": True},
{"type": "markers", "x": "_event_times", "y": "_event_peaks", "color": "r", "symbol": "o"},
{"type": "threshold_line", "param": "threshold_sd"},
{"type": "artifact_overlay"},
],
ui_params=[
{
"name": "tau_rise_ms",
"label": "Tau Rise (ms):",
"type": "float",
"default": 0.5,
"min": 0.0,
"max": 1e9,
"decimals": 4,
},
{
"name": "tau_decay_ms",
"label": "Tau Decay (ms):",
"type": "float",
"default": 5.0,
"min": 0.0,
"max": 1e9,
"decimals": 4,
},
{
"name": "threshold_sd",
"label": "Threshold (SD):",
"type": "float",
"default": 4.0,
"min": 0.0,
"max": 1e9,
"decimals": 4,
},
{
"name": "direction",
"label": "Direction:",
"type": "choice",
"choices": ["negative", "positive"],
"default": "negative",
},
{
"name": "rolling_baseline_window_ms",
"label": "Rolling Baseline (ms):",
"type": "float",
"default": 100.0,
"min": 0.0,
"max": 5000.0,
"decimals": 1,
},
{"name": "reject_artifacts", "label": "Reject Artifacts", "type": "bool", "default": False},
{
"name": "artifact_slope_threshold",
"label": "Artifact Slope Thresh:",
"type": "float",
"default": 20.0,
"min": 0.0,
},
{"name": "artifact_padding_ms", "label": "Artifact Padding (ms):", "type": "float", "default": 2.0},
{
"name": "min_event_distance_ms",
"label": "Min Event Distance (ms):",
"type": "float",
"default": 0.0,
"min": 0.0,
"max": 1000.0,
"decimals": 1,
"tooltip": "Minimum distance between events (ms). 0 = use tau_decay.",
},
{
"name": "filter_freq_hz",
"label": "Filter Freq (Hz):",
"type": "float",
"default": 0.0,
"min": 0.0,
"max": 100000.0,
"decimals": 1,
"hidden": True,
},
],
)
def run_event_detection_template_wrapper(
data: np.ndarray, time: np.ndarray, sampling_rate: float, **kwargs
) -> Dict[str, Any]:
"""Wrapper for template-matching event detection."""
tau_rise_ms = kwargs.get("tau_rise_ms", 0.5)
tau_decay_ms = kwargs.get("tau_decay_ms", 5.0)
threshold_sd = kwargs.get("threshold_sd", 4.0)
direction = kwargs.get("direction", "negative")
tau_rise = tau_rise_ms / 1000.0
tau_decay = tau_decay_ms / 1000.0
reject_artifacts = kwargs.get("reject_artifacts", False)
artifact_mask = None
if reject_artifacts:
slope_thresh = kwargs.get("artifact_slope_threshold", 20.0)
padding_ms = kwargs.get("artifact_padding_ms", 2.0)
artifact_mask = find_artifact_windows(data, sampling_rate, slope_thresh, padding_ms)
result = detect_events_template(
data=data,
sampling_rate=sampling_rate,
threshold_std=threshold_sd,
tau_rise=tau_rise,
tau_decay=tau_decay,
polarity=direction,
rolling_baseline_window_ms=kwargs.get("rolling_baseline_window_ms", 100.0),
artifact_mask=artifact_mask,
time=time,
min_event_distance_ms=kwargs.get("min_event_distance_ms", 0.0),
)
if not result.is_valid:
return {"module_used": "synaptic_events", "metrics": {"event_error": result.error_message}}
_idx = np.asarray(result.event_indices if result.event_indices is not None else [], dtype=int)
# Compute local pre-event baseline for each event (handles summating events).
local_baselines = compute_local_pre_event_baseline(data, _idx, sampling_rate, polarity=direction)
if len(_idx) > 0:
if direction == "negative":
local_amplitudes = local_baselines - data[_idx]
else:
local_amplitudes = data[_idx] - local_baselines
else:
local_amplitudes = np.array([], dtype=float)
return {
"module_used": "synaptic_events",
"metrics": {
"event_count": result.event_count,
"tau_rise_ms": result.tau_rise_ms,
"tau_decay_ms": result.tau_decay_ms,
"threshold_sd": result.threshold_sd,
"mean_local_amplitude": float(np.mean(local_amplitudes)) if local_amplitudes.size > 0 else 0.0,
"_event_times": time[_idx].tolist() if len(_idx) > 0 else [],
"_event_peaks": data[_idx].tolist() if len(_idx) > 0 else [],
"_local_baselines": local_baselines.tolist() if local_baselines.size > 0 else [],
"_local_amplitudes": local_amplitudes.tolist() if local_amplitudes.size > 0 else [],
"_result_obj": result,
},
}
# ---------------------------------------------------------------------------
# 3. Baseline + Peak + Kinetics Detection
# ---------------------------------------------------------------------------
def _find_stable_baseline_segment(
data: np.ndarray,
sample_rate: float,
window_duration_s: float = 0.5,
step_duration_s: float = 0.1,
) -> Tuple[Optional[float], Optional[float], Optional[Tuple[int, int]]]:
"""Find the most stable (lowest-variance) baseline segment."""
n_points = len(data)
window_samples = max(2, int(window_duration_s * sample_rate))
step_samples = max(1, int(step_duration_s * sample_rate))
if window_samples >= n_points:
return float(np.mean(data)), float(np.std(data)), (0, n_points)
min_variance = np.inf
best: Optional[Tuple[int, int]] = None
best_mean: Optional[float] = None
best_sd: Optional[float] = None
for i in range(0, n_points - window_samples + 1, step_samples):
segment = data[i : i + window_samples]
variance = float(np.var(segment))
if variance < min_variance:
min_variance = variance
best = (i, i + window_samples)
best_mean = float(np.mean(segment))
best_sd = float(np.sqrt(variance))
return best_mean, best_sd, best
def detect_events_baseline_peak_kinetics( # noqa: C901
data: np.ndarray,
sample_rate: float,
direction: str = "negative",
baseline_window_s: float = 0.5,
baseline_step_s: float = 0.1,
threshold_sd_factor: float = 3.0,
filter_freq_hz: Optional[float] = None,
min_event_separation_ms: float = 5.0,
    auto_baseline: bool = True,  # accepted for UI parity; not currently used
rolling_baseline_window_ms: float = 0.0,
) -> EventDetectionResult:
"""Detect events via stable-baseline estimation then prominence-based peak finding."""
if direction not in ["negative", "positive"]:
return EventDetectionResult(value=0, unit="counts", is_valid=False, error_message="Invalid direction")
baseline_mean, baseline_sd, _ = _find_stable_baseline_segment(data, sample_rate, baseline_window_s, baseline_step_s)
if baseline_mean is None:
return EventDetectionResult(value=0, unit="counts", is_valid=True, event_count=0)
is_negative = direction == "negative"
if rolling_baseline_window_ms > 0:
from scipy.ndimage import median_filter
window_samples = int((rolling_baseline_window_ms / 1000.0) * sample_rate)
if window_samples % 2 == 0:
window_samples += 1
if window_samples >= 3:
rolling_bl = median_filter(data, size=window_samples)
work_data = data - rolling_bl
else:
work_data = data
else:
work_data = data
signal_to_process = -work_data if is_negative else work_data
noise_sd = median_abs_deviation(signal_to_process, scale="normal")
if noise_sd == 0:
noise_sd = baseline_sd if baseline_sd and baseline_sd > 0 else 1e-12
threshold_val = threshold_sd_factor * noise_sd
if filter_freq_hz and filter_freq_hz > 0:
try:
sos = signal.butter(4, filter_freq_hz, "low", fs=sample_rate, output="sos")
signal_c = np.ascontiguousarray(signal_to_process, dtype=np.float64)
sos_c = np.ascontiguousarray(sos, dtype=np.float64)
            fwd = signal.sosfilt(sos_c, signal_c)
            filtered = signal.sosfilt(sos_c, fwd[::-1])[::-1]  # forward-backward: zero phase
except (ValueError, TypeError, IndexError):
filtered = signal_to_process
else:
filtered = signal_to_process
min_dist = max(1, int(min_event_separation_ms / 1000.0 * sample_rate))
    min_width = max(2, int(0.0002 * sample_rate))  # require peaks >= 0.2 ms wide
peaks, _ = signal.find_peaks(
filtered,
height=threshold_val,
prominence=threshold_val * 0.5,
distance=min_dist,
width=min_width,
)
display_threshold_val = -threshold_val if is_negative else threshold_val
return EventDetectionResult(
value=len(peaks),
unit="counts",
is_valid=True,
event_count=len(peaks),
event_indices=peaks,
detection_method="baseline_peak",
summary_stats={"baseline_mean": baseline_mean, "baseline_sd": float(noise_sd)},
threshold_value=display_threshold_val,
)
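# Illustrative usage (a sketch; ``trace`` is hypothetical): baseline-peak
# detection with a 3-SD threshold over the MAD-based noise estimate.
#
#   res = detect_events_baseline_peak_kinetics(trace, 10_000.0,
#                                              direction="negative",
#                                              threshold_sd_factor=3.0)
#   if res.is_valid:
#       print(res.event_count, res.summary_stats["baseline_mean"])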
@AnalysisRegistry.register(
"event_detection_baseline_peak",
label="Event (Baseline Peak)",
plots=[
{"name": "Trace", "type": "trace", "show_spikes": True},
{"type": "markers", "x": "_event_times", "y": "_event_peaks", "color": "r", "symbol": "o"},
{"type": "artifact_overlay"},
],
ui_params=[
{
"name": "direction",
"label": "Direction:",
"type": "choice",
"choices": ["negative", "positive"],
"default": "negative",
},
{"name": "auto_baseline", "label": "Auto-Detect Baseline", "type": "bool", "default": True},
{"name": "threshold_sd_factor", "label": "Threshold (SD Factor):", "type": "float", "default": 3.0},
{
"name": "min_event_separation_ms",
"label": "Min Separation (ms):",
"type": "float",
"default": 5.0,
"min": 0.1,
"max": 1000.0,
"decimals": 1,
},
{
"name": "rolling_baseline_window_ms",
"label": "Rolling Baseline (ms):",
"type": "float",
"default": 100.0,
"min": 0.0,
"max": 5000.0,
"decimals": 1,
},
{
"name": "baseline_window_s",
"label": "Baseline Win (s):",
"type": "float",
"default": 0.5,
"min": 0.01,
"max": 100.0,
"decimals": 2,
},
{
"name": "baseline_step_s",
"label": "Baseline Step (s):",
"type": "float",
"default": 0.1,
"min": 0.01,
"max": 100.0,
"decimals": 2,
},
],
)
def run_event_detection_baseline_peak_wrapper(
data: np.ndarray, time: np.ndarray, sampling_rate: float, **kwargs
) -> Dict[str, Any]:
"""Wrapper for baseline-peak event detection."""
direction = kwargs.get("direction", "negative")
result = detect_events_baseline_peak_kinetics(
data,
sampling_rate,
direction=direction,
threshold_sd_factor=kwargs.get("threshold_sd_factor", 3.0),
min_event_separation_ms=kwargs.get("min_event_separation_ms", 5.0),
auto_baseline=kwargs.get("auto_baseline", True),
baseline_window_s=kwargs.get("baseline_window_s", 0.5),
baseline_step_s=kwargs.get("baseline_step_s", 0.1),
rolling_baseline_window_ms=kwargs.get("rolling_baseline_window_ms", 100.0),
)
if not result.is_valid:
return {"module_used": "synaptic_events", "metrics": {"event_error": result.error_message}}
_idx = np.asarray(result.event_indices if result.event_indices is not None else [], dtype=int)
return {
"module_used": "synaptic_events",
"metrics": {
"event_count": result.event_count,
"_event_times": time[_idx].tolist() if len(_idx) > 0 else [],
"_event_peaks": data[_idx].tolist() if len(_idx) > 0 else [],
"_result_obj": result,
},
}
# ---------------------------------------------------------------------------
# Module-level tab aggregator
# ---------------------------------------------------------------------------
@AnalysisRegistry.register(
"synaptic_events",
label="Synaptic Events",
method_selector={
"Threshold Based": "event_detection_threshold",
"Deconvolution (Custom)": "event_detection_deconvolution",
"Baseline + Peak + Kinetics": "event_detection_baseline_peak",
},
ui_params=[],
plots=[],
)
def synaptic_events_module(**kwargs):
"""Module-level aggregator tab for synaptic event detection analyses."""
return {}