"""Automated clustering module for molecular dynamics data.
This module provides tools for automatic clustering of molecular dynamics
trajectory data using KMeans++ with dimensionality reduction.
"""
import json
import numpy as np
from pathlib import Path
import polars as pl
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from tqdm import tqdm
from typing import Any, Type, TypeVar, Union
PathLike = Union[Path, str]
_T = TypeVar('_T')
[docs]
class GenericDataloader:
"""Loads generic data stored in numpy arrays.
Stores the full dataset and is capable of loading data with variable
row lengths but must be consistent in the columnar dimension.
Attributes:
files: List of paths to the loaded data files.
data_array: The concatenated data array.
shapes: List of shapes of the original data files.
Args:
data_files: List of paths to input data files (.npy format).
Example:
>>> loader = GenericDataloader(['data1.npy', 'data2.npy'])
>>> print(loader.data.shape)
"""
[docs]
def __init__(self, data_files: list[PathLike]):
"""Initialize the dataloader with a list of data files.
Args:
data_files: List of paths to input data files (.npy format).
"""
self.files = data_files
self.load_data()
[docs]
def load_data(self) -> None:
"""Load and concatenate data from all files into one array.
Lumps data into one large array by vertical stacking. If the
resulting array has more than 2 dimensions, it is reshaped to 2D.
"""
self.data_array = []
self.shapes = []
for f in self.files:
temp = np.load(str(f))
self.shapes.append(temp.shape)
self.data_array.append(temp)
self.data_array = np.vstack(self.data_array)
if len(self.data_array) > 2:
x, *y = self.data_array.shape
shape2 = 1
for shape in y:
shape2 *= shape
self.data_array = self.data_array.reshape(x, shape2)
@property
def data(self) -> np.ndarray:
"""Return the internal data array.
Returns:
The concatenated and reshaped data array.
"""
return self.data_array
@property
def shape(self) -> tuple[int]:
"""Return the shape(s) of the input data.
Returns:
If all input files have the same shape, returns that shape.
Otherwise, returns a list of shapes in the order the files
were provided.
"""
if len(set(self.shapes)) == 1:
return self.shapes[0]
return self.shapes
[docs]
class PeriodicDataloader(GenericDataloader):
"""Dataloader that decomposes periodic data using sin and cos.
Extends GenericDataloader to handle periodic data by decomposing
each feature into sin and cos components, effectively doubling
the number of features.
Args:
data_files: List of paths to input data files containing
periodic data (e.g., dihedral angles).
Example:
>>> loader = PeriodicDataloader(['dihedrals.npy'])
>>> # Original 10 features become 20 features
"""
[docs]
def __init__(self, data_files: list[PathLike]):
"""Initialize the periodic dataloader.
Args:
data_files: List of paths to input data files.
"""
super().__init__(data_files)
[docs]
def load_data(self) -> None:
"""Load periodic data and remove periodicity.
Loads each file, applies the periodicity removal transformation,
and stores the results.
"""
self.data_array = []
self.shapes = []
for f in self.files:
temp = self.remove_periodicity(np.load(str(f)))
self.shapes.append(temp.shape)
self.data_array.append(temp)
self.data_array = np.vstack(self.data_array)
[docs]
def remove_periodicity(self, arr: np.ndarray) -> np.ndarray:
"""Remove periodicity from each feature using sin and cos.
Each column is expanded into two columns such that the indices
become i -> 2*i, 2*i + 1. This preserves the circular nature
of periodic variables like angles.
Args:
arr: Data array with periodic features. Shape should be
(n_samples, n_features).
Returns:
New array with shape (arr.shape[0], arr.shape[1] * 2) where
each original feature is replaced by its cos and sin values.
"""
n_features = arr.shape[1]
return_arr = np.zeros((arr.shape[0], n_features * 2))
for i in range(n_features):
return_arr[:, 2*i] = np.cos(arr[:, i])
return_arr[:, 2*i+1] = np.sin(arr[:, i])
return return_arr
[docs]
class AutoKMeans:
"""Automatic clustering using KMeans++ with dimensionality reduction.
Performs automatic clustering including dimensionality reduction of
the feature space and parameter sweep over number of clusters using
silhouette score optimization.
Attributes:
data: The loaded data array.
shape: Shape of the input data.
reduced: Dimensionality-reduced data.
centers: Cluster centers in reduced space.
labels: Cluster assignments for each data point.
cluster_centers: Mapping of cluster index to (replica, frame) tuple.
Args:
data_directory: Directory where data files can be found.
pattern: Optional filename pattern to select subset of npy files
using glob. Defaults to empty string (all .npy files).
dataloader: Which dataloader class to use. Defaults to
GenericDataloader.
max_clusters: Maximum number of clusters to test during parameter
sweep. Defaults to 10.
stride: Linear stride of number of clusters during parameter sweep.
Helps avoid testing too many values. Defaults to 1.
reduction_algorithm: Which dimensionality reduction algorithm to
use. Currently only 'PCA' is supported. Defaults to 'PCA'.
reduction_kws: Keyword arguments for the reduction algorithm.
Defaults to {'n_components': 2}.
Example:
>>> clusterer = AutoKMeans('data/', max_clusters=15)
>>> clusterer.run()
>>> print(clusterer.cluster_centers)
"""
[docs]
def __init__(
self,
data_directory: PathLike,
pattern: str = '',
dataloader: Type[_T] = GenericDataloader,
max_clusters: int = 10,
stride: int = 1,
reduction_algorithm: str = 'PCA',
reduction_kws: dict[str, Any] = {'n_components': 2}
):
"""Initialize the automatic clustering workflow.
Args:
data_directory: Directory where data files can be found.
pattern: Optional filename pattern for glob matching.
dataloader: Dataloader class to use for loading data.
max_clusters: Maximum number of clusters to test.
stride: Step size for cluster number sweep.
reduction_algorithm: Dimensionality reduction method.
reduction_kws: Arguments for the reduction algorithm.
"""
self.data_dir = Path(data_directory)
self.dataloader = dataloader(list(self.data_dir.glob(f'{pattern}*.npy')))
self.data = self.dataloader.data
self.shape = self.dataloader.shape
self.n_clusters = max_clusters
self.stride = stride
self.decomposition = Decomposition(reduction_algorithm, **reduction_kws)
[docs]
def run(self) -> None:
"""Run the complete automated clustering workflow.
Executes dimensionality reduction, parameter sweep, center mapping,
and saves results to disk.
"""
self.reduce_dimensionality()
self.sweep_n_clusters([n for n in range(2, self.n_clusters, self.stride)])
self.map_centers_to_frames()
self.save_centers()
self.save_labels()
[docs]
def reduce_dimensionality(self) -> None:
"""Perform dimensionality reduction on the data.
Uses the configured decomposition algorithm to reduce the
feature space dimensionality.
"""
self.reduced = self.decomposition.fit_transform(self.data)
[docs]
def sweep_n_clusters(self, n_clusters: list[int]) -> None:
"""Sweep over number of clusters to find optimal clustering.
Uses silhouette score to perform a parameter sweep over number
of clusters. Stores the cluster centers and labels for the best
performing parameterization.
Args:
n_clusters: List of cluster numbers to test.
"""
best_centers = None
best_labels = None
best_score = 0.
for n in tqdm(n_clusters, total=len(n_clusters), position=0,
leave=False, desc='Sweeping `n_clusters`'):
clusterer = KMeans(n_clusters=n)
labels = clusterer.fit_predict(self.reduced)
average_score = silhouette_score(self.reduced, labels)
if average_score > best_score:
best_centers = clusterer.cluster_centers_
best_labels = labels
best_score = average_score
self.centers = best_centers
self.labels = best_labels
[docs]
def map_centers_to_frames(self) -> None:
"""Map cluster centers to the closest actual data points.
Finds and stores the data point which lies closest to each
cluster center, recording the replica and frame indices.
"""
cluster_centers = {i: None for i in range(len(self.centers))}
for i, center in enumerate(self.centers):
closest = 100.
for p, point in enumerate(self.reduced):
if (dist := np.linalg.norm(point - center)) < closest:
rep = p // self.shape[0]
frame = p % self.shape[0]
cluster_centers[i] = (rep, frame)
closest = dist
self.cluster_centers = cluster_centers
[docs]
def save_centers(self) -> None:
"""Save cluster centers to a JSON file.
Writes the cluster_centers dictionary to 'cluster_centers.json'
in the data directory.
"""
with open(str(self.data_dir / 'cluster_centers.json'), 'w') as fout:
json.dump(self.cluster_centers, fout, indent=4)
[docs]
def save_labels(self) -> None:
"""Save cluster labels to a Parquet file.
Generates a Polars DataFrame containing system, frame, and cluster
label assignments and saves to 'cluster_assignments.parquet'.
"""
files = self.dataloader.files
if isinstance(self.dataloader.shape, tuple):
shapes = [self.dataloader.shape[0]] * len(files)
else:
shapes = [shape[0] for shape in self.dataloader.shape]
df = pl.DataFrame()
for file, shape in zip(files, shapes):
temp = pl.DataFrame({'system': [file.name] * shape, 'frame': np.arange(shape)})
df = pl.concat([df, temp], how='vertical')
df = df.with_columns(pl.Series(self.labels).alias('cluster'))
df.write_parquet(str(self.data_dir / 'cluster_assignments.parquet'))
[docs]
class Decomposition:
"""Wrapper for dimensionality reduction algorithms.
Provides a thin wrapper around various dimensionality reduction
algorithms using scikit-learn style methods.
Attributes:
decomposer: The underlying decomposition algorithm instance.
Args:
algorithm: Which algorithm to use. Options are 'PCA', 'TICA',
and 'UMAP'. Currently only 'PCA' is fully supported.
**kwargs: Algorithm-specific keyword arguments passed to the
decomposer constructor.
Example:
>>> decomp = Decomposition('PCA', n_components=3)
>>> reduced_data = decomp.fit_transform(data)
"""
[docs]
def __init__(self, algorithm: str, **kwargs):
"""Initialize the decomposition wrapper.
Args:
algorithm: Name of the decomposition algorithm.
**kwargs: Arguments passed to the algorithm constructor.
Raises:
KeyError: If an unsupported algorithm is specified.
"""
algorithms = {
'PCA': PCA,
'TICA': None,
'UMAP': None
}
self.decomposer = algorithms[algorithm](**kwargs)
[docs]
def fit(self, X: np.ndarray) -> None:
"""Fit the decomposer with data.
Args:
X: Array of input data with shape (n_samples, n_features).
"""
self.decomposer.fit(X)