Coverage for src/driada/experiment/synthetic/time_series.py: 98.70%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1""" 

2Time series utilities for synthetic data generation. 

3 

4This module contains functions for generating various types of time series 

5used in synthetic neural data, including binary series, fractional Brownian motion, 

6and signal processing utilities. 

7""" 

8 

9import numpy as np 

10from fbm import FBM 

11import itertools 

12from itertools import groupby 

13 

14 

15def generate_binary_time_series(length, avg_islands, avg_duration): 

16 """ 

17 Generate a binary time series with islands of 1s. 

18  

19 Parameters 

20 ---------- 

21 length : int 

22 Length of the time series. 

23 avg_islands : int 

24 Average number of islands (continuous stretches of 1s). 

25 avg_duration : int 

26 Average duration of each island. 

27  

28 Returns 

29 ------- 

30 ndarray 

31 Binary time series. 

32 """ 

33 series = np.zeros(length, dtype=int) 

34 islands_count = 0 

35 current_state = 0 # 0 for off, 1 for on 

36 position = 0 

37 

38 while position < length: 

39 if current_state == 0: 

40 # When off, decide how long to stay off based on desired number of islands 

41 # Lower avg_islands means longer off periods to ensure fewer islands 

42 off_duration = max(1, int(np.random.exponential(length / (avg_islands * 2)))) 

43 duration = min(off_duration, length - position) 

44 else: 

45 # When on, stay on for the average duration +/- some randomness 

46 duration = max(1, int(np.random.normal(avg_duration, avg_duration / 2))) 

47 islands_count += 1 

48 

49 # Ensure we don't go past the series length 

50 duration = min(duration, length - position) 

51 

52 # Fill the series with the current state 

53 series[position:position + duration] = current_state 

54 

55 # Switch state 

56 current_state = 1 - current_state 

57 position += duration 

58 

59 # Adjust series to match the desired number of islands 

60 actual_islands = sum(1 for value, group in itertools.groupby(series) if value == 1) 

61 while actual_islands != avg_islands: 

62 if actual_islands < avg_islands: 

63 # If we have too few islands, turn on a random '0' to create a new island 

64 zero_positions = np.where(series == 0)[0] 

65 if len(zero_positions) > 0: 

66 turn_on = np.random.choice(zero_positions) 

67 series[turn_on] = 1 

68 actual_islands += 1 

69 else: 

70 # If we have too many islands, turn off a random '1' to merge islands 

71 one_positions = np.where(series == 1)[0] 

72 if len(one_positions) > 1: 

73 turn_off = np.random.choice(one_positions) 

74 series[turn_off] = 0 

75 actual_islands -= 1 

76 

77 return series 

78 

79 

80def apply_poisson_to_binary_series(binary_series, rate_0, rate_1): 

81 """ 

82 Apply Poisson sampling to a binary series based on its state. 

83  

84 Parameters 

85 ---------- 

86 binary_series : ndarray 

87 Binary time series (0s and 1s). 

88 rate_0 : float 

89 Poisson rate for 0 state. 

90 rate_1 : float 

91 Poisson rate for 1 state. 

92  

93 Returns 

94 ------- 

95 ndarray 

96 Poisson-sampled series. 

97 """ 

98 length = len(binary_series) 

99 poisson_series = np.zeros(length, dtype=int) 

100 

101 current_pos = 0 

102 for value, group in itertools.groupby(binary_series): 

103 run_length = len(list(group)) 

104 if value == 0: 

105 poisson_series[current_pos:current_pos + run_length] = np.random.poisson(rate_0, run_length) 

106 else: 

107 poisson_series[current_pos:current_pos + run_length] = np.random.poisson(rate_1, run_length) 

108 current_pos += run_length 

109 

110 return poisson_series 

111 

112 

113def delete_one_islands(binary_ts, probability): 

114 """ 

115 Delete islands of 1s from a binary time series with given probability. 

116  

117 Parameters 

118 ---------- 

119 binary_ts : ndarray 

120 Binary time series. 

121 probability : float 

122 Probability of deleting each island. 

123  

124 Returns 

125 ------- 

126 ndarray 

127 Modified binary time series. 

128 """ 

129 # Ensure binary_ts is binary 

130 if not np.all(np.isin(binary_ts, [0, 1])): 

131 raise ValueError("binary_ts must be binary (0s and 1s)") 

132 

133 # Create a copy of the input array 

134 result = binary_ts.copy() 

135 

136 # Identify islands of 1s using groupby 

137 start = 0 

138 for key, group in groupby(binary_ts): 

139 length = sum(1 for _ in group) # Count elements in the group 

140 if key == 1 and np.random.random() < probability: 

141 result[start:start + length] = 0 

142 start += length 

143 

144 return result 

145 

146 

147def generate_fbm_time_series(length, hurst, seed=None, roll_shift=None): 

148 """ 

149 Generate fractional Brownian motion time series. 

150  

151 Parameters 

152 ---------- 

153 length : int 

154 Length of the series. 

155 hurst : float 

156 Hurst parameter (0.5 = standard Brownian motion). 

157 seed : int, optional 

158 Random seed. 

159 roll_shift : int, optional 

160 Circular shift to apply. 

161  

162 Returns 

163 ------- 

164 ndarray 

165 FBM time series. 

166 """ 

167 if seed is not None: 

168 np.random.seed(seed) 

169 

170 f = FBM(n=length-1, hurst=hurst, length=1.0, method='daviesharte') 

171 fbm_series = f.fbm() 

172 

173 # Apply circular shift to break correlation with spatial trajectory 

174 if roll_shift is not None: 

175 fbm_series = np.roll(fbm_series, roll_shift) 

176 

177 return fbm_series 

178 

179 

180def select_signal_roi(values, seed=42): 

181 """ 

182 Select a region of interest (ROI) from signal values. 

183  

184 Parameters 

185 ---------- 

186 values : ndarray 

187 Signal values. 

188 seed : int 

189 Random seed. 

190  

191 Returns 

192 ------- 

193 tuple 

194 (center, lower_border, upper_border) of the ROI. 

195 """ 

196 mean = np.mean(values) 

197 std = np.std(values) 

198 

199 np.random.seed(seed) 

200 # Select random location within mean ± 2*std 

201 loc = np.random.uniform(mean - 1.5 * std, mean + 1.5 * std) 

202 

203 # Define borders 

204 lower_border = loc - 0.5 * std 

205 upper_border = loc + 0.5 * std 

206 

207 return loc, lower_border, upper_border 

208 

209 

210def discretize_via_roi(continuous_signal, seed=None): 

211 """ 

212 Discretize a continuous signal based on ROI selection. 

213  

214 Parameters 

215 ---------- 

216 continuous_signal : ndarray 

217 Continuous signal to discretize. 

218 seed : int, optional 

219 Random seed for ROI selection. 

220  

221 Returns 

222 ------- 

223 ndarray 

224 Binary discretized signal. 

225 """ 

226 if seed is not None: 

227 np.random.seed(seed) 

228 

229 # Get ROI boundaries 

230 _, lower, upper = select_signal_roi(continuous_signal, seed=seed) 

231 

232 # Create binary signal based on ROI 

233 binary_signal = ((continuous_signal >= lower) & (continuous_signal <= upper)).astype(int) 

234 

235 return binary_signal