Coverage for src/molecular_simulations/analysis/autocluster.py: 95%
121 statements
coverage.py v7.13.0, created at 2025-12-12 10:07 -0600
import json
import numpy as np
from pathlib import Path
import polars as pl
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from tqdm import tqdm
from typing import Any, Type, TypeVar, Union

PathLike = Union[Path, str]
_T = TypeVar('_T')


class GenericDataloader:
    """Loads generic data stored in numpy arrays and keeps the full
    dataset in memory. Capable of loading data with variable row counts,
    but the columnar dimension must be consistent across files.

    Arguments:
        data_files (list[PathLike]): List of paths to input data files.
    """
    def __init__(self,
                 data_files: list[PathLike]):
        self.files = data_files
        self.load_data()

    def load_data(self) -> None:
        """Lumps data into one large array.

        Returns:
            None
        """
        self.data_array = []
        self.shapes = []
        for f in self.files:
            temp = np.load(str(f))
            self.shapes.append(temp.shape)
            self.data_array.append(temp)

        self.data_array = np.vstack(self.data_array)
        # Flatten any trailing dimensions into a single feature axis so the
        # stacked data is always 2D
        if self.data_array.ndim > 2:
            self.data_array = self.data_array.reshape(self.data_array.shape[0], -1)

    @property
    def data(self) -> np.ndarray:
        """Property exposing the stacked data array.

        Returns:
            (np.ndarray): Internal data array.
        """
        return self.data_array

    @property
    def shape(self) -> Union[tuple[int, ...], list[tuple[int, ...]]]:
        """Property exposing the shapes of the input data.

        Returns:
            (tuple[int, ...] | list[tuple[int, ...]]): The common shape if every
            data file has the same shape; otherwise, the shape of each file in
            the order they were provided to this class.
        """
        if len(set(self.shapes)) == 1:
            return self.shapes[0]

        return self.shapes
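

# Usage sketch (not part of the original module): a minimal illustration of
# GenericDataloader, assuming two temporary .npy files with the same column
# count but different row counts. The file names here are hypothetical.
def _demo_generic_dataloader() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        paths = [Path(tmp) / 'rep0.npy', Path(tmp) / 'rep1.npy']
        np.save(paths[0], np.random.rand(100, 4))
        np.save(paths[1], np.random.rand(80, 4))

        loader = GenericDataloader(paths)
        # The two files are stacked row-wise into one (100 + 80, 4) matrix
        assert loader.data.shape == (180, 4)
        # Row counts differ, so `shape` reports the per-file shapes in order
        assert loader.shape == [(100, 4), (80, 4)]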


class PeriodicDataloader(GenericDataloader):
    """Decomposes periodic data using sin and cos, returning double the features.
    """
    def __init__(self,
                 data_files: list[PathLike]):
        super().__init__(data_files)

    def load_data(self) -> None:
        """Loads each file of periodic input data and decomposes it.

        Returns:
            None
        """
        self.data_array = []
        self.shapes = []
        for f in self.files:
            temp = self.remove_periodicity(np.load(str(f)))

            self.shapes.append(temp.shape)
            self.data_array.append(temp)

        self.data_array = np.vstack(self.data_array)

    def remove_periodicity(self,
                           arr: np.ndarray) -> np.ndarray:
        """Removes periodicity from each feature using sin and cos. Each
        column is expanded into two such that index i maps to 2*i (cos)
        and 2*i + 1 (sin).

        Arguments:
            arr (np.ndarray): Data to perform the decomposition on.

        Returns:
            (np.ndarray): New array of shape (arr.shape[0], arr.shape[1] * 2).
        """
        n_features = arr.shape[1]
        return_arr = np.zeros((arr.shape[0], n_features * 2))
        for i in range(n_features):
            return_arr[:, 2*i] = np.cos(arr[:, i])
            return_arr[:, 2*i+1] = np.sin(arr[:, i])

        return return_arr
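

# Worked example (a sketch, not part of the original module) of the sin/cos
# decomposition above: an angle of 0 maps to (cos, sin) = (1, 0), so a (3, 2)
# array of angles becomes a (3, 4) array of bounded, non-periodic features.
# `__new__` is used only to exercise `remove_periodicity` without input files.
def _demo_remove_periodicity() -> None:
    angles = np.array([[0.0, np.pi / 2],
                       [np.pi, -np.pi / 2],
                       [2 * np.pi, np.pi / 4]])
    loader = PeriodicDataloader.__new__(PeriodicDataloader)
    decomposed = loader.remove_periodicity(angles)
    assert decomposed.shape == (3, 4)
    # First row: cos(0), sin(0), cos(pi/2), sin(pi/2)
    np.testing.assert_allclose(decomposed[0], [1.0, 0.0, 0.0, 1.0], atol=1e-12)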


class AutoKMeans:
    """Performs automatic clustering using KMeans++, including dimensionality
    reduction of the feature space.

    Arguments:
        data_directory (PathLike): Directory where data files can be found.
        pattern (str): Optional filename pattern for selecting a subset of the
            npy files using glob.
        dataloader (Type[_T]): Defaults to GenericDataloader. Which dataloader
            to use.
        max_clusters (int): Defaults to 10. The maximum number of clusters to
            test during the parameter sweep.
        stride (int): Defaults to 1. Linear stride over the number of clusters
            during the parameter sweep. Helps avoid testing too many values when
            the number of clusters is high.
        reduction_algorithm (str): Defaults to PCA. Which dimensionality
            reduction algorithm to use. Currently only PCA is supported.
        reduction_kws (dict[str, Any]): kwargs for the supplied
            reduction_algorithm. Defaults to {'n_components': 2} for PCA.
    """
    def __init__(self,
                 data_directory: PathLike,
                 pattern: str = '',
                 dataloader: Type[_T] = GenericDataloader,
                 max_clusters: int = 10,
                 stride: int = 1,
                 reduction_algorithm: str = 'PCA',
                 reduction_kws: Union[dict[str, Any], None] = None):
        # Avoid a mutable default argument; fall back to the documented default
        if reduction_kws is None:
            reduction_kws = {'n_components': 2}

        self.data_dir = Path(data_directory)
        self.dataloader = dataloader(list(self.data_dir.glob(f'{pattern}*.npy')))
        self.data = self.dataloader.data
        self.shape = self.dataloader.shape

        self.n_clusters = max_clusters
        self.stride = stride

        self.decomposition = Decomposition(reduction_algorithm, **reduction_kws)

    def run(self) -> None:
        """Runs the automated clustering workflow.

        Returns:
            None
        """
        self.reduce_dimensionality()
        # +1 so that `max_clusters` itself is included in the sweep
        self.sweep_n_clusters([n for n in range(2, self.n_clusters + 1, self.stride)])
        self.map_centers_to_frames()
        self.save_centers()
        self.save_labels()

    def reduce_dimensionality(self) -> None:
        """Performs dimensionality reduction using the decomposer of choice.

        Returns:
            None
        """
        self.reduced = self.decomposition.fit_transform(self.data)

    def sweep_n_clusters(self,
                         n_clusters: list[int]) -> None:
        """Uses the silhouette score to perform a parameter sweep over the
        number of clusters. Stores the cluster centers for the best performing
        parameterization.

        Arguments:
            n_clusters (list[int]): Numbers of clusters to test.

        Returns:
            None
        """
        best_centers = None
        best_labels = None
        best_score = -1.  # silhouette scores lie in [-1, 1]
        for n in tqdm(n_clusters, total=len(n_clusters), position=0,
                      leave=False, desc='Sweeping `n_clusters`'):
            clusterer = KMeans(n_clusters=n)
            labels = clusterer.fit_predict(self.reduced)
            average_score = silhouette_score(self.reduced, labels)

            if average_score > best_score:
                best_centers = clusterer.cluster_centers_
                best_labels = labels
                best_score = average_score

        self.centers = best_centers
        self.labels = best_labels

    def map_centers_to_frames(self) -> None:
        """Finds and stores the data point which lies closest to the cluster
        center for each cluster. Assumes every input file contributed the same
        number of frames.

        Returns:
            None
        """
        cluster_centers = {i: None for i in range(len(self.centers))}
        for i, center in enumerate(self.centers):
            closest = np.inf  # start unbounded so the nearest point always wins
            for p, point in enumerate(self.reduced):
                if (dist := np.linalg.norm(point - center)) < closest:
                    rep = p // self.shape[0]
                    frame = p % self.shape[0]
                    cluster_centers[i] = (rep, frame)
                    closest = dist

        self.cluster_centers = cluster_centers

    def save_centers(self) -> None:
        """Saves out the cluster centers as a json file.

        Returns:
            None
        """
        with open(str(self.data_dir / 'cluster_centers.json'), 'w') as fout:
            json.dump(self.cluster_centers, fout, indent=4)

    def save_labels(self) -> None:
        """Generates a polars dataframe containing system, frame and cluster
        label assignments and saves it to a parquet file.

        Returns:
            None
        """
        files = self.dataloader.files
        if isinstance(self.dataloader.shape, tuple):
            shapes = [self.dataloader.shape[0]] * len(files)
        else:
            shapes = [shape[0] for shape in self.dataloader.shape]

        # Build one frame per file and concatenate once; vertical concat
        # against an empty, schema-less DataFrame would fail
        frames = [
            pl.DataFrame({'system': [file.name] * shape, 'frame': np.arange(shape)})
            for file, shape in zip(files, shapes)
        ]
        df = pl.concat(frames, how='vertical')
        df = df.with_columns(pl.Series(self.labels).alias('cluster'))

        df.write_parquet(str(self.data_dir / 'cluster_assignments.parquet'))
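

# End-to-end usage sketch (hypothetical directory and filename pattern, not
# part of the original module): sweep 2..8 clusters over periodic features,
# writing cluster_centers.json and cluster_assignments.parquet into the data
# directory.
def _demo_autokmeans() -> None:
    clusterer = AutoKMeans(
        'analysis/features',            # hypothetical data directory
        pattern='dihedrals_',           # globs dihedrals_*.npy
        dataloader=PeriodicDataloader,  # sin/cos-decomposes periodic features
        max_clusters=8,
        reduction_kws={'n_components': 2},
    )
    clusterer.run()
    # Maps each cluster id to its most representative (replica, frame) pair
    print(clusterer.cluster_centers)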


class Decomposition:
    """Thin wrapper around various dimensionality reduction algorithms. Uses
    scikit-learn style methods like `fit` and `fit_transform`.

    Arguments:
        algorithm (str): Which algorithm to use out of PCA, TICA and UMAP.
            Currently only PCA is supported.
        kwargs: Algorithm-specific kwargs to inject into the decomposer.
    """
    def __init__(self,
                 algorithm: str,
                 **kwargs):
        algorithms = {
            'PCA': PCA,
            'TICA': None,
            'UMAP': None
        }
        # Fail loudly for algorithms that are listed but not yet wired up
        if algorithms.get(algorithm) is None:
            raise NotImplementedError(f'{algorithm!r} is not currently supported; use PCA.')

        self.decomposer = algorithms[algorithm](**kwargs)

    def fit(self,
            X: np.ndarray) -> None:
        """Fits the decomposer to the data.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            None
        """
        self.decomposer.fit(X)

    def transform(self,
                  X: np.ndarray) -> np.ndarray:
        """Returns the reduced-dimension data from a decomposer which has
        already been fit.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            (np.ndarray): Reduced-dimension data.
        """
        return self.decomposer.transform(X)

    def fit_transform(self,
                      X: np.ndarray) -> np.ndarray:
        """Fits the decomposer to the data and returns the reduced-dimension
        data.

        Arguments:
            X (np.ndarray): Array of input data.

        Returns:
            (np.ndarray): Reduced-dimension data.
        """
        return self.decomposer.fit_transform(X)
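

# Sketch (not part of the original module): the Decomposition wrapper on
# synthetic data, reducing a (500, 6) feature matrix to (500, 2) with PCA.
def _demo_decomposition() -> None:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 6))

    decomposer = Decomposition('PCA', n_components=2)
    reduced = decomposer.fit_transform(X)
    assert reduced.shape == (500, 2)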