Coverage for src/driada/experiment/synthetic/time_series.py: 98.70%
77 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
1"""
2Time series utilities for synthetic data generation.
4This module contains functions for generating various types of time series
5used in synthetic neural data, including binary series, fractional Brownian motion,
6and signal processing utilities.
7"""
9import numpy as np
10from fbm import FBM
11import itertools
12from itertools import groupby
15def generate_binary_time_series(length, avg_islands, avg_duration):
16 """
17 Generate a binary time series with islands of 1s.
19 Parameters
20 ----------
21 length : int
22 Length of the time series.
23 avg_islands : int
24 Average number of islands (continuous stretches of 1s).
25 avg_duration : int
26 Average duration of each island.
28 Returns
29 -------
30 ndarray
31 Binary time series.
32 """
33 series = np.zeros(length, dtype=int)
34 islands_count = 0
35 current_state = 0 # 0 for off, 1 for on
36 position = 0
38 while position < length:
39 if current_state == 0:
40 # When off, decide how long to stay off based on desired number of islands
41 # Lower avg_islands means longer off periods to ensure fewer islands
42 off_duration = max(1, int(np.random.exponential(length / (avg_islands * 2))))
43 duration = min(off_duration, length - position)
44 else:
45 # When on, stay on for the average duration +/- some randomness
46 duration = max(1, int(np.random.normal(avg_duration, avg_duration / 2)))
47 islands_count += 1
49 # Ensure we don't go past the series length
50 duration = min(duration, length - position)
52 # Fill the series with the current state
53 series[position:position + duration] = current_state
55 # Switch state
56 current_state = 1 - current_state
57 position += duration
59 # Adjust series to match the desired number of islands
60 actual_islands = sum(1 for value, group in itertools.groupby(series) if value == 1)
61 while actual_islands != avg_islands:
62 if actual_islands < avg_islands:
63 # If we have too few islands, turn on a random '0' to create a new island
64 zero_positions = np.where(series == 0)[0]
65 if len(zero_positions) > 0:
66 turn_on = np.random.choice(zero_positions)
67 series[turn_on] = 1
68 actual_islands += 1
69 else:
70 # If we have too many islands, turn off a random '1' to merge islands
71 one_positions = np.where(series == 1)[0]
72 if len(one_positions) > 1:
73 turn_off = np.random.choice(one_positions)
74 series[turn_off] = 0
75 actual_islands -= 1
77 return series
80def apply_poisson_to_binary_series(binary_series, rate_0, rate_1):
81 """
82 Apply Poisson sampling to a binary series based on its state.
84 Parameters
85 ----------
86 binary_series : ndarray
87 Binary time series (0s and 1s).
88 rate_0 : float
89 Poisson rate for 0 state.
90 rate_1 : float
91 Poisson rate for 1 state.
93 Returns
94 -------
95 ndarray
96 Poisson-sampled series.
97 """
98 length = len(binary_series)
99 poisson_series = np.zeros(length, dtype=int)
101 current_pos = 0
102 for value, group in itertools.groupby(binary_series):
103 run_length = len(list(group))
104 if value == 0:
105 poisson_series[current_pos:current_pos + run_length] = np.random.poisson(rate_0, run_length)
106 else:
107 poisson_series[current_pos:current_pos + run_length] = np.random.poisson(rate_1, run_length)
108 current_pos += run_length
110 return poisson_series
113def delete_one_islands(binary_ts, probability):
114 """
115 Delete islands of 1s from a binary time series with given probability.
117 Parameters
118 ----------
119 binary_ts : ndarray
120 Binary time series.
121 probability : float
122 Probability of deleting each island.
124 Returns
125 -------
126 ndarray
127 Modified binary time series.
128 """
129 # Ensure binary_ts is binary
130 if not np.all(np.isin(binary_ts, [0, 1])):
131 raise ValueError("binary_ts must be binary (0s and 1s)")
133 # Create a copy of the input array
134 result = binary_ts.copy()
136 # Identify islands of 1s using groupby
137 start = 0
138 for key, group in groupby(binary_ts):
139 length = sum(1 for _ in group) # Count elements in the group
140 if key == 1 and np.random.random() < probability:
141 result[start:start + length] = 0
142 start += length
144 return result
147def generate_fbm_time_series(length, hurst, seed=None, roll_shift=None):
148 """
149 Generate fractional Brownian motion time series.
151 Parameters
152 ----------
153 length : int
154 Length of the series.
155 hurst : float
156 Hurst parameter (0.5 = standard Brownian motion).
157 seed : int, optional
158 Random seed.
159 roll_shift : int, optional
160 Circular shift to apply.
162 Returns
163 -------
164 ndarray
165 FBM time series.
166 """
167 if seed is not None:
168 np.random.seed(seed)
170 f = FBM(n=length-1, hurst=hurst, length=1.0, method='daviesharte')
171 fbm_series = f.fbm()
173 # Apply circular shift to break correlation with spatial trajectory
174 if roll_shift is not None:
175 fbm_series = np.roll(fbm_series, roll_shift)
177 return fbm_series
180def select_signal_roi(values, seed=42):
181 """
182 Select a region of interest (ROI) from signal values.
184 Parameters
185 ----------
186 values : ndarray
187 Signal values.
188 seed : int
189 Random seed.
191 Returns
192 -------
193 tuple
194 (center, lower_border, upper_border) of the ROI.
195 """
196 mean = np.mean(values)
197 std = np.std(values)
199 np.random.seed(seed)
200 # Select random location within mean ± 2*std
201 loc = np.random.uniform(mean - 1.5 * std, mean + 1.5 * std)
203 # Define borders
204 lower_border = loc - 0.5 * std
205 upper_border = loc + 0.5 * std
207 return loc, lower_border, upper_border
210def discretize_via_roi(continuous_signal, seed=None):
211 """
212 Discretize a continuous signal based on ROI selection.
214 Parameters
215 ----------
216 continuous_signal : ndarray
217 Continuous signal to discretize.
218 seed : int, optional
219 Random seed for ROI selection.
221 Returns
222 -------
223 ndarray
224 Binary discretized signal.
225 """
226 if seed is not None:
227 np.random.seed(seed)
229 # Get ROI boundaries
230 _, lower, upper = select_signal_roi(continuous_signal, seed=seed)
232 # Create binary signal based on ROI
233 binary_signal = ((continuous_signal >= lower) & (continuous_signal <= upper)).astype(int)
235 return binary_signal