Coverage for src/molecular_simulations/analysis/autocluster.py: 95%

121 statements  


import json
import numpy as np
from pathlib import Path
import polars as pl
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from tqdm import tqdm
from typing import Any, Type, TypeVar, Union

PathLike = Union[Path, str]
_T = TypeVar('_T')

class GenericDataloader:
    """Loads any generic data stored in numpy arrays and stores the full
    dataset. Capable of loading data with variable row lengths, but the
    columnar dimension must be consistent across files.

    Arguments:
        data_files (list[PathLike]): List of paths to input data files.
    """
    def __init__(self,
                 data_files: list[PathLike]):
        self.files = data_files
        self.load_data()

    def load_data(self) -> None:
        """Lumps data into one large array.

        Returns:
            None
        """
        self.data_array = []
        self.shapes = []
        for f in self.files:
            temp = np.load(str(f))
            self.shapes.append(temp.shape)
            self.data_array.append(temp)

        self.data_array = np.vstack(self.data_array)
        # Flatten any trailing dimensions so downstream estimators always see
        # a 2D (n_samples, n_features) array. Check `ndim`, not `len(...)`,
        # which counts rows rather than dimensions.
        if self.data_array.ndim > 2:
            x, *y = self.data_array.shape
            shape2 = 1
            for shape in y:
                shape2 *= shape

            self.data_array = self.data_array.reshape(x, shape2)

    @property
    def data(self) -> np.ndarray:
        """Property exposing the internal data array.

        Returns:
            (np.ndarray): Internal data array.
        """
        return self.data_array

    @property
    def shape(self) -> Union[tuple[int, ...], list[tuple[int, ...]]]:
        """Property exposing the shapes of the input data.

        Returns:
            (tuple[int, ...] | list[tuple[int, ...]]): The common shape if all
                data files share one; otherwise the shape of each file in the
                order they were provided to this class.
        """
        if len(set(self.shapes)) == 1:
            return self.shapes[0]

        return self.shapes

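# Example usage of GenericDataloader: a minimal sketch, assuming a pair of
# hypothetical per-replica feature files where rows are frames and columns
# are features.
#
#     loader = GenericDataloader([Path('rep0_features.npy'),
#                                 Path('rep1_features.npy')])
#     X = loader.data       # stacked (n_total_frames, n_features) array
#     print(loader.shape)   # common per-file shape, e.g. (1000, 24)
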

class PeriodicDataloader(GenericDataloader):
    """Decomposes periodic data using sin and cos, returning double the
    features.
    """
    def __init__(self,
                 data_files: list[PathLike]):
        super().__init__(data_files)

    def load_data(self) -> None:
        """Loads files of input periodic data.

        Returns:
            None
        """
        self.data_array = []
        self.shapes = []
        for f in self.files:
            temp = self.remove_periodicity(np.load(str(f)))

            self.shapes.append(temp.shape)
            self.data_array.append(temp)

        self.data_array = np.vstack(self.data_array)

    def remove_periodicity(self,
                           arr: np.ndarray) -> np.ndarray:
        """Removes periodicity from each feature using sin and cos. Each
        column is expanded into two such that index i maps to indices
        2*i and 2*i + 1.

        Arguments:
            arr (np.ndarray): Data to perform the decomposition on.

        Returns:
            (np.ndarray): New array of shape (arr.shape[0], arr.shape[1] * 2).
        """
        n_features = arr.shape[1]
        return_arr = np.zeros((arr.shape[0], n_features * 2))

        for i in range(n_features):
            return_arr[:, 2*i] = np.cos(arr[:, i])
            return_arr[:, 2*i+1] = np.sin(arr[:, i])

        return return_arr

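# Sanity check for the sin/cos expansion: a brief sketch, not part of the
# module. A (100, 7) array of angles in radians expands to (100, 14), with
# feature i landing at columns 2*i (cos) and 2*i + 1 (sin). The __new__ call
# skips file loading since remove_periodicity uses no instance state.
#
#     rng = np.random.default_rng()
#     angles = rng.uniform(-np.pi, np.pi, size=(100, 7))
#     loader = PeriodicDataloader.__new__(PeriodicDataloader)
#     assert loader.remove_periodicity(angles).shape == (100, 14)
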

class AutoKMeans:
    """Performs automatic clustering using KMeans++, including dimensionality
    reduction of the feature space.

    Arguments:
        data_directory (PathLike): Directory where data files can be found.
        pattern (str): Optional filename pattern to select a subset of npy
            files using glob.
        dataloader (Type[_T]): Defaults to GenericDataloader. Which dataloader
            to use.
        max_clusters (int): Defaults to 10. The maximum number of clusters to
            test during the parameter sweep.
        stride (int): Defaults to 1. Linear stride over the number of clusters
            during the parameter sweep. Helps avoid testing too many values
            when the number of clusters is high.
        reduction_algorithm (str): Defaults to 'PCA'. Which dimensionality
            reduction algorithm to use. Currently only PCA is supported.
        reduction_kws (dict[str, Any]): Defaults to {'n_components': 2} for
            PCA. kwargs for the supplied reduction_algorithm.
    """
    def __init__(self,
                 data_directory: PathLike,
                 pattern: str = '',
                 dataloader: Type[_T] = GenericDataloader,
                 max_clusters: int = 10,
                 stride: int = 1,
                 reduction_algorithm: str = 'PCA',
                 reduction_kws: Union[dict[str, Any], None] = None):
        # Avoid a mutable default argument; fall back to the documented
        # default for PCA.
        if reduction_kws is None:
            reduction_kws = {'n_components': 2}

        self.data_dir = Path(data_directory)
        self.dataloader = dataloader(list(self.data_dir.glob(f'{pattern}*.npy')))
        self.data = self.dataloader.data
        self.shape = self.dataloader.shape

        self.n_clusters = max_clusters
        self.stride = stride

        self.decomposition = Decomposition(reduction_algorithm, **reduction_kws)

    def run(self) -> None:
        """Runs the automated clustering workflow.

        Returns:
            None
        """
        self.reduce_dimensionality()
        # Include max_clusters itself in the sweep; `range` excludes its stop.
        self.sweep_n_clusters([n for n in range(2, self.n_clusters + 1, self.stride)])
        self.map_centers_to_frames()
        self.save_centers()
        self.save_labels()

    def reduce_dimensionality(self) -> None:
        """Performs dimensionality reduction using the decomposer of choice.

        Returns:
            None
        """
        self.reduced = self.decomposition.fit_transform(self.data)

    def sweep_n_clusters(self,
                         n_clusters: list[int]) -> None:
        """Uses silhouette score to perform a parameter sweep over the number
        of clusters. Stores the cluster centers for the best performing
        parameterization.

        Arguments:
            n_clusters (list[int]): List of numbers of clusters to test.

        Returns:
            None
        """
        best_centers = None
        best_labels = None
        # Silhouette scores lie in [-1, 1], so start below the minimum to
        # guarantee at least one parameterization is kept.
        best_score = -1.
        for n in tqdm(n_clusters, total=len(n_clusters), position=0,
                      leave=False, desc='Sweeping `n_clusters`'):
            clusterer = KMeans(n_clusters=n)
            labels = clusterer.fit_predict(self.reduced)
            average_score = silhouette_score(self.reduced, labels)

            if average_score > best_score:
                best_centers = clusterer.cluster_centers_
                best_labels = labels
                best_score = average_score

        self.centers = best_centers
        self.labels = best_labels

    def map_centers_to_frames(self) -> None:
        """Finds and stores the data point which lies closest to the cluster
        center for each cluster. Assumes all input files share the same
        number of frames (self.shape[0]) when mapping a flat index back to a
        (replica, frame) pair.

        Returns:
            None
        """
        cluster_centers = {i: None for i in range(len(self.centers))}
        for i, center in enumerate(self.centers):
            closest = np.inf
            for p, point in enumerate(self.reduced):
                if (dist := np.linalg.norm(point - center)) < closest:
                    rep = p // self.shape[0]
                    frame = p % self.shape[0]
                    cluster_centers[i] = (rep, frame)
                    closest = dist

        self.cluster_centers = cluster_centers

    def save_centers(self) -> None:
        """Saves out cluster centers as a json file.

        Returns:
            None
        """
        with open(str(self.data_dir / 'cluster_centers.json'), 'w') as fout:
            json.dump(self.cluster_centers, fout, indent=4)

    def save_labels(self) -> None:
        """Generates a polars dataframe containing system, frame and cluster
        label assignments and saves it to a parquet file.

        Returns:
            None
        """
        files = self.dataloader.files
        if isinstance(self.dataloader.shape, tuple):
            shapes = [self.dataloader.shape[0]] * len(files)
        else:
            shapes = [shape[0] for shape in self.dataloader.shape]

        df = pl.DataFrame()
        for file, shape in zip(files, shapes):
            temp = pl.DataFrame({'system': [file.name] * shape, 'frame': np.arange(shape)})
            df = pl.concat([df, temp], how='vertical')

        df = df.with_columns(pl.Series(self.labels).alias('cluster'))

        df.write_parquet(str(self.data_dir / 'cluster_assignments.parquet'))

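# End-to-end usage of AutoKMeans: a minimal sketch. The directory name and
# filename pattern are hypothetical; `run` writes cluster_centers.json and
# cluster_assignments.parquet into the data directory.
#
#     km = AutoKMeans('features/', pattern='dihedrals_', max_clusters=8,
#                     reduction_kws={'n_components': 2})
#     km.run()
#     print(km.cluster_centers)   # {cluster_id: (replica, frame), ...}
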

class Decomposition:
    """Thin wrapper for various dimensionality reduction algorithms. Uses
    scikit-learn style methods like `fit` and `fit_transform`.

    Arguments:
        algorithm (str): Which algorithm to use out of PCA, TICA and UMAP.
            Currently only PCA is supported.
        kwargs: Algorithm-specific kwargs to inject into the decomposer.
    """
    def __init__(self,
                 algorithm: str,
                 **kwargs):
        algorithms = {
            'PCA': PCA,
            'TICA': None,
            'UMAP': None
        }

        # Fail with a clear message for unknown or placeholder algorithms
        # rather than attempting to call None(**kwargs).
        if algorithms.get(algorithm) is None:
            raise NotImplementedError(
                f'{algorithm!r} is not currently supported; use PCA.'
            )

        self.decomposer = algorithms[algorithm](**kwargs)

    def fit(self,
            X: np.ndarray) -> None:
        """Fits the decomposer with data.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            None
        """
        self.decomposer.fit(X)

    def transform(self,
                  X: np.ndarray) -> np.ndarray:
        """Returns the reduced dimension data from a decomposer which has
        already been fit.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            (np.ndarray): Reduced dimension data.
        """
        return self.decomposer.transform(X)

    def fit_transform(self,
                      X: np.ndarray) -> np.ndarray:
        """Fits the decomposer with data and returns the reduced dimension
        data.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            (np.ndarray): Reduced dimension data.
        """
        return self.decomposer.fit_transform(X)
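
# Standalone usage of the Decomposition wrapper: a short sketch with random
# data, assuming the default scikit-learn PCA behavior.
#
#     X = np.random.rand(500, 40)
#     dec = Decomposition('PCA', n_components=2)
#     X_reduced = dec.fit_transform(X)
#     assert X_reduced.shape == (500, 2)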